This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/main by this push:
new 8352c15 bugfix:fix rmq sub concurrent issue
8352c15 is described below
commit 8352c15acf26b905a9807b9a0aaa777d1bb3a9d8
Author: changfeng <[email protected]>
AuthorDate: Mon Jun 26 14:36:25 2023 +0800
bugfix:fix rmq sub concurrent issue
---
.../adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index bd081c5..db4633e 100644
---
a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++
b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -61,6 +61,7 @@ import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
@@ -125,7 +126,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
logger.trace("consumer poll message empty.");
return null;
}
- List<ConnectRecord> connectRecords = Lists.newArrayList();
+ List<ConnectRecord> connectRecords = new CopyOnWriteArrayList<>();
List<CompletableFuture<Void>> completableFutures =
Lists.newArrayList();
messages.forEach(item->{
CompletableFuture<Void> recordCompletableFuture =
CompletableFuture.supplyAsync(()-> convertToSinkRecord(item))