This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/main by this push:
     new 8352c15  bugfix:fix rmq sub concurrent issue
8352c15 is described below

commit 8352c15acf26b905a9807b9a0aaa777d1bb3a9d8
Author: changfeng <[email protected]>
AuthorDate: Mon Jun 26 14:36:25 2023 +0800

    bugfix:fix rmq sub concurrent issue
---
 .../adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index bd081c5..db4633e 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -61,6 +61,7 @@ import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
@@ -125,7 +126,7 @@ public class RocketMQEventSubscriber extends EventSubscriber {
             logger.trace("consumer poll message empty.");
             return null;
         }
-        List<ConnectRecord> connectRecords = Lists.newArrayList();
+        List<ConnectRecord> connectRecords = new CopyOnWriteArrayList<>();
         List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
         messages.forEach(item->{
             CompletableFuture<Void> recordCompletableFuture = CompletableFuture.supplyAsync(()-> convertToSinkRecord(item))

Reply via email to