This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 15c6889bb0 fix receive message activity attempt id not correct (#7012)
15c6889bb0 is described below
commit 15c6889bb0abd014c06ef1452f791db9daa1ea08
Author: lizhimins <[email protected]>
AuthorDate: Tue Jul 11 17:04:00 2023 +0800
fix receive message activity attempt id not correct (#7012)
fix receive message activity attempt id not correct
---
.../rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java | 2 +-
.../rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java | 6 +++---
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index a504179a9e..cf58bb87a8 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -130,7 +130,7 @@ public class ReceiveMessageActivity extends
AbstractMessingActivity {
subscriptionData,
fifo,
new PopMessageResultFilterImpl(maxAttempts),
- request.getAttemptId(),
+ request.hasAttemptId() ? request.getAttemptId() : null,
timeRemaining
).thenAccept(popResult -> {
if (proxyConfig.isEnableProxyAutoRenew() &&
request.getAutoRenew()) {
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 2e562504a4..7fd9a9ffdf 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -57,6 +57,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -89,7 +90,7 @@ public class ReceiveMessageActivityTest extends
BaseActivityTest {
.setRequestTimeout(Durations.fromSeconds(3))
.build());
when(this.messagingProcessor.popMessage(any(), any(), anyString(),
anyString(), anyInt(), anyLong(),
- pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(),
anyString(), anyLong()))
+ pollTimeCaptor.capture(), anyInt(), any(), anyBoolean(), any(),
isNull(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(new
PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
@@ -223,7 +224,6 @@ public class ReceiveMessageActivityTest extends
BaseActivityTest {
assertEquals(Code.ILLEGAL_INVISIBLE_TIME,
getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor.getAllValues()));
}
-
@Test
public void testReceiveMessage() {
StreamObserver<ReceiveMessageResponse> receiveStreamObserver =
mock(ServerCallStreamObserver.class);
@@ -245,7 +245,7 @@ public class ReceiveMessageActivityTest extends
BaseActivityTest {
any(),
anyBoolean(),
any(),
- anyString(),
+ isNull(),
anyLong())).thenReturn(CompletableFuture.completedFuture(popResult));
this.receiveMessageActivity.receiveMessage(