diff --git a/folkmq/src/main/java/org/noear/folkmq/client/MqClientListener.java b/folkmq/src/main/java/org/noear/folkmq/client/MqClientListener.java index bb5d782a..6dc583ae 100644 --- a/folkmq/src/main/java/org/noear/folkmq/client/MqClientListener.java +++ b/folkmq/src/main/java/org/noear/folkmq/client/MqClientListener.java @@ -68,15 +68,21 @@ private void onReceive(Session s, Message m, MqMessageReceivedImpl message, bool if (message.isTransaction()) { if (client.transactionCheckback != null) { client.transactionCheckback.check(message); + } else { + s.sendAlarm(m, "Client no checkback handler!"); } } else { if (client.listenHandler != null) { client.listenHandler.consume(message); + } else { + s.sendAlarm(m, "Client no request handler!"); } } } catch (Throwable e) { try { - s.sendAlarm(m, "Request handle error:" + e.getMessage()); + if (s.isValid()) { + s.sendAlarm(m, "Client request handle error:" + e.getMessage()); + } log.warn("Client request handle error, tid={}", message.getTid(), e); } catch (Throwable err) { log.warn("Client request handle error, tid={}", message.getTid(), e); diff --git a/folkmq/src/main/java/org/noear/folkmq/server/MqQueueDefault.java b/folkmq/src/main/java/org/noear/folkmq/server/MqQueueDefault.java index 42573a4a..90eca452 100644 --- a/folkmq/src/main/java/org/noear/folkmq/server/MqQueueDefault.java +++ b/folkmq/src/main/java/org/noear/folkmq/server/MqQueueDefault.java @@ -396,6 +396,8 @@ private void distributeDo(Session s1, MqMessageHolder messageHolder) throws IOEx s1.sendAndRequest(MqConstants.MQ_EVENT_DISTRIBUTE, messageHolder.getContent(), -1).thenReply(r -> { int ack = Integer.parseInt(r.metaOrDefault(MqConstants.MQ_META_ACK, "0")); acknowledgeDo(messageHolder, ack, true); + }).thenError(err->{ + acknowledgeDo(messageHolder, 0, true); }); //2.添加保险延时任务:如果没有回执就重发