This is an automated email from the ASF dual-hosted git repository. ltamber pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 6cccff82f4 [ISSUE #9187] The request should be rejected if the queueOffset equals maxOffset when changing the invisible time (#9186)
6cccff82f4 is described below

commit 6cccff82f42d7c8326774c88334b4616b0a46e5e
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Wed Feb 19 17:24:50 2025 +0800

    [ISSUE #9187] The request should be rejected if the queueOffset equals maxOffset when changing the invisible time (#9186)

    * When changing the invisible time, the request should be rejected if the queueOffset equals maxOffset
    * Add UTs
    * Make UTs to pass
    * Remove unused imports
---
 .../processor/ChangeInvisibleTimeProcessor.java    |  2 +-
 .../ChangeInvisibleTimeProcessorTest.java          | 35 ++++++++++++++++++++--
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index a7180f6654..de72ee7baf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -130,7 +130,7 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
         } catch (ConsumeQueueException e) {
             throw new RemotingCommandException("Failed to get max consume offset", e);
         }
-        if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() > maxOffset) {
+        if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() >= maxOffset) {
             response.setCode(ResponseCode.NO_MESSAGE);
             return CompletableFuture.completedFuture(response);
         }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index a7aae7ee3d..e15d51b4a8 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -29,8 +29,6 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -46,6 +44,7 @@ import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -56,6 +55,8 @@ import org.mockito.junit.MockitoJUnitRunner;
 import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -108,7 +109,8 @@ public class ChangeInvisibleTimeProcessorTest {
     }
 
     @Test
-    public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
+    public void testProcessRequest_Success() throws RemotingCommandException, ConsumeQueueException {
+        when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(2L);
         when(escapeBridge.asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
         int queueId = 0;
         long queueOffset = 0;
@@ -133,4 +135,31 @@ public class ChangeInvisibleTimeProcessorTest {
         assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
         assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
     }
+
+    @Test
+    public void testProcessRequest_NoMessage() throws RemotingCommandException, ConsumeQueueException {
+        when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(2L);
+        int queueId = 0;
+        long queueOffset = 2;
+        long popTime = System.currentTimeMillis() - 1_000;
+        long invisibleTime = 30_000;
+        int reviveQid = 0;
+        String brokerName = "test_broker";
+        String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, invisibleTime, reviveQid,
+            topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + queueOffset;
+
+        ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setOffset(queueOffset);
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setExtraInfo(extraInfo);
+        requestHeader.setInvisibleTime(invisibleTime);
+
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand responseToReturn = changeInvisibleTimeProcessor.processRequest(handlerContext, request);
+        assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+        assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
+    }
 }