This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9899be15e [ISSUE #6933] Support recreate file if local cq and tiered storage offset not match
9899be15e is described below
commit 9899be15e7fcac45f60988ea84c41c1a6f615554
Author: lizhimins <[email protected]>
AuthorDate: Fri Jun 23 15:37:57 2023 +0800
[ISSUE #6933] Support recreate file if local cq and tiered storage offset not match
---
.../rocketmq/tieredstore/TieredDispatcher.java | 229 +++++++++++++--------
.../rocketmq/tieredstore/file/TieredCommitLog.java | 2 +-
.../rocketmq/tieredstore/TieredDispatcherTest.java | 10 +-
3 files changed, 147 insertions(+), 94 deletions(-)
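
For readers skimming the diff below: the core recovery rule this change introduces is that the tiered flat file is destroyed and recreated when its consume queue max offset falls below the local consume queue's minimum offset, and only rewound otherwise. A minimal standalone sketch of that rule follows; FlatFileView and LocalQueueView are hypothetical stand-ins for CompositeQueueFlatFile and the local MessageStore, not types from the patch.

    // Sketch only: illustrates the recovery decision applied in the diff below.
    final class OffsetMismatchRecoverySketch {

        interface FlatFileView {
            long consumeQueueMaxOffset();      // next cq offset the tiered file expects
            void resetDispatchOffset(long offset);
            void destroyAndRecreate();         // drop the tiered file so it is rebuilt from scratch
        }

        interface LocalQueueView {
            long minOffset();                  // smallest cq offset still present locally
        }

        static void recover(FlatFileView flatFile, LocalQueueView localQueue) {
            if (flatFile.consumeQueueMaxOffset() < localQueue.minOffset()) {
                // The local cq no longer contains the messages the tiered file expects next,
                // so continuing from the current offset is impossible: recreate the file.
                flatFile.destroyAndRecreate();
            } else {
                // Offsets still overlap: rewinding the dispatch offset is enough,
                // at the cost of re-dispatching (duplicating) some commitlog entries.
                flatFile.resetDispatchOffset(flatFile.consumeQueueMaxOffset());
            }
        }
    }
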
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index d3ed01e86..0d89d305b 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -60,8 +60,8 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
private final MessageStore defaultStore;
private final TieredMessageStoreConfig storeConfig;
private final TieredFlatFileManager tieredFlatFileManager;
- private final ReentrantLock dispatchLock;
- private final ReentrantLock dispatchRequestListLock;
+ private final ReentrantLock dispatchTaskLock;
+ private final ReentrantLock dispatchWriteLock;
private ConcurrentMap<CompositeQueueFlatFile, List<DispatchRequest>> dispatchRequestReadMap;
private ConcurrentMap<CompositeQueueFlatFile, List<DispatchRequest>> dispatchRequestWriteMap;
@@ -73,19 +73,18 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
this.tieredFlatFileManager = TieredFlatFileManager.getInstance(storeConfig);
this.dispatchRequestReadMap = new ConcurrentHashMap<>();
this.dispatchRequestWriteMap = new ConcurrentHashMap<>();
- this.dispatchLock = new ReentrantLock();
- this.dispatchRequestListLock = new ReentrantLock();
+ this.dispatchTaskLock = new ReentrantLock();
+ this.dispatchWriteLock = new ReentrantLock();
this.initScheduleTask();
}
private void initScheduleTask() {
- TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() -> {
+ TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() ->
tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> {
if (!flatFile.getCompositeFlatFileLock().isLocked()) {
dispatchFlatFile(flatFile);
}
- });
- }, 30, 10, TimeUnit.SECONDS);
+ }), 30, 10, TimeUnit.SECONDS);
}
@Override
@@ -99,43 +98,44 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
return;
}
- CompositeQueueFlatFile flatFile =
- tieredFlatFileManager.getOrCreateFlatFileIfAbsent(new MessageQueue(topic, brokerName, request.getQueueId()));
+ CompositeQueueFlatFile flatFile = tieredFlatFileManager.getOrCreateFlatFileIfAbsent(
+ new MessageQueue(topic, brokerName, request.getQueueId()));
if (flatFile == null) {
- logger.error("[Bug]TieredDispatcher#dispatch: dispatch failed, " +
- "can not create flatFile: topic: {}, queueId: {}", request.getTopic(), request.getQueueId());
+ logger.error("[Bug] TieredDispatcher#dispatch: get or create flat file failed, skip this request. ",
+ "topic: {}, queueId: {}", request.getTopic(), request.getQueueId());
return;
}
- // prevent consume queue and index file falling too far
- int groupCommitCount = storeConfig.getTieredStoreMaxGroupCommitCount();
- if (dispatchRequestWriteMap.getOrDefault(flatFile, Collections.emptyList()).size() > groupCommitCount
- || dispatchRequestReadMap.getOrDefault(flatFile, Collections.emptyList()).size() > groupCommitCount) {
+ if (detectFallBehind(flatFile)) {
return;
}
- // init dispatch offset
+ // Set cq offset as commitlog first dispatch offset if flat file first init
if (flatFile.getDispatchOffset() == -1) {
flatFile.initOffset(request.getConsumeQueueOffset());
}
if (request.getConsumeQueueOffset() == flatFile.getDispatchOffset()) {
+
+ // In order to ensure the efficiency of dispatch operation and avoid high dispatch delay,
+ // it is not allowed to block for a long time here.
try {
+ // Acquired flat file write lock to append commitlog
if (flatFile.getCompositeFlatFileLock().isLocked()
|| !flatFile.getCompositeFlatFileLock().tryLock(3, TimeUnit.MILLISECONDS)) {
return;
}
} catch (Exception e) {
- logger.warn("TieredDispatcher#dispatch: dispatch failed, " +
- "can not get flatFile lock: topic: {}, queueId: {}", request.getTopic(), request.getQueueId(), e);
+ logger.warn("Temporarily skip dispatch request because we can not acquired write lock. " +
+ "topic: {}, queueId: {}", request.getTopic(), request.getQueueId(), e);
if (flatFile.getCompositeFlatFileLock().isLocked()) {
flatFile.getCompositeFlatFileLock().unlock();
}
return;
}
- // double check
+ // double check whether the offset matches
if (request.getConsumeQueueOffset() != flatFile.getDispatchOffset()) {
flatFile.getCompositeFlatFileLock().unlock();
return;
@@ -160,19 +160,20 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
}
AppendResult result = flatFile.appendCommitLog(message.getByteBuffer());
long newCommitLogOffset = flatFile.getCommitLogMaxOffset() - message.getByteBuffer().remaining();
- handleAppendCommitLogResult(result, flatFile, request.getConsumeQueueOffset(), flatFile.getDispatchOffset(),
+ doRedispatchRequestToWriteMap(result, flatFile, request.getConsumeQueueOffset(),
newCommitLogOffset, request.getMsgSize(), request.getTagsCode(), message.getByteBuffer());
if (result == AppendResult.SUCCESS) {
Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC, request.getTopic())
.put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, request.getQueueId())
- .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, FileSegmentType.COMMIT_LOG.name().toLowerCase())
+ .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE,
+ FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
TieredStoreMetricsManager.messagesDispatchTotal.add(1, attributes);
}
} catch (Exception throwable) {
- logger.error("TieredDispatcher#dispatch: dispatch failed: " +
+ logger.error("TieredDispatcher#dispatch: dispatch has unexpected problem. " +
"topic: {}, queueId: {}, queue offset: {}",
request.getTopic(), request.getQueueId(), request.getConsumeQueueOffset(), throwable);
} finally {
@@ -202,8 +203,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
try {
dispatchFlatFile(flatFile);
} catch (Throwable throwable) {
- logger.error("[Bug]TieredDispatcher#dispatchFlatFileAsync dispatch failed, " +
- "can not dispatch, topic: {}, queueId: {}",
+ logger.error("[Bug] TieredDispatcher#dispatchFlatFileAsync failed, topic: {}, queueId: {}",
flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), throwable);
}
@@ -218,7 +218,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
return;
}
- if (flatFile.getDispatchOffset() == -1) {
+ if (flatFile.getDispatchOffset() == -1L) {
return;
}
@@ -234,6 +234,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
long minOffsetInQueue = defaultStore.getMinOffsetInQueue(topic, queueId);
long maxOffsetInQueue = defaultStore.getMaxOffsetInQueue(topic, queueId);
+ // perhaps it was caused by local cq file corruption or ha truncation
if (beforeOffset >= maxOffsetInQueue) {
return;
}
@@ -243,8 +244,8 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
return;
}
} catch (Exception e) {
- logger.warn("TieredDispatcher#dispatchFlatFile: dispatch failed, " +
- "can not get flatFile lock: topic: {}, queueId: {}", mq.getTopic(), mq.getQueueId(), e);
+ logger.warn("TieredDispatcher#dispatchFlatFile: can not acquire flatFile lock, " +
+ "topic: {}, queueId: {}", mq.getTopic(), mq.getQueueId(), e);
if (flatFile.getCompositeFlatFileLock().isLocked()) {
flatFile.getCompositeFlatFileLock().unlock();
}
@@ -252,25 +253,30 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
}
try {
- long queueOffset = flatFile.getDispatchOffset();
- if (minOffsetInQueue > queueOffset) {
- logger.warn("BlobDispatcher#dispatchFlatFile: " +
- "message that needs to be dispatched does not exist: " +
- "topic: {}, queueId: {}, message queue offset: {}, min queue offset: {}",
- topic, queueId, queueOffset, minOffsetInQueue);
+ long dispatchOffset = flatFile.getDispatchOffset();
+ if (dispatchOffset < minOffsetInQueue) {
+ // If the tiered storage feature is turned off midway,
+ // it may cause cq discontinuity, resulting in data loss here.
+ logger.warn("TieredDispatcher#dispatchFlatFile: dispatch offset is too small, " +
+ "topic: {}, queueId: {}, dispatch offset: {}, local cq offset range {}-{}",
+ topic, queueId, dispatchOffset, minOffsetInQueue, maxOffsetInQueue);
flatFile.initOffset(minOffsetInQueue);
- queueOffset = minOffsetInQueue;
+ dispatchOffset = minOffsetInQueue;
}
- beforeOffset = queueOffset;
+ beforeOffset = dispatchOffset;
- // TODO flow control based on message size
- long limit = Math.min(queueOffset + 100000, maxOffsetInQueue);
+ // flow control by max count, also we could do flow control based on message size
+ long maxCount = storeConfig.getTieredStoreGroupCommitCount();
+ long upperBound = Math.min(dispatchOffset + maxCount, maxOffsetInQueue);
ConsumeQueue consumeQueue = (ConsumeQueue) defaultStore.getConsumeQueue(topic, queueId);
- for (; queueOffset < limit; queueOffset++) {
- SelectMappedBufferResult cqItem = consumeQueue.getIndexBuffer(queueOffset);
+
+ for (; dispatchOffset < upperBound; dispatchOffset++) {
+ // get consume queue
+ SelectMappedBufferResult cqItem = consumeQueue.getIndexBuffer(dispatchOffset);
if (cqItem == null) {
- logger.error("[Bug]TieredDispatcher#dispatchFlatFile: dispatch failed, " +
- "can not get cq item: topic: {}, queueId: {}, offset: {}", topic, queueId, queueOffset);
+ logger.error("[Bug] TieredDispatcher#dispatchFlatFile: cq item is null, " +
+ "topic: {}, queueId: {}, dispatch offset: {}, local cq offset range {}-{}",
+ topic, queueId, dispatchOffset, minOffsetInQueue, maxOffsetInQueue);
return;
}
long commitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqItem.getByteBuffer());
@@ -278,28 +284,34 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
long tagCode = CQItemBufferUtil.getTagCode(cqItem.getByteBuffer());
cqItem.release();
+ // get message
SelectMappedBufferResult message = defaultStore.selectOneMessageByOffset(commitLogOffset, size);
if (message == null) {
- logger.error("TieredDispatcher#dispatchFlatFile: dispatch failed, " +
- "can not get message from next store: topic: {}, queueId: {}, commitLog offset: {}, size: {}",
+ logger.error("TieredDispatcher#dispatchFlatFile: get message from next store failed, " +
+ "topic: {}, queueId: {}, commitLog offset: {}, size: {}",
topic, queueId, commitLogOffset, size);
break;
}
+
+ // append commitlog will increase dispatch offset here
AppendResult result = flatFile.appendCommitLog(message.getByteBuffer(), true);
long newCommitLogOffset = flatFile.getCommitLogMaxOffset() - message.getByteBuffer().remaining();
- handleAppendCommitLogResult(result, flatFile, queueOffset, flatFile.getDispatchOffset(), newCommitLogOffset, size, tagCode, message.getByteBuffer());
+ doRedispatchRequestToWriteMap(
+ result, flatFile, dispatchOffset, newCommitLogOffset, size, tagCode, message.getByteBuffer());
message.release();
if (result != AppendResult.SUCCESS) {
- queueOffset--;
+ dispatchOffset--;
break;
}
}
+
Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC, mq.getTopic())
.put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, mq.getQueueId())
.put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
- TieredStoreMetricsManager.messagesDispatchTotal.add(queueOffset - beforeOffset, attributes);
+
+ TieredStoreMetricsManager.messagesDispatchTotal.add(dispatchOffset - beforeOffset, attributes);
} finally {
flatFile.getCompositeFlatFileLock().unlock();
}
@@ -310,8 +322,10 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
}
}
- public void handleAppendCommitLogResult(AppendResult result, CompositeQueueFlatFile flatFile,
- long queueOffset, long dispatchOffset, long newCommitLogOffset, int size, long tagCode, ByteBuffer message) {
+ // Submit cq to write map if append commitlog success
+ public void doRedispatchRequestToWriteMap(AppendResult result, CompositeQueueFlatFile flatFile,
+ long queueOffset, long newCommitLogOffset, int size, long tagCode, ByteBuffer message) {
+
+
MessageQueue mq = flatFile.getMessageQueue();
String topic = mq.getTopic();
int queueId = mq.getQueueId();
@@ -322,18 +336,22 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
case OFFSET_INCORRECT:
long offset = MessageBufferUtil.getQueueOffset(message);
if (queueOffset != offset) {
- logger.error("[Bug]Dispatch append commit log, result={}, offset={}, msg offset={}", queueOffset, offset);
+ logger.error("[Bug] Commitlog offset incorrect, " +
+ "result={}, topic={}, queueId={}, offset={}, msg offset={}",
+ result, topic, queueId, queueOffset, offset);
}
return;
case BUFFER_FULL:
- logger.debug("Commitlog buffer full, result={}, topic={}, queueId={}, offset={}",result, topic, queueId, queueOffset);
+ logger.debug("Commitlog buffer full, result={}, topic={}, queueId={}, offset={}",
+ result, topic, queueId, queueOffset);
return;
default:
- logger.info("Commitlog append failed, result={}, topic={}, queueId={}, offset={}", result, topic, queueId, queueOffset);
+ logger.info("Commitlog append failed, result={}, topic={}, queueId={}, offset={}",
+ result, topic, queueId, queueOffset);
return;
}
- dispatchRequestListLock.lock();
+ dispatchWriteLock.lock();
try {
Map<String, String> properties = MessageBufferUtil.getProperties(message);
DispatchRequest dispatchRequest = new DispatchRequest(
@@ -348,49 +366,65 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
properties.getOrDefault(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ""),
0, 0, new HashMap<>());
dispatchRequest.setOffsetId(MessageBufferUtil.getOffsetId(message));
- List<DispatchRequest> requestList = dispatchRequestWriteMap.computeIfAbsent(flatFile, k -> new ArrayList<>());
+ List<DispatchRequest> requestList =
+ dispatchRequestWriteMap.computeIfAbsent(flatFile, k -> new ArrayList<>());
requestList.add(dispatchRequest);
if (requestList.get(0).getConsumeQueueOffset() >= flatFile.getConsumeQueueMaxOffset()) {
wakeup();
}
} finally {
- dispatchRequestListLock.unlock();
+ dispatchWriteLock.unlock();
}
}
public void swapDispatchRequestList() {
- dispatchRequestListLock.lock();
+ dispatchWriteLock.lock();
try {
dispatchRequestReadMap = dispatchRequestWriteMap;
dispatchRequestWriteMap = new ConcurrentHashMap<>();
} finally {
- dispatchRequestListLock.unlock();
+ dispatchWriteLock.unlock();
}
}
- public void sendBackDispatchRequestList() {
- if (!dispatchRequestReadMap.isEmpty()) {
- dispatchRequestListLock.lock();
- try {
- dispatchRequestReadMap.forEach((flatFile, requestList) -> {
- if (requestList.isEmpty()) {
- logger.warn("[Bug]TieredDispatcher#sendBackDispatchRequestList: requestList is empty, no need to send back: topic: {}, queueId: {}", flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId());
- return;
- }
- List<DispatchRequest> requestListToWrite = dispatchRequestWriteMap.computeIfAbsent(flatFile, k -> new ArrayList<>());
- if (!requestListToWrite.isEmpty() && requestList.get(requestList.size() - 1).getConsumeQueueOffset() > requestListToWrite.get(0).getConsumeQueueOffset()) {
- logger.warn("[Bug]TieredDispatcher#sendBackDispatchRequestList: dispatch request list is not continuous: topic: {}, queueId: {}, last list max offset: {}, new list min offset: {}",
- flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(),
- requestList.get(requestList.size() - 1).getConsumeQueueOffset(), requestListToWrite.get(0).getConsumeQueueOffset());
+ public void copySurvivorObject() {
+ if (dispatchRequestReadMap.isEmpty()) {
+ return;
+ }
+
+ try {
+ dispatchWriteLock.lock();
+ dispatchRequestReadMap.forEach((flatFile, requestList) -> {
+ String topic = flatFile.getMessageQueue().getTopic();
+ int queueId = flatFile.getMessageQueue().getQueueId();
+ if (requestList.isEmpty()) {
+ logger.warn("Copy survivor object failed, dispatch request
list is empty, " +
+ "topic: {}, queueId: {}", topic, queueId);
+ return;
+ }
+
+ List<DispatchRequest> requestListToWrite =
+ dispatchRequestWriteMap.computeIfAbsent(flatFile, k -> new
ArrayList<>());
+
+ if (!requestListToWrite.isEmpty()) {
+ long readOffset = requestList.get(requestList.size() -
1).getConsumeQueueOffset();
+ long writeOffset =
requestListToWrite.get(0).getConsumeQueueOffset();
+ if (readOffset > writeOffset) {
+ logger.warn("Copy survivor object failed, offset in
request list are not continuous. " +
+ "topic: {}, queueId: {}, read offset: {},
write offset: {}",
+ topic, queueId, readOffset, writeOffset);
+
+ // sort request list according cq offset
requestList.sort(Comparator.comparingLong(DispatchRequest::getConsumeQueueOffset));
}
- requestList.addAll(requestListToWrite);
- dispatchRequestWriteMap.put(flatFile, requestList);
- });
- dispatchRequestReadMap = new ConcurrentHashMap<>();
- } finally {
- dispatchRequestListLock.unlock();
- }
+ }
+
+ requestList.addAll(requestListToWrite);
+ dispatchRequestWriteMap.put(flatFile, requestList);
+ });
+ dispatchRequestReadMap = new ConcurrentHashMap<>();
+ } finally {
+ dispatchWriteLock.unlock();
}
}
@@ -425,9 +459,11 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
// build consume queue
AppendResult result = flatFile.appendConsumeQueue(request, true);
+ // handle build cq result
if (AppendResult.SUCCESS.equals(result)) {
long cqCount = cqMetricsMap.computeIfAbsent(messageQueue, key -> 0L);
cqMetricsMap.put(messageQueue, cqCount + 1);
+
// build index
if (storeConfig.isMessageIndexEnable()) {
result = flatFile.appendIndexFile(request);
@@ -436,7 +472,8 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
ifMetricsMap.put(messageQueue, ifCount + 1);
iterator.remove();
} else {
- logger.warn("Build indexFile failed, result: {}, topic: {}, queue: {}, queue offset: {}",
+ logger.warn("Build index failed, skip this message, " +
+ "result: {}, topic: {}, queue: {}, request offset: {}",
result, request.getTopic(), request.getQueueId(), request.getConsumeQueueOffset());
}
}
@@ -444,15 +481,30 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
}
if (AppendResult.OFFSET_INCORRECT.equals(result)) {
- logger.error("Build consumeQueue and indexFile failed, offset is messed up, " +
- "try to rebuild cq: topic: {}, queue: {}, queue offset: {}, max queue offset: {}",
- request.getTopic(), request.getQueueId(),
+ logger.error("Consume queue offset incorrect, try to recreated consume queue, " +
+ "result: {}, topic: {}, queue: {}, request offset: {}, current cq offset: {}",
+ result, request.getTopic(), request.getQueueId(),
request.getConsumeQueueOffset(),
flatFile.getConsumeQueueMaxOffset());
try {
flatFile.getCompositeFlatFileLock().lock();
- // rollback dispatch offset, this operation will cause duplicate message in commitLog
- flatFile.initOffset(flatFile.getConsumeQueueMaxOffset());
+
+ // reset dispatch offset, this operation will cause duplicate message in commitLog
+ long minOffsetInQueue =
+ defaultStore.getMinOffsetInQueue(request.getTopic(), request.getQueueId());
+
+ // when dispatch offset is smaller than min offset in local cq
+ // some messages may be lost at this time
+ if (flatFile.getConsumeQueueMaxOffset() < minOffsetInQueue) {
+ // if we use flatFile.destroy() directly will cause manager reference leak.
+ tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
+ logger.warn("Found cq max offset is smaller than local cq min offset, " +
+ "so destroy tiered flat file to recreated, topic: {}, queueId: {}",
+ request.getTopic(), request.getQueueId());
+ } else {
+ flatFile.initOffset(flatFile.getConsumeQueueMaxOffset());
+ }
+
// clean invalid dispatch request
dispatchRequestWriteMap.remove(flatFile);
requestList.clear();
@@ -462,7 +514,8 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
break;
}
- logger.warn("Build consumeQueue failed, result: {}, topic: {}, queue: {}, queue offset: {}",
+ // other append result
+ logger.warn("Append consume queue failed, result: {}, topic: {}, queue: {}, request offset: {}",
result, request.getTopic(), request.getQueueId(), request.getConsumeQueueOffset());
}
@@ -490,18 +543,18 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
TieredStoreMetricsManager.messagesDispatchTotal.add(count, attributes);
});
- sendBackDispatchRequestList();
+ copySurvivorObject();
}
// Allow work-stealing
public void doDispatchTask() {
try {
- dispatchLock.lock();
+ dispatchTaskLock.lock();
buildConsumeQueueAndIndexFile();
} catch (Exception e) {
- logger.error("Build consumeQueue and indexFile failed", e);
+ logger.error("Tiered storage do dispatch task failed", e);
} finally {
- dispatchLock.unlock();
+ dispatchTaskLock.unlock();
}
}
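
The swapDispatchRequestList/copySurvivorObject pair above follows a double-buffer pattern: producers append to a write map under a lock, the builder thread swaps it with a read map, and entries that could not be consumed are copied back in front of newer ones. A minimal self-contained sketch of that pattern follows; the generic types are simplified stand-ins, not the real DispatchRequest and CompositeQueueFlatFile.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReentrantLock;

    final class DoubleBufferSketch<K, R> {
        private final ReentrantLock writeLock = new ReentrantLock();
        private ConcurrentMap<K, List<R>> readMap = new ConcurrentHashMap<>();
        private ConcurrentMap<K, List<R>> writeMap = new ConcurrentHashMap<>();

        // Producer side: append a request for the given key under the write lock.
        void submit(K key, R request) {
            writeLock.lock();
            try {
                writeMap.computeIfAbsent(key, k -> new ArrayList<>()).add(request);
            } finally {
                writeLock.unlock();
            }
        }

        // Consumer side: swap buffers so consumption does not block producers for long.
        void swap() {
            writeLock.lock();
            try {
                readMap = writeMap;
                writeMap = new ConcurrentHashMap<>();
            } finally {
                writeLock.unlock();
            }
        }

        // Consumer side: put unconsumed (survivor) requests back ahead of newer ones.
        void copySurvivors(Map<K, List<R>> survivors) {
            writeLock.lock();
            try {
                survivors.forEach((key, leftover) -> {
                    List<R> newer = writeMap.computeIfAbsent(key, k -> new ArrayList<>());
                    leftover.addAll(newer);   // keep ordering: survivors first, then newer requests
                    writeMap.put(key, leftover);
                });
            } finally {
                writeLock.unlock();
            }
        }
    }
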
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
index 67e49af55..92aea58be 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
@@ -111,7 +111,7 @@ public class TieredCommitLog {
try {
if (System.currentTimeMillis() - fileSegment.getMaxTimestamp() > TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval())
- && fileSegment.getSize() > storeConfig.getCommitLogRollingMinimumSize()) {
+ && fileSegment.getAppendPosition() > storeConfig.getCommitLogRollingMinimumSize()) {
flatFile.rollingNewFile();
}
} catch (Exception e) {
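
The one-line change above makes the rolling check compare the segment's in-memory append position, rather than its already persisted size, against the configured minimum. A small sketch of the intended condition follows, with hypothetical parameter names in place of the real accessors.

    final class CommitLogRollingSketch {
        // Roll only when the segment is both old enough and has accumulated enough appended bytes.
        static boolean shouldRoll(long maxTimestampMillis, long appendPosition,
                                  long rollingIntervalHours, long rollingMinimumSizeBytes) {
            boolean oldEnough = System.currentTimeMillis() - maxTimestampMillis
                > java.util.concurrent.TimeUnit.HOURS.toMillis(rollingIntervalHours);
            boolean bigEnough = appendPosition > rollingMinimumSizeBytes;
            return oldEnough && bigEnough;
        }
    }
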
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index e5f3f9c6c..e6adef1d1 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -118,15 +118,15 @@ public class TieredDispatcherTest {
flatFile.commitCommitLog();
Assert.assertEquals(10, flatFile.getDispatchOffset());
- dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, 0, buffer1);
- dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, 0, buffer2);
+ dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer1);
+ dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer2);
dispatcher.buildConsumeQueueAndIndexFile();
Assert.assertEquals(7, flatFile.getConsumeQueueMaxOffset());
Assert.assertEquals(7, flatFile.getDispatchOffset());
- dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 7, 7, 0, 0, 0, buffer1);
- dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, 0, buffer2);
- dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, 0, buffer3);
+ dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 7, 7, 0, 0, buffer1);
+ dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 8, 8, 0, 0, buffer2);
+ dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, flatFile, 9, 9, 0, 0, buffer3);
dispatcher.buildConsumeQueueAndIndexFile();
Assert.assertEquals(10, flatFile.getConsumeQueueMaxOffset());
}