This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 65e3d1e4c0d7dab8fd3a6eb5bd316c2da3b43ff0 Author: nowinkey <[email protected]> AuthorDate: Wed Feb 8 21:08:58 2023 +0800 Add try catch --- .../apache/rocketmq/store/DefaultMessageStore.java | 28 ++++++++++++---------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 48146c638..460cb9f1e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -2855,20 +2855,24 @@ public class DefaultMessageStore implements MessageStore { if (!batchDispatchRequestQueue.isEmpty()) { BatchDispatchRequest task = batchDispatchRequestQueue.peek(); batchDispatchRequestExecutor.execute(() -> { - ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate(); - tmpByteBuffer.position(task.position); - tmpByteBuffer.limit(task.position + task.size); - List<DispatchRequest> dispatchRequestList = new ArrayList<>(); - while (tmpByteBuffer.hasRemaining()) { - DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false); - if (dispatchRequest.isSuccess()) { - dispatchRequestList.add(dispatchRequest); - } else { - LOGGER.error("[BUG]read total count not equals msg total size."); + try { + ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate(); + tmpByteBuffer.position(task.position); + tmpByteBuffer.limit(task.position + task.size); + List<DispatchRequest> dispatchRequestList = new ArrayList<>(); + while (tmpByteBuffer.hasRemaining()) { + DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false); + if (dispatchRequest.isSuccess()) { + dispatchRequestList.add(dispatchRequest); + } else { + LOGGER.error("[BUG]read total count not equals msg total size."); + } } + dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()])); + mappedPageHoldCount.getAndDecrement(); + } catch (Exception e) { + LOGGER.error("There is an exception in task execution.", e); } - dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()])); - mappedPageHoldCount.getAndDecrement(); }); batchDispatchRequestQueue.poll(); }
