This is an automated email from the ASF dual-hosted git repository.

ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6cccff82f4 [ISSUE #9187] The request should be rejected if the queueOffset equals maxOffset when changing the invisible time (#9186)
6cccff82f4 is described below

commit 6cccff82f42d7c8326774c88334b4616b0a46e5e
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Wed Feb 19 17:24:50 2025 +0800

    [ISSUE #9187] The request should be rejected if the queueOffset equals maxOffset when changing the invisible time (#9186)
    
    * When changing the invisible time, the request should be rejected if the queueOffset equals maxOffset
    
    * Add UTs
    
    * Make UTs pass
    
    * Remove unused imports
---
 .../processor/ChangeInvisibleTimeProcessor.java    |  2 +-
 .../ChangeInvisibleTimeProcessorTest.java          | 35 ++++++++++++++++++++--
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index a7180f6654..de72ee7baf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -130,7 +130,7 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
         } catch (ConsumeQueueException e) {
             throw new RemotingCommandException("Failed to get max consume offset", e);
         }
-        if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() > maxOffset) {
+        if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() >= maxOffset) {
             response.setCode(ResponseCode.NO_MESSAGE);
             return CompletableFuture.completedFuture(response);
         }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index a7aae7ee3d..e15d51b4a8 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -29,8 +29,6 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -46,6 +44,7 @@ import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -56,6 +55,8 @@ import org.mockito.junit.MockitoJUnitRunner;
 import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -108,7 +109,8 @@ public class ChangeInvisibleTimeProcessorTest {
     }
 
     @Test
-    public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
+    public void testProcessRequest_Success() throws RemotingCommandException, ConsumeQueueException {
+        when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(2L);
         when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
         int queueId = 0;
         long queueOffset = 0;
@@ -133,4 +135,31 @@ public class ChangeInvisibleTimeProcessorTest {
         assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
         assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
     }
+
+    @Test
+    public void testProcessRequest_NoMessage() throws RemotingCommandException, ConsumeQueueException {
+        when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(2L);
+        int queueId = 0;
+        long queueOffset = 2;
+        long popTime = System.currentTimeMillis() - 1_000;
+        long invisibleTime = 30_000;
+        int reviveQid = 0;
+        String brokerName = "test_broker";
+        String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, invisibleTime, reviveQid,
+            topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + queueOffset;
+
+        ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setOffset(queueOffset);
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setExtraInfo(extraInfo);
+        requestHeader.setInvisibleTime(invisibleTime);
+
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand responseToReturn = changeInvisibleTimeProcessor.processRequest(handlerContext, request);
+        assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+        assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
+    }
 }

Reply via email to