This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 87075c2662 [ISSUE #6955] add removeOne method for ReceiptHandleGroup (#6955)
87075c2662 is described below

commit 87075c26623c2c40486c4189e2fb1855426a8ae9
Author: lk <[email protected]>
AuthorDate: Wed Jun 28 15:26:39 2023 +0800

    [ISSUE #6955] add removeOne method for ReceiptHandleGroup (#6955)
---
 .../rocketmq/proxy/common/ReceiptHandleGroup.java  | 36 ++++++++++++++++++++++
 .../proxy/common/ReceiptHandleGroupTest.java       | 32 +++++++++++++++++--
 2 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index f257563952..6fee38d117 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.proxy.common;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
@@ -77,6 +78,22 @@ public class ReceiptHandleGroup {
                 .append("offset", offset)
                 .toString();
         }
+
+        public String getOriginalHandle() {
+            return originalHandle;
+        }
+
+        public String getBroker() {
+            return broker;
+        }
+
+        public int getQueueId() {
+            return queueId;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
     }
 
     public static class HandleData {
@@ -100,6 +117,10 @@ public class ReceiptHandleGroup {
             this.semaphore.release();
         }
 
+        public MessageReceiptHandle getMessageReceiptHandle() {
+            return messageReceiptHandle;
+        }
+
         @Override
         public boolean equals(Object o) {
             return this == o;
@@ -196,6 +217,21 @@ public class ReceiptHandleGroup {
         return res.get();
     }
 
+    public MessageReceiptHandle removeOne(String msgID) {
+        Map<HandleKey, HandleData> handleMap = this.receiptHandleMap.get(msgID);
+        if (handleMap == null) {
+            return null;
+        }
+        Set<HandleKey> keys = handleMap.keySet();
+        for (HandleKey key : keys) {
+            MessageReceiptHandle res = this.remove(msgID, key.originalHandle);
+            if (res != null) {
+                return res;
+            }
+        }
+        return null;
+    }
+
     public void computeIfPresent(String msgID, String handle,
         Function<MessageReceiptHandle, CompletableFuture<MessageReceiptHandle>> function) {
         Map<HandleKey, HandleData> handleMap = this.receiptHandleMap.get(msgID);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
index d3e8645eff..0a7e2f757d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
@@ -173,8 +173,6 @@ public class ReceiptHandleGroupTest extends InitConfigTest {
         assertTrue(receiptHandleGroup.isEmpty());
     }
 
-
-
     @Test
     public void testRemoveWhenComputeIfPresent() {
         String handle1 = createHandle();
@@ -281,6 +279,36 @@ public class ReceiptHandleGroupTest extends InitConfigTest {
         assertTrue(receiptHandleGroup.isEmpty());
     }
 
+    @Test
+    public void testRemoveOne() {
+        String handle1 = createHandle();
+        AtomicReference<MessageReceiptHandle> removeHandleRef = new AtomicReference<>();
+        AtomicInteger count = new AtomicInteger();
+
+        receiptHandleGroup.put(msgID, createMessageReceiptHandle(handle1, msgID));
+        int threadNum = Math.max(Runtime.getRuntime().availableProcessors(), 3);
+        CountDownLatch latch = new CountDownLatch(threadNum);
+        for (int i = 0; i < threadNum; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    latch.countDown();
+                    latch.await();
+                    MessageReceiptHandle handle = receiptHandleGroup.removeOne(msgID);
+                    if (handle != null) {
+                        removeHandleRef.set(handle);
+                        count.incrementAndGet();
+                    }
+                } catch (Exception ignored) {
+                }
+            });
+            thread.start();
+        }
+
+        await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> assertEquals(1, count.get()));
+        assertEquals(handle1, removeHandleRef.get().getReceiptHandleStr());
+        assertTrue(receiptHandleGroup.isEmpty());
+    }
+
     private MessageReceiptHandle createMessageReceiptHandle(String handle, String msgID) {
         return new MessageReceiptHandle(GROUP, TOPIC, 0, handle, msgID, 0, 0);
     }

Reply via email to