This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 87075c2662 [ISSUE #6955] add removeOne method for ReceiptHandleGroup
(#6955)
87075c2662 is described below
commit 87075c26623c2c40486c4189e2fb1855426a8ae9
Author: lk <[email protected]>
AuthorDate: Wed Jun 28 15:26:39 2023 +0800
[ISSUE #6955] add removeOne method for ReceiptHandleGroup (#6955)
---
.../rocketmq/proxy/common/ReceiptHandleGroup.java | 36 ++++++++++++++++++++++
.../proxy/common/ReceiptHandleGroupTest.java | 32 +++++++++++++++++--
2 files changed, 66 insertions(+), 2 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index f257563952..6fee38d117 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.proxy.common;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
@@ -77,6 +78,22 @@ public class ReceiptHandleGroup {
.append("offset", offset)
.toString();
}
+
+ public String getOriginalHandle() { // Accessor for this key's originalHandle field.
+ return originalHandle;
+ }
+
+ public String getBroker() { // Accessor for this key's broker field.
+ return broker;
+ }
+
+ public int getQueueId() { // Accessor for this key's queueId field.
+ return queueId;
+ }
+
+ public long getOffset() { // Accessor for this key's offset field.
+ return offset;
+ }
}
public static class HandleData {
@@ -100,6 +117,10 @@ public class ReceiptHandleGroup {
this.semaphore.release();
}
+ public MessageReceiptHandle getMessageReceiptHandle() { // Accessor for the messageReceiptHandle field held by this HandleData.
+ return messageReceiptHandle;
+ }
+
@Override
public boolean equals(Object o) {
return this == o;
@@ -196,6 +217,21 @@ public class ReceiptHandleGroup {
return res.get();
}
+ public MessageReceiptHandle removeOne(String msgID) { // Tries to remove a single receipt handle stored under msgID; returns it, or null if no removal succeeded.
+ Map<HandleKey, HandleData> handleMap =
this.receiptHandleMap.get(msgID);
+ if (handleMap == null) { // No per-message map exists for this msgID.
+ return null;
+ }
+ Set<HandleKey> keys = handleMap.keySet();
+ for (HandleKey key : keys) { // Attempt removal key by key, delegating to remove(msgID, handle).
+ MessageReceiptHandle res = this.remove(msgID, key.originalHandle);
+ if (res != null) { // Stop at the first attempt that yields a handle.
+ return res;
+ }
+ }
+ return null; // Every remove() attempt returned null (or there were no keys).
+ }
+
public void computeIfPresent(String msgID, String handle,
Function<MessageReceiptHandle,
CompletableFuture<MessageReceiptHandle>> function) {
Map<HandleKey, HandleData> handleMap =
this.receiptHandleMap.get(msgID);
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
index d3e8645eff..0a7e2f757d 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
@@ -173,8 +173,6 @@ public class ReceiptHandleGroupTest extends InitConfigTest {
assertTrue(receiptHandleGroup.isEmpty());
}
-
-
@Test
public void testRemoveWhenComputeIfPresent() {
String handle1 = createHandle();
@@ -281,6 +279,36 @@ public class ReceiptHandleGroupTest extends InitConfigTest
{
assertTrue(receiptHandleGroup.isEmpty());
}
+ @Test // Checks that when several threads call removeOne for the same msgID, exactly one gets the stored handle.
+ public void testRemoveOne() {
+ String handle1 = createHandle();
+ AtomicReference<MessageReceiptHandle> removeHandleRef = new
AtomicReference<>();
+ AtomicInteger count = new AtomicInteger(); // Number of threads whose removeOne call returned non-null.
+
+ receiptHandleGroup.put(msgID, createMessageReceiptHandle(handle1,
msgID));
+ int threadNum = Math.max(Runtime.getRuntime().availableProcessors(),
3);
+ CountDownLatch latch = new CountDownLatch(threadNum); // Start gate shared by all worker threads.
+ for (int i = 0; i < threadNum; i++) {
+ Thread thread = new Thread(() -> {
+ try {
+ latch.countDown(); // Signal this worker is ready...
+ latch.await(); // ...then block until every worker has signalled, so the removeOne calls contend.
+ MessageReceiptHandle handle =
receiptHandleGroup.removeOne(msgID);
+ if (handle != null) { // Record only the calls that actually obtained a handle.
+ removeHandleRef.set(handle);
+ count.incrementAndGet();
+ }
+ } catch (Exception ignored) { // NOTE(review): worker exceptions are silently dropped; only the count assertion below would surface a failure.
+ }
+ });
+ thread.start();
+ }
+
+ await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
assertEquals(1, count.get()));
+ assertEquals(handle1, removeHandleRef.get().getReceiptHandleStr()); // The handle that was returned is the one put above.
+ assertTrue(receiptHandleGroup.isEmpty()); // Nothing is left in the group after the removal.
+ }
+
private MessageReceiptHandle createMessageReceiptHandle(String handle,
String msgID) {
return new MessageReceiptHandle(GROUP, TOPIC, 0, handle, msgID, 0, 0);
}