This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 152a955ca4 [ISSUE #9069] Fix the IndexFile ConcurrentModificationException in tiered storage (#9071)
152a955ca4 is described below

commit 152a955ca467482d2f04a614c2cf23c9a9778e69
Author: wangshaojie4039 <15001782...@163.com>
AuthorDate: Wed Dec 25 17:02:06 2024 +0800

    [ISSUE #9069] Fix the IndexFile ConcurrentModificationException in tiered storage (#9071)
---
 .../tieredstore/common/GroupCommitContext.java     |  70 ++++++++++++
 .../core/MessageStoreDispatcherImpl.java           |  93 ++++++++++++---
 .../tieredstore/file/FlatFileInterface.java        |   5 +-
 .../rocketmq/tieredstore/file/FlatMessageFile.java |  42 ++++---
 .../tieredstore/common/GroupCommitContextTest.java |  54 +++++++++
 .../core/MessageStoreDispatcherImplTest.java       | 125 +++++++++++++++++++++
 .../tieredstore/file/FlatMessageFileTest.java      |   8 ++
 7 files changed, 359 insertions(+), 38 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java
new file mode 100644
index 0000000000..f677e7c934
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GroupCommitContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.common;
+
+import java.util.List;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+
+public class GroupCommitContext {
+    // Bundles an end offset with the message buffers and dispatch requests that belong to one group commit.
+    private long endOffset;
+    // Buffers attached to this context; release() releases each of them.
+    private List<SelectMappedBufferResult> bufferList;
+    // Dispatch requests attached to this context; release() clears this list.
+    private List<DispatchRequest> dispatchRequests;
+
+    public long getEndOffset() {
+        return endOffset;
+    }
+
+    public void setEndOffset(long endOffset) {
+        this.endOffset = endOffset;
+    }
+    /** Returns the stored buffer list, or null if none was set or release() has already run. */
+    public List<SelectMappedBufferResult> getBufferList() {
+        return bufferList;
+    }
+    /** Stores the given list by reference (no copy is made). */
+    public void setBufferList(List<SelectMappedBufferResult> bufferList) {
+        this.bufferList = bufferList;
+    }
+    /** Returns the stored dispatch requests, or null if none were set or release() has already run. */
+    public List<DispatchRequest> getDispatchRequests() {
+        return dispatchRequests;
+    }
+    /** Stores the given list by reference (no copy is made). */
+    public void setDispatchRequests(List<DispatchRequest> dispatchRequests) {
+        this.dispatchRequests = dispatchRequests;
+    }
+    /** Releases every buffer in bufferList, empties both lists in place, and nulls both fields. */
+    public void release() {
+        if (bufferList != null) {
+            for (SelectMappedBufferResult bufferResult : bufferList) {
+                bufferResult.release();
+            }
+            bufferList.clear(); // clears the list instance handed to setBufferList, not a copy
+            bufferList = null;
+        }
+        if (dispatchRequests != null) {
+            dispatchRequests.clear(); // likewise clears the caller-supplied list instance
+            dispatchRequests = null;
+        }
+        // Both fields are null from here on, so a repeated call skips both branches.
+    }
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
index 9b1e53564d..bcc4e225da 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
@@ -16,15 +16,20 @@
  */
 package org.apache.rocketmq.tieredstore.core;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.opentelemetry.api.common.Attributes;
 import java.nio.ByteBuffer;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
@@ -42,6 +47,7 @@ import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.TieredMessageStore;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
+import org.apache.rocketmq.tieredstore.common.GroupCommitContext;
 import org.apache.rocketmq.tieredstore.file.FlatFileInterface;
 import org.apache.rocketmq.tieredstore.file.FlatFileStore;
 import org.apache.rocketmq.tieredstore.index.IndexService;
@@ -65,6 +71,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread 
implements Message
     protected final MessageStoreFilter topicFilter;
     protected final Semaphore semaphore;
     protected final IndexService indexService;
+    protected final Map<FlatFileInterface, GroupCommitContext> 
failedGroupCommitMap;
 
     public MessageStoreDispatcherImpl(TieredMessageStore messageStore) {
         this.messageStore = messageStore;
@@ -77,6 +84,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread 
implements Message
         this.flatFileStore = messageStore.getFlatFileStore();
         this.storeExecutor = messageStore.getStoreExecutor();
         this.indexService = messageStore.getIndexService();
+        this.failedGroupCommitMap = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -84,6 +92,11 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
         return MessageStoreDispatcher.class.getSimpleName();
     }
 
+    @VisibleForTesting
+    public Map<FlatFileInterface, GroupCommitContext> 
getFailedGroupCommitMap() {
+        return failedGroupCommitMap;
+    }
+
     public void dispatchWithSemaphore(FlatFileInterface flatFile) {
         try {
             if (stopped) {
@@ -153,10 +166,22 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
 
             // If the previous commit fails, attempt to trigger a commit 
directly.
             if (commitOffset < currentOffset) {
-                this.commitAsync(flatFile);
+                this.commitAsync(flatFile).whenComplete((result, throwable) -> 
{
+                    if (throwable != null) {
+                        log.error("MessageDispatcher#flatFile commitOffset 
less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ", 
topic, queueId, throwable);
+                    }
+                });
                 return CompletableFuture.completedFuture(false);
             }
 
+            if (failedGroupCommitMap.containsKey(flatFile)) {
+                GroupCommitContext failedCommit = 
failedGroupCommitMap.get(flatFile);
+                if (failedCommit.getEndOffset() <= commitOffset) {
+                    failedGroupCommitMap.remove(flatFile);
+                    constructIndexFile(flatFile.getTopicId(), failedCommit);
+                }
+            }
+
             if (currentOffset < minOffsetInQueue) {
                 log.warn("MessageDispatcher#dispatch, current offset is too 
small, topic={}, queueId={}, offset={}-{}, current={}",
                     topic, queueId, minOffsetInQueue, maxOffsetInQueue, 
currentOffset);
@@ -224,6 +249,8 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
             }
 
             long offset = currentOffset;
+            List<SelectMappedBufferResult> appendingBufferList = new 
ArrayList<>();
+            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
             for (; offset < targetOffset; offset++) {
                 cqUnit = consumeQueue.get(offset);
                 bufferSize += cqUnit.getSize();
@@ -231,6 +258,7 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
                     break;
                 }
                 message = 
defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
+                appendingBufferList.add(message);
 
                 ByteBuffer byteBuffer = message.getByteBuffer();
                 AppendResult result = flatFile.appendCommitLog(message);
@@ -251,13 +279,20 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
                 result = flatFile.appendConsumeQueue(dispatchRequest);
                 if (!AppendResult.SUCCESS.equals(result)) {
                     break;
+                } else {
+                    dispatchRequestList.add(dispatchRequest);
                 }
             }
 
+            GroupCommitContext groupCommitContext = new GroupCommitContext();
+            groupCommitContext.setEndOffset(offset);
+            groupCommitContext.setBufferList(appendingBufferList);
+            groupCommitContext.setDispatchRequests(dispatchRequestList);
+
             // If there are many messages waiting to be uploaded, call the 
upload logic immediately.
             boolean repeat = timeout || maxOffsetInQueue - offset > 
storeConfig.getTieredStoreGroupCommitCount();
 
-            if (!flatFile.getDispatchRequestList().isEmpty()) {
+            if (!dispatchRequestList.isEmpty()) {
                 Attributes attributes = 
TieredStoreMetricsManager.newAttributesBuilder()
                     .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
                     .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, queueId)
@@ -265,8 +300,19 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
                     .build();
                 TieredStoreMetricsManager.messagesDispatchTotal.add(offset - 
currentOffset, attributes);
 
-                this.commitAsync(flatFile).whenComplete((unused, throwable) -> 
{
-                        if (repeat) {
+                this.commitAsync(flatFile).whenComplete((success, throwable) 
-> {
+                        if (success) {
+                            constructIndexFile(flatFile.getTopicId(), 
groupCommitContext);
+                        }
+                        else {
+                            //next commit async,execute constructIndexFile.
+                            GroupCommitContext oldCommit = 
failedGroupCommitMap.put(flatFile, groupCommitContext);
+                            if (oldCommit != null) {
+                                log.warn("MessageDispatcher#commitAsync 
failed,flatFile old failed commit context not release, topic={}, queueId={}  ", 
topic, queueId);
+                                oldCommit.release();
+                            }
+                        }
+                        if (success && repeat) {
                             storeExecutor.commonExecutor.submit(() -> 
dispatchWithSemaphore(flatFile));
                         }
                     }
@@ -282,22 +328,28 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
         return CompletableFuture.completedFuture(false);
     }
 
-    public CompletableFuture<Void> commitAsync(FlatFileInterface flatFile) {
-        return flatFile.commitAsync().thenAcceptAsync(success -> {
-            if (success) {
-                if (storeConfig.isMessageIndexEnable()) {
-                    flatFile.getDispatchRequestList().forEach(
-                        request -> constructIndexFile(flatFile.getTopicId(), 
request));
+    public CompletableFuture<Boolean> commitAsync(FlatFileInterface flatFile) {
+        return flatFile.commitAsync();
+    }
+
+    public void constructIndexFile(long topicId, GroupCommitContext 
groupCommitContext) {
+        MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> {
+            if (storeConfig.isMessageIndexEnable()) {
+                try {
+                    groupCommitContext.getDispatchRequests().forEach(request 
-> constructIndexFile0(topicId, request));
+                }
+                catch (Throwable e) {
+                    log.error("constructIndexFile error {}", topicId, e);
                 }
-                flatFile.release();
             }
-        }, storeExecutor.bufferCommitExecutor);
+            groupCommitContext.release();
+        });
     }
 
     /**
      * Building indexes with offsetId is no longer supported because offsetId 
has changed in tiered storage
      */
-    public void constructIndexFile(long topicId, DispatchRequest request) {
+    public void constructIndexFile0(long topicId, DispatchRequest request) {
         Set<String> keySet = new HashSet<>();
         if (StringUtils.isNotBlank(request.getUniqKey())) {
             keySet.add(request.getUniqKey());
@@ -309,12 +361,27 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
             request.getCommitLogOffset(), request.getMsgSize(), 
request.getStoreTimestamp());
     }
 
+    public void releaseClosedPendingGroupCommit() { // releases and drops failed-commit contexts whose flat file reports isClosed()
+        Iterator<Map.Entry<FlatFileInterface, GroupCommitContext>> iterator = 
failedGroupCommitMap.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<FlatFileInterface, GroupCommitContext> entry = 
iterator.next();
+            if (entry.getKey().isClosed()) {
+                entry.getValue().release(); // release the held buffers before forgetting the context
+                iterator.remove(); // removal goes through the iterator; entries of files that are not closed stay in the map
+            }
+        }
+    }
+
+
     @Override
     public void run() {
         log.info("{} service started", this.getServiceName());
         while (!this.isStopped()) {
             try {
                 
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
+
+                releaseClosedPendingGroupCommit();
+
                 this.waitForRunning(Duration.ofSeconds(20).toMillis());
             } catch (Throwable t) {
                 log.error("MessageStore dispatch error", t);
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java
index 619470fbc2..01e7f25a46 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileInterface.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.tieredstore.file;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.Lock;
 import org.apache.rocketmq.common.BoundaryType;
@@ -58,8 +57,6 @@ public interface FlatFileInterface {
      */
     AppendResult appendConsumeQueue(DispatchRequest request);
 
-    List<DispatchRequest> getDispatchRequestList();
-
     void release();
 
     long getMinStoreTimestamp();
@@ -143,6 +140,8 @@ public interface FlatFileInterface {
      */
     CompletableFuture<Long> getQueueOffsetByTimeAsync(long timestamp, 
BoundaryType boundaryType);
 
+    boolean isClosed();
+
     /**
      * Shutdown process
      */
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index d5675976cb..4510a8a127 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -17,12 +17,14 @@
 package org.apache.rocketmq.tieredstore.file;
 
 import com.alibaba.fastjson.JSON;
+import com.google.common.annotations.VisibleForTesting;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -51,14 +53,13 @@ public class FlatMessageFile implements FlatFileInterface {
 
     protected final String filePath;
     protected final ReentrantLock fileLock;
+    protected final Semaphore commitLock = new Semaphore(1);
     protected final MessageStoreConfig storeConfig;
     protected final MetadataStore metadataStore;
     protected final FlatCommitLogFile commitLog;
     protected final FlatConsumeQueueFile consumeQueue;
     protected final AtomicLong lastDestroyTime;
 
-    protected final List<SelectMappedBufferResult> bufferResultList;
-    protected final List<DispatchRequest> dispatchRequestList;
     protected final ConcurrentMap<String, CompletableFuture<?>> 
inFlightRequestMap;
 
     public FlatMessageFile(FlatFileFactory fileFactory, String topic, int 
queueId) {
@@ -76,8 +77,6 @@ public class FlatMessageFile implements FlatFileInterface {
         this.commitLog = fileFactory.createFlatFileForCommitLog(filePath);
         this.consumeQueue = 
fileFactory.createFlatFileForConsumeQueue(filePath);
         this.lastDestroyTime = new AtomicLong();
-        this.bufferResultList = new ArrayList<>();
-        this.dispatchRequestList = new ArrayList<>();
         this.inFlightRequestMap = new ConcurrentHashMap<>();
     }
 
@@ -127,6 +126,11 @@ public class FlatMessageFile implements FlatFileInterface {
         return this.fileLock;
     }
 
+    @VisibleForTesting
+    public Semaphore getCommitLock() {
+        return commitLock;
+    }
+
     @Override
     public boolean rollingFile(long interval) {
         return this.commitLog.tryRollingFile(interval);
@@ -156,7 +160,6 @@ public class FlatMessageFile implements FlatFileInterface {
         if (closed) {
             return AppendResult.FILE_CLOSED;
         }
-        this.bufferResultList.add(message);
         return this.appendCommitLog(message.getByteBuffer());
     }
 
@@ -172,29 +175,14 @@ public class FlatMessageFile implements FlatFileInterface 
{
         buffer.putLong(request.getTagsCode());
         buffer.flip();
 
-        this.dispatchRequestList.add(request);
         return consumeQueue.append(buffer, request.getStoreTimestamp());
     }
 
-    @Override
-    public List<DispatchRequest> getDispatchRequestList() {
-        return dispatchRequestList;
-    }
+
 
     @Override
     public void release() {
-        for (SelectMappedBufferResult bufferResult : bufferResultList) {
-            bufferResult.release();
-        }
-
-        if (queueMetadata != null) {
-            log.trace("FlatMessageFile release, topic={}, queueId={}, 
bufferSize={}, requestListSize={}",
-                queueMetadata.getQueue().getTopic(), 
queueMetadata.getQueue().getQueueId(),
-                bufferResultList.size(), dispatchRequestList.size());
-        }
 
-        bufferResultList.clear();
-        dispatchRequestList.clear();
     }
 
     @Override
@@ -246,13 +234,18 @@ public class FlatMessageFile implements FlatFileInterface 
{
 
     @Override
     public CompletableFuture<Boolean> commitAsync() {
+        // Non-blocking acquire of the single commit permit; if none is available, complete with false immediately.
+        if (commitLock.drainPermits() <= 0) {
+            return CompletableFuture.completedFuture(false);
+        }
+        // NOTE(review): the permit is only returned by whenComplete below; if commitLog.commitAsync() can throw synchronously it would stay drained - confirm.
         return this.commitLog.commitAsync()
             .thenCompose(result -> {
                 if (result) {
                     return consumeQueue.commitAsync();
                 }
                 return CompletableFuture.completedFuture(false);
-            });
+            }).whenComplete((result, throwable) -> commitLock.release());
     }
 
     @Override
@@ -363,6 +356,11 @@ public class FlatMessageFile implements FlatFileInterface {
         return StringUtils.equals(filePath, ((FlatMessageFile) obj).filePath);
     }
 
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
     @Override
     public void shutdown() {
         closed = true;
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GroupCommitContextTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GroupCommitContextTest.java
new file mode 100644
index 0000000000..e692360761
--- /dev/null
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/common/GroupCommitContextTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.common;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class GroupCommitContextTest {
+    // Exercises release() on an empty context and on a populated one.
+    @Test
+    public void groupCommitContextTest() {
+        GroupCommitContext releaseGroupCommitContext = new 
GroupCommitContext();
+        releaseGroupCommitContext.release(); // no lists were set, so both fields are still null here
+        // Populated case: one dispatch request and one buffer result.
+        long endOffset = 1000;
+        List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+        dispatchRequestList.add(new DispatchRequest(1000));
+        List<SelectMappedBufferResult> selectMappedBufferResultList = new 
ArrayList<>();
+        selectMappedBufferResultList.add(new SelectMappedBufferResult(100, 
ByteBuffer.allocate(10), 1000, null));
+        GroupCommitContext groupCommitContext = new GroupCommitContext();
+        groupCommitContext.setEndOffset(endOffset);
+        groupCommitContext.setBufferList(selectMappedBufferResultList);
+        groupCommitContext.setDispatchRequests(dispatchRequestList);
+        // The getters must hand back what the setters stored.
+        Assert.assertTrue(groupCommitContext.getEndOffset() == endOffset);
+        
Assert.assertTrue(groupCommitContext.getBufferList().equals(selectMappedBufferResultList));
+        
Assert.assertTrue(groupCommitContext.getDispatchRequests().equals(dispatchRequestList));
+        groupCommitContext.release();
+        Assert.assertTrue(groupCommitContext.getDispatchRequests() == null);
+        Assert.assertTrue(groupCommitContext.getBufferList() == null);
+        Assert.assertTrue(dispatchRequestList.isEmpty()); // the original list instances are expected to be cleared in place
+        Assert.assertTrue(selectMappedBufferResultList.isEmpty());
+    }
+
+}
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
index 92e989e596..6b96076948 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
 import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.TieredMessageStore;
+import org.apache.rocketmq.tieredstore.common.GroupCommitContext;
 import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
 import org.apache.rocketmq.tieredstore.file.FlatFileStore;
 import org.apache.rocketmq.tieredstore.file.FlatMessageFile;
@@ -157,6 +158,130 @@ public class MessageStoreDispatcherImplTest {
         Assert.assertEquals(200L, flatFile.getConsumeQueueCommitOffset());
     }
 
+    @Test
+    public void dispatchCommitFailedTest() throws Exception {
+        MessageStore defaultStore = Mockito.mock(MessageStore.class);
+        Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), 
anyInt())).thenReturn(100L);
+        Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(200L);
+
+        messageStore = Mockito.mock(TieredMessageStore.class);
+        IndexService indexService =
+            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig), storePath);
+        indexService.start();
+        Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
+        Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
+        Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
+        Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
+        Mockito.when(messageStore.getIndexService()).thenReturn(indexService);
+
+        // mock message
+        ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();
+        MessageExt messageExt = MessageDecoder.decode(buffer);
+        messageExt.setKeys("Key");
+        MessageAccessor.putProperty(
+            messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, 
"uk");
+        messageExt.setBody(new byte[10]);
+        messageExt.setStoreSize(0);
+        buffer = ByteBuffer.wrap(MessageDecoder.encode(messageExt, false));
+        buffer.putInt(0, buffer.remaining());
+
+        DispatchRequest request = new DispatchRequest(mq.getTopic(), 
mq.getQueueId(),
+            MessageFormatUtil.getCommitLogOffset(buffer), buffer.remaining(), 
0L,
+            MessageFormatUtil.getStoreTimeStamp(buffer), 0L,
+            "", "", 0, 0L, new HashMap<>());
+
+        // construct flat file
+        MessageStoreDispatcher dispatcher = new 
MessageStoreDispatcherImpl(messageStore);
+        dispatcher.dispatch(request);
+        FlatMessageFile flatFile = fileStore.getFlatFile(mq);
+        Assert.assertNotNull(flatFile);
+
+        // init offset
+        dispatcher.doScheduleDispatch(flatFile, true).join();
+        Assert.assertEquals(100L, flatFile.getConsumeQueueMinOffset());
+        Assert.assertEquals(100L, flatFile.getConsumeQueueMaxOffset());
+        Assert.assertEquals(100L, flatFile.getConsumeQueueCommitOffset());
+
+        ConsumeQueueInterface cq = Mockito.mock(ConsumeQueueInterface.class);
+        Mockito.when(defaultStore.getConsumeQueue(anyString(), 
anyInt())).thenReturn(cq);
+        Mockito.when(cq.get(anyLong())).thenReturn(
+            new CqUnit(100, 1000, buffer.remaining(), 0L));
+        Mockito.when(defaultStore.selectOneMessageByOffset(anyLong(), 
anyInt())).thenReturn(
+            new SelectMappedBufferResult(0L, buffer.asReadOnlyBuffer(), 
buffer.remaining(), null));
+        flatFile.getCommitLock().drainPermits();
+        dispatcher.doScheduleDispatch(flatFile, true).join();
+        GroupCommitContext groupCommitContext = 
((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile);
+        Assert.assertTrue(groupCommitContext != null);
+        Assert.assertTrue(groupCommitContext.getEndOffset() == 200);
+        flatFile.getCommitLock().release();
+        flatFile.commitAsync().join();
+        dispatcher.doScheduleDispatch(flatFile, true).join();
+        
Assert.assertTrue(((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile)
 == null);
+        ((MessageStoreDispatcherImpl)dispatcher).flatFileStore.destroyFile(mq);
+        
((MessageStoreDispatcherImpl)dispatcher).releaseClosedPendingGroupCommit();
+
+    }
+
+    @Test
+    public void dispatchFailedGroupCommitMapReleaseTest() throws Exception {
+        MessageStore defaultStore = Mockito.mock(MessageStore.class);
+        Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), 
anyInt())).thenReturn(100L);
+        Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(200L);
+
+        messageStore = Mockito.mock(TieredMessageStore.class);
+        IndexService indexService =
+            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig), storePath);
+        indexService.start();
+        Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
+        Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
+        Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
+        Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
+        Mockito.when(messageStore.getIndexService()).thenReturn(indexService);
+
+        // mock message
+        ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();
+        MessageExt messageExt = MessageDecoder.decode(buffer);
+        messageExt.setKeys("Key");
+        MessageAccessor.putProperty(
+            messageExt, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, 
"uk");
+        messageExt.setBody(new byte[10]);
+        messageExt.setStoreSize(0);
+        buffer = ByteBuffer.wrap(MessageDecoder.encode(messageExt, false));
+        buffer.putInt(0, buffer.remaining());
+
+        DispatchRequest request = new DispatchRequest(mq.getTopic(), 
mq.getQueueId(),
+            MessageFormatUtil.getCommitLogOffset(buffer), buffer.remaining(), 
0L,
+            MessageFormatUtil.getStoreTimeStamp(buffer), 0L,
+            "", "", 0, 0L, new HashMap<>());
+
+        // construct flat file
+        MessageStoreDispatcher dispatcher = new 
MessageStoreDispatcherImpl(messageStore);
+        dispatcher.dispatch(request);
+        FlatMessageFile flatFile = fileStore.getFlatFile(mq);
+        Assert.assertNotNull(flatFile);
+
+        // init offset
+        dispatcher.doScheduleDispatch(flatFile, true).join();
+        Assert.assertEquals(100L, flatFile.getConsumeQueueMinOffset());
+        Assert.assertEquals(100L, flatFile.getConsumeQueueMaxOffset());
+        Assert.assertEquals(100L, flatFile.getConsumeQueueCommitOffset());
+
+        ConsumeQueueInterface cq = Mockito.mock(ConsumeQueueInterface.class);
+        Mockito.when(defaultStore.getConsumeQueue(anyString(), 
anyInt())).thenReturn(cq);
+        Mockito.when(cq.get(anyLong())).thenReturn(
+            new CqUnit(100, 1000, buffer.remaining(), 0L));
+        Mockito.when(defaultStore.selectOneMessageByOffset(anyLong(), 
anyInt())).thenReturn(
+            new SelectMappedBufferResult(0L, buffer.asReadOnlyBuffer(), 
buffer.remaining(), null));
+        flatFile.getCommitLock().drainPermits();
+        dispatcher.doScheduleDispatch(flatFile, true).join();
+        GroupCommitContext groupCommitContext = 
((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile);
+        Assert.assertTrue(groupCommitContext != null);
+        ((MessageStoreDispatcherImpl)dispatcher).flatFileStore.destroyFile(mq);
+        
((MessageStoreDispatcherImpl)dispatcher).releaseClosedPendingGroupCommit();
+        
Assert.assertTrue(((MessageStoreDispatcherImpl)dispatcher).getFailedGroupCommitMap().get(flatFile)
 == null);
+
+    }
+
     @Test
     public void dispatchServiceTest() {
         MessageStore defaultStore = Mockito.mock(MessageStore.class);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
index 8a417f54a7..8208d27741 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
@@ -216,4 +216,12 @@ public class FlatMessageFileTest {
 
         flatFile.destroy();
     }
+
+    @Test
+    public void testCommitLock() {
+        String topic = "CommitLogTest";
+        FlatMessageFile flatFile = new FlatMessageFile(flatFileFactory, topic, 
0);
+        flatFile.getCommitLock().drainPermits(); // take the commit permit so the next commitAsync() finds none
+        Assert.assertFalse(flatFile.commitAsync().join()); // expected to complete with false while the permit is held
+    }
 }

Reply via email to