This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 152a955ca4 [ISSUE #9069] Fix the IndexFile ConcurrentModificationException in tiered storage (#9071) 152a955ca4 is described below commit 152a955ca467482d2f04a614c2cf23c9a9778e69 Author: wangshaojie4039 <15001782...@163.com> AuthorDate: Wed Dec 25 17:02:06 2024 +0800 [ISSUE #9069] Fix the IndexFile ConcurrentModificationException in tiered storage (#9071) --- .../tieredstore/common/GroupCommitContext.java | 70 ++++++++++++ .../core/MessageStoreDispatcherImpl.java | 93 ++++++++++++--- .../tieredstore/file/FlatFileInterface.java | 5 +- .../rocketmq/tieredstore/file/FlatMessageFile.java | 42 ++++--- .../tieredstore/common/GroupCommitContextTest.java | 54 +++++++++ .../core/MessageStoreDispatcherImplTest.java | 125 +++++++++++++++++++++ .../tieredstore/file/FlatMessageFileTest.java | 8 ++ 7 files changed, 359 insertions(+), 38 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java new file mode 100644 index 0000000000..f677e7c934 --- /dev/null +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tieredstore.common; + +import java.util.List; +import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.SelectMappedBufferResult; + +public class GroupCommitContext { + + private long endOffset; + + private List<SelectMappedBufferResult> bufferList; + + private List<DispatchRequest> dispatchRequests; + + public long getEndOffset() { + return endOffset; + } + + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } + + public List<SelectMappedBufferResult> getBufferList() { + return bufferList; + } + + public void setBufferList(List<SelectMappedBufferResult> bufferList) { + this.bufferList = bufferList; + } + + public List<DispatchRequest> getDispatchRequests() { + return dispatchRequests; + } + + public void setDispatchRequests(List<DispatchRequest> dispatchRequests) { + this.dispatchRequests = dispatchRequests; + } + + public void release() { + if (bufferList != null) { + for (SelectMappedBufferResult bufferResult : bufferList) { + bufferResult.release(); + } + bufferList.clear(); + bufferList = null; + } + if (dispatchRequests != null) { + dispatchRequests.clear(); + dispatchRequests = null; + } + + } +} diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java index 9b1e53564d..bcc4e225da 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java @@ -16,15 +16,20 @@ */ package org.apache.rocketmq.tieredstore.core; +import com.google.common.annotations.VisibleForTesting; import io.opentelemetry.api.common.Attributes; import java.nio.ByteBuffer; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; @@ -42,6 +47,7 @@ import org.apache.rocketmq.tieredstore.MessageStoreExecutor; import org.apache.rocketmq.tieredstore.TieredMessageStore; import org.apache.rocketmq.tieredstore.common.AppendResult; import org.apache.rocketmq.tieredstore.common.FileSegmentType; +import org.apache.rocketmq.tieredstore.common.GroupCommitContext; import org.apache.rocketmq.tieredstore.file.FlatFileInterface; import org.apache.rocketmq.tieredstore.file.FlatFileStore; import org.apache.rocketmq.tieredstore.index.IndexService; @@ -65,6 +71,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message protected final MessageStoreFilter topicFilter; protected final Semaphore semaphore; protected final IndexService indexService; + protected final Map<FlatFileInterface, GroupCommitContext> failedGroupCommitMap; public MessageStoreDispatcherImpl(TieredMessageStore messageStore) { this.messageStore = messageStore; @@ -77,6 +84,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message this.flatFileStore = messageStore.getFlatFileStore(); this.storeExecutor = messageStore.getStoreExecutor(); this.indexService = messageStore.getIndexService(); + this.failedGroupCommitMap = new ConcurrentHashMap<>(); } @Override @@ -84,6 +92,11 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message return MessageStoreDispatcher.class.getSimpleName(); } + @VisibleForTesting + public Map<FlatFileInterface, GroupCommitContext> getFailedGroupCommitMap() { + return failedGroupCommitMap; + } + public void dispatchWithSemaphore(FlatFileInterface flatFile) { try { if (stopped) { @@ -153,10 +166,22 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message // If the previous commit fails, attempt to trigger a commit directly. if (commitOffset < currentOffset) { - this.commitAsync(flatFile); + this.commitAsync(flatFile).whenComplete((result, throwable) -> { + if (throwable != null) { + log.error("MessageDispatcher#flatFile commitOffset less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ", topic, queueId, throwable); + } + }); return CompletableFuture.completedFuture(false); } + if (failedGroupCommitMap.containsKey(flatFile)) { + GroupCommitContext failedCommit = failedGroupCommitMap.get(flatFile); + if (failedCommit.getEndOffset() <= commitOffset) { + failedGroupCommitMap.remove(flatFile); + constructIndexFile(flatFile.getTopicId(), failedCommit); + } + } + if (currentOffset < minOffsetInQueue) { log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset); @@ -224,6 +249,8 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message } long offset = currentOffset; + List<SelectMappedBufferResult> appendingBufferList = new ArrayList<>(); + List<DispatchRequest> dispatchRequestList = new ArrayList<>(); for (; offset < targetOffset; offset++) { cqUnit = consumeQueue.get(offset); bufferSize += cqUnit.getSize(); @@ -231,6 +258,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message break; } message = defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize()); + appendingBufferList.add(message); ByteBuffer byteBuffer = message.getByteBuffer(); AppendResult result = flatFile.appendCommitLog(message); @@ -251,13 +279,20 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message result = flatFile.appendConsumeQueue(dispatchRequest); if (!AppendResult.SUCCESS.equals(result)) { break; + } else { + dispatchRequestList.add(dispatchRequest); } } + GroupCommitContext groupCommitContext = new GroupCommitContext(); + groupCommitContext.setEndOffset(offset); + groupCommitContext.setBufferList(appendingBufferList); + groupCommitContext.setDispatchRequests(dispatchRequestList); + // If there are many messages waiting to be uploaded, call the upload logic immediately. boolean repeat = timeout || maxOffsetInQueue - offset > storeConfig.getTieredStoreGroupCommitCount(); - if (!flatFile.getDispatchRequestList().isEmpty()) { + if (!dispatchRequestList.isEmpty()) { Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder() .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic) .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, queueId) @@ -265,8 +300,19 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message .build(); TieredStoreMetricsManager.messagesDispatchTotal.add(offset - currentOffset, attributes); - this.commitAsync(flatFile).whenComplete((unused, throwable) -> { - if (repeat) { + this.commitAsync(flatFile).whenComplete((success, throwable) -> { + if (success) { + constructIndexFile(flatFile.getTopicId(), groupCommitContext); + } + else { + //next commit async,execute constructIndexFile. + GroupCommitContext oldCommit = failedGroupCommitMap.put(flatFile, groupCommitContext); + if (oldCommit != null) { + log.warn("MessageDispatcher#commitAsync failed,flatFile old failed commit context not release, topic={}, queueId={} ", topic, queueId); + oldCommit.release(); + } + } + if (success && repeat) { storeExecutor.commonExecutor.submit(() -> dispatchWithSemaphore(flatFile)); } } @@ -282,22 +328,28 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message return CompletableFuture.completedFuture(false); } - public CompletableFuture<Void> commitAsync(FlatFileInterface flatFile) { - return flatFile.commitAsync().thenAcceptAsync(success -> { - if (success) { - if (storeConfig.isMessageIndexEnable()) { - flatFile.getDispatchRequestList().forEach( - request -> constructIndexFile(flatFile.getTopicId(), request)); + public CompletableFuture<Boolean> commitAsync(FlatFileInterface flatFile) { + return flatFile.commitAsync(); + } + + public void constructIndexFile(long topicId, GroupCommitContext groupCommitContext) { + MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> { + if (storeConfig.isMessageIndexEnable()) { + try { + groupCommitContext.getDispatchRequests().forEach(request -> constructIndexFile0(topicId, request)); + } + catch (Throwable e) { + log.error("constructIndexFile error {}", topicId, e); } - flatFile.release(); } - }, storeExecutor.bufferCommitExecutor); + groupCommitContext.release(); + }); } /** * Building indexes with offsetId is no longer supported because offsetId has changed in tiered storage */ - public void constructIndexFile(long topicId, DispatchRequest request) { + public void constructIndexFile0(long topicId, DispatchRequest request) { Set<String> keySet = new HashSet<>(); if (StringUtils.isNotBlank(request.getUniqKey())) { keySet.add(request.getUniqKey()); @@ -309,12 +361,27 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message request.getCommitLogOffset(), request.getMsgSize(), request.getStoreTimestamp()); } + public void releaseClosedPendingGroupCommit() { + Iterator<Map.Entry<FlatFileInterface, GroupCommitContext>> iterator = failedGroupCommitMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<FlatFileInterface, GroupCommitContext> entry = iterator.next(); + if (entry.getKey().isClosed()) { + entry.getValue().release(); + iterator.remove(); + } + } + } + + @Override public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore); + + releaseClosedPendingGroupCommit(); + this.waitForRunning(Duration.ofSeconds(20).toMillis()); } catch (Throwable t) { log.error("MessageStore dispatch error", t); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java index 619470fbc2..01e7f25a46 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.tieredstore.file; import java.nio.ByteBuffer; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.Lock; import org.apache.rocketmq.common.BoundaryType; @@ -58,8 +57,6 @@ public interface FlatFileInterface { */ AppendResult appendConsumeQueue(DispatchRequest request); - List<DispatchRequest> getDispatchRequestList(); - void release(); long getMinStoreTimestamp(); @@ -143,6 +140,8 @@ public interface FlatFileInterface { */ CompletableFuture<Long> getQueueOffsetByTimeAsync(long timestamp, BoundaryType boundaryType); + boolean isClosed(); + /** * Shutdown process */ diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java index d5675976cb..4510a8a127 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java @@ -17,12 +17,14 @@ package org.apache.rocketmq.tieredstore.file; import com.alibaba.fastjson.JSON; +import com.google.common.annotations.VisibleForTesting; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -51,14 +53,13 @@ public class FlatMessageFile implements FlatFileInterface { protected final String filePath; protected final ReentrantLock fileLock; + protected final Semaphore commitLock = new Semaphore(1); protected final MessageStoreConfig storeConfig; protected final MetadataStore metadataStore; protected final FlatCommitLogFile commitLog; protected final FlatConsumeQueueFile consumeQueue; protected final AtomicLong lastDestroyTime; - protected final List<SelectMappedBufferResult> bufferResultList; - protected final List<DispatchRequest> dispatchRequestList; protected final ConcurrentMap<String, CompletableFuture<?>> inFlightRequestMap; public FlatMessageFile(FlatFileFactory fileFactory, String topic, int queueId) { @@ -76,8 +77,6 @@ public class FlatMessageFile implements FlatFileInterface { this.commitLog = fileFactory.createFlatFileForCommitLog(filePath); this.consumeQueue = fileFactory.createFlatFileForConsumeQueue(filePath); this.lastDestroyTime = new AtomicLong(); - this.bufferResultList = new ArrayList<>(); - this.dispatchRequestList = new ArrayList<>(); this.inFlightRequestMap = new ConcurrentHashMap<>(); } @@ -127,6 +126,11 @@ public class FlatMessageFile implements FlatFileInterface { return this.fileLock; } + @VisibleForTesting + public Semaphore getCommitLock() { + return commitLock; + } + @Override public boolean rollingFile(long interval) { return this.commitLog.tryRollingFile(interval); @@ -156,7 +160,6 @@ public class FlatMessageFile implements FlatFileInterface { if (closed) { return AppendResult.FILE_CLOSED; } - this.bufferResultList.add(message); return this.appendCommitLog(message.getByteBuffer()); } @@ -172,29 +175,14 @@ public class FlatMessageFile implements FlatFileInterface { buffer.putLong(request.getTagsCode()); buffer.flip(); - this.dispatchRequestList.add(request); return consumeQueue.append(buffer, request.getStoreTimestamp()); } - @Override - public List<DispatchRequest> getDispatchRequestList() { - return dispatchRequestList; - } + @Override public void release() { - for (SelectMappedBufferResult bufferResult : bufferResultList) { - bufferResult.release(); - } - - if (queueMetadata != null) { - log.trace("FlatMessageFile release, topic={}, queueId={}, bufferSize={}, requestListSize={}", - queueMetadata.getQueue().getTopic(), queueMetadata.getQueue().getQueueId(), - bufferResultList.size(), dispatchRequestList.size()); - } - bufferResultList.clear(); - dispatchRequestList.clear(); } @Override @@ -246,13 +234,18 @@ public class FlatMessageFile implements FlatFileInterface { @Override public CompletableFuture<Boolean> commitAsync() { + // acquire lock + if (commitLock.drainPermits() <= 0) { + return CompletableFuture.completedFuture(false); + } + return this.commitLog.commitAsync() .thenCompose(result -> { if (result) { return consumeQueue.commitAsync(); } return CompletableFuture.completedFuture(false); - }); + }).whenComplete((result, throwable) -> commitLock.release()); } @Override @@ -363,6 +356,11 @@ public class FlatMessageFile implements FlatFileInterface { return StringUtils.equals(filePath, ((FlatMessageFile) obj).filePath); } + @Override + public boolean isClosed() { + return closed; + } + @Override public void shutdown() { closed = true; diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GroupCommitContextTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GroupCommitContextTest.java new file mode 100644 index 0000000000..e692360761 --- /dev/null +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GroupCommitContextTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tieredstore.common; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.junit.Assert; +import org.junit.Test; + +public class GroupCommitContextTest { + + @Test + public void groupCommitContextTest() { + GroupCommitContext releaseGroupCommitContext = new GroupCommitContext(); + releaseGroupCommitContext.release(); + + long endOffset = 1000; + List<DispatchRequest> dispatchRequestList = new ArrayList<>(); + dispatchRequestList.add(new DispatchRequest(1000)); + List<SelectMappedBufferResult> selectMappedBufferResultList = new ArrayList<>(); + selectMappedBufferResultList.add(new SelectMappedBufferResult(100, ByteBuffer.allocate(10), 1000, null)); + GroupCommitContext groupCommitContext = new GroupCommitContext(); + groupCommitContext.setEndOffset(endOffset); + groupCommitContext.setBufferList(selectMappedBufferResultList); + groupCommitContext.setDispatchRequests(dispatchRequestList); + + Assert.assertTrue(groupCommitContext.getEndOffset() == endOffset); + Assert.assertTrue(groupCommitContext.getBufferList().equals(selectMappedBufferResultList)); + Assert.assertTrue(groupCommitContext.getDispatchRequests().equals(dispatchRequestList)); + groupCommitContext.release(); + Assert.assertTrue(groupCommitContext.getDispatchRequests() == null); + Assert.assertTrue(groupCommitContext.getBufferList() == null); + Assert.assertTrue(dispatchRequestList.isEmpty()); + Assert.assertTrue(selectMappedBufferResultList.isEmpty()); + } + +} diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java index 92e989e596..6b96076948 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.tieredstore.MessageStoreConfig; import org.apache.rocketmq.tieredstore.MessageStoreExecutor; import org.apache.rocketmq.tieredstore.TieredMessageStore; +import org.apache.rocketmq.tieredstore.common.GroupCommitContext; import org.apache.rocketmq.tieredstore.file.FlatFileFactory; import org.apache.rocketmq.tieredstore.file.FlatFileStore; import org.apache.rocketmq.tieredstore.file.FlatMessageFile; @@ -157,6 +158,130 @@ public class MessageStoreDispatcherImplTest { Assert.assertEquals(200L, flatFile.getConsumeQueueCommitOffset()); } + @Test + public void dispatchCommitFailedTest() throws Exception { + MessageStore defaultStore = Mockito.mock(MessageStore.class); + Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(100L); + Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(200L); + + messageStore = Mockito.mock(TieredMessageStore.class); + IndexService indexService = + new IndexStoreService(new FlatFileFactory(metadataStore, storeConfig), storePath); + indexService.start(); + Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore); + Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig); + Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor); + Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore); + Mockito.when(messageStore.getIndexService()).thenReturn(indexService); + + // mock message + ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer(); + MessageExt messageExt = MessageDecoder.decode(buffer); + messageExt.setKeys("Key"); + MessageAccessor.putProperty( + messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "uk"); + messageExt.setBody(new byte[10]); + messageExt.setStoreSize(0); + buffer = ByteBuffer.wrap(MessageDecoder.encode(messageExt, false)); + buffer.putInt(0, buffer.remaining()); + + DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), + MessageFormatUtil.getCommitLogOffset(buffer), buffer.remaining(), 0L, + MessageFormatUtil.getStoreTimeStamp(buffer), 0L, + "", "", 0, 0L, new HashMap<>()); + + // construct flat file + MessageStoreDispatcher dispatcher = new MessageStoreDispatcherImpl(messageStore); + dispatcher.dispatch(request); + FlatMessageFile flatFile = fileStore.getFlatFile(mq); + Assert.assertNotNull(flatFile); + + // init offset + dispatcher.doScheduleDispatch(flatFile, true).join(); + Assert.assertEquals(100L, flatFile.getConsumeQueueMinOffset()); + Assert.assertEquals(100L, flatFile.getConsumeQueueMaxOffset()); + Assert.assertEquals(100L, flatFile.getConsumeQueueCommitOffset()); + + ConsumeQueueInterface cq = Mockito.mock(ConsumeQueueInterface.class); + Mockito.when(defaultStore.getConsumeQueue(anyString(), anyInt())).thenReturn(cq); + Mockito.when(cq.get(anyLong())).thenReturn( + new CqUnit(100, 1000, buffer.remaining(), 0L)); + Mockito.when(defaultStore.selectOneMessageByOffset(anyLong(), anyInt())).thenReturn( + new SelectMappedBufferResult(0L, buffer.asReadOnlyBuffer(), buffer.remaining(), null)); + flatFile.getCommitLock().drainPermits(); + dispatcher.doScheduleDispatch(flatFile, true).join(); + GroupCommitContext groupCommitContext = ((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile); + Assert.assertTrue(groupCommitContext != null); + Assert.assertTrue(groupCommitContext.getEndOffset() == 200); + flatFile.getCommitLock().release(); + flatFile.commitAsync().join(); + dispatcher.doScheduleDispatch(flatFile, true).join(); + Assert.assertTrue(((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile) == null); + ((MessageStoreDispatcherImpl)dispatcher).flatFileStore.destroyFile(mq); + ((MessageStoreDispatcherImpl)dispatcher).releaseClosedPendingGroupCommit(); + + } + + @Test + public void dispatchFailedGroupCommitMapReleaseTest() throws Exception { + MessageStore defaultStore = Mockito.mock(MessageStore.class); + Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(100L); + Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(200L); + + messageStore = Mockito.mock(TieredMessageStore.class); + IndexService indexService = + new IndexStoreService(new FlatFileFactory(metadataStore, storeConfig), storePath); + indexService.start(); + Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore); + Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig); + Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor); + Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore); + Mockito.when(messageStore.getIndexService()).thenReturn(indexService); + + // mock message + ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer(); + MessageExt messageExt = MessageDecoder.decode(buffer); + messageExt.setKeys("Key"); + MessageAccessor.putProperty( + messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "uk"); + messageExt.setBody(new byte[10]); + messageExt.setStoreSize(0); + buffer = ByteBuffer.wrap(MessageDecoder.encode(messageExt, false)); + buffer.putInt(0, buffer.remaining()); + + DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), + MessageFormatUtil.getCommitLogOffset(buffer), buffer.remaining(), 0L, + MessageFormatUtil.getStoreTimeStamp(buffer), 0L, + "", "", 0, 0L, new HashMap<>()); + + // construct flat file + MessageStoreDispatcher dispatcher = new MessageStoreDispatcherImpl(messageStore); + dispatcher.dispatch(request); + FlatMessageFile flatFile = fileStore.getFlatFile(mq); + Assert.assertNotNull(flatFile); + + // init offset + dispatcher.doScheduleDispatch(flatFile, true).join(); + Assert.assertEquals(100L, flatFile.getConsumeQueueMinOffset()); + Assert.assertEquals(100L, flatFile.getConsumeQueueMaxOffset()); + Assert.assertEquals(100L, flatFile.getConsumeQueueCommitOffset()); + + ConsumeQueueInterface cq = Mockito.mock(ConsumeQueueInterface.class); + Mockito.when(defaultStore.getConsumeQueue(anyString(), anyInt())).thenReturn(cq); + Mockito.when(cq.get(anyLong())).thenReturn( + new CqUnit(100, 1000, buffer.remaining(), 0L)); + Mockito.when(defaultStore.selectOneMessageByOffset(anyLong(), anyInt())).thenReturn( + new SelectMappedBufferResult(0L, buffer.asReadOnlyBuffer(), buffer.remaining(), null)); + flatFile.getCommitLock().drainPermits(); + dispatcher.doScheduleDispatch(flatFile, true).join(); + GroupCommitContext groupCommitContext = ((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile); + Assert.assertTrue(groupCommitContext != null); + ((MessageStoreDispatcherImpl)dispatcher).flatFileStore.destroyFile(mq); + ((MessageStoreDispatcherImpl)dispatcher).releaseClosedPendingGroupCommit(); + Assert.assertTrue(((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile) == null); + + } + @Test public void dispatchServiceTest() { MessageStore defaultStore = Mockito.mock(MessageStore.class); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java index 8a417f54a7..8208d27741 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java @@ -216,4 +216,12 @@ public class FlatMessageFileTest { flatFile.destroy(); } + + @Test + public void testCommitLock() { + String topic = "CommitLogTest"; + FlatMessageFile flatFile = new FlatMessageFile(flatFileFactory, topic, 0); + flatFile.getCommitLock().drainPermits(); + Assert.assertFalse(flatFile.commitAsync().join()); + } }