bxfjb commented on code in PR #7899:
URL: https://github.com/apache/rocketmq/pull/7899#discussion_r1525862549


##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.core;
+
+import io.opentelemetry.api.common.Attributes;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+import org.apache.rocketmq.store.queue.CqUnit;
+import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
+import org.apache.rocketmq.tieredstore.TieredMessageStore;
+import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.common.FileSegmentType;
+import org.apache.rocketmq.tieredstore.file.FlatFileInterface;
+import org.apache.rocketmq.tieredstore.file.FlatFileStore;
+import org.apache.rocketmq.tieredstore.index.IndexService;
+import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
+import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
+import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;
+import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageStoreDispatcherImpl extends ServiceThread implements 
MessageStoreDispatcher {
+
+    protected static final Logger log = 
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+    protected final String brokerName;
+    protected final MessageStore defaultStore;
+    protected final MessageStoreConfig storeConfig;
+    protected final TieredMessageStore messageStore;
+    protected final FlatFileStore flatFileStore;
+    protected final MessageStoreExecutor storeExecutor;
+    protected final MessageStoreFilter topicFilter;
+    protected final Semaphore semaphore;
+    protected final IndexService indexService;
+
+    public MessageStoreDispatcherImpl(TieredMessageStore messageStore) {
+        this.messageStore = messageStore;
+        this.storeConfig = messageStore.getStoreConfig();
+        this.defaultStore = messageStore.getDefaultStore();
+        this.brokerName = storeConfig.getBrokerName();
+        this.semaphore = new Semaphore(
+            this.storeConfig.getTieredStoreMaxPendingLimit() / 4);
+        this.topicFilter = messageStore.getTopicFilter();
+        this.flatFileStore = messageStore.getFlatFileStore();
+        this.storeExecutor = messageStore.getStoreExecutor();
+        this.indexService = messageStore.getIndexService();
+    }
+
+    @Override
+    public String getServiceName() {
+        return MessageStoreDispatcher.class.getSimpleName();
+    }
+
+    public void dispatchWithSemaphore(FlatFileInterface flatFile) {
+        try {
+            if (stopped) {
+                return;
+            }
+            semaphore.acquire();
+            this.dispatchAsync(flatFile, false)
+                .whenComplete((future, throwable) -> semaphore.release());
+        } catch (InterruptedException e) {
+            semaphore.release();
+        }
+    }
+
+    @Override
+    public void dispatch(DispatchRequest request) {
+        if (stopped || topicFilter != null && 
topicFilter.filterTopic(request.getTopic())) {
+            return;
+        }
+        flatFileStore.computeIfAbsent(
+            new MessageQueue(request.getTopic(), brokerName, 
request.getQueueId()));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> dispatchAsync(FlatFileInterface 
flatFile, boolean force) {
+        if (stopped) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        String topic = flatFile.getMessageQueue().getTopic();
+        int queueId = flatFile.getMessageQueue().getQueueId();
+
+        force = !storeConfig.isTieredStoreGroupCommit() || force;
+        if (force) {
+            flatFile.getFileLock().lock();
+        } else {
+            if (!flatFile.getFileLock().tryLock()) {
+                return CompletableFuture.completedFuture(false);
+            }
+        }
+
+        try {
+            if (topicFilter != null && 
topicFilter.filterTopic(flatFile.getMessageQueue().getTopic())) {
+                flatFileStore.destroyFile(flatFile.getMessageQueue());
+                return CompletableFuture.completedFuture(false);
+            }
+
+            long currentOffset = flatFile.getConsumeQueueMaxOffset();
+            long commitOffset = flatFile.getConsumeQueueCommitOffset();
+            long minOffsetInQueue = defaultStore.getMinOffsetInQueue(topic, 
queueId);
+            long maxOffsetInQueue = defaultStore.getMaxOffsetInQueue(topic, 
queueId);
+
+            // If set to max offset here, some written messages may be lost
+            if (!flatFile.isFlatFileInit()) {
+                currentOffset = Math.max(minOffsetInQueue,
+                    maxOffsetInQueue - 
storeConfig.getTieredStoreGroupCommitSize());
+                flatFile.initOffset(currentOffset);
+                return CompletableFuture.completedFuture(true);
+            }
+
+            // If last commit failed
+            if (commitOffset < currentOffset) {
+                this.commitAsync(flatFile);
+                return CompletableFuture.completedFuture(false);
+            }
+
+            if (currentOffset == maxOffsetInQueue) {
+                return CompletableFuture.completedFuture(false);
+            }
+
+            if (currentOffset < minOffsetInQueue) {
+                log.warn("MessageDispatcher#dispatch, current offset is too 
small, " +
+                        "topic={}, queueId={}, offset={}-{}, current={}",
+                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, 
currentOffset);
+                flatFileStore.destroyFile(flatFile.getMessageQueue());
+                flatFileStore.computeIfAbsent(new MessageQueue(topic, 
brokerName, queueId));
+                return CompletableFuture.completedFuture(true);
+            }
+
+            if (currentOffset > maxOffsetInQueue) {
+                log.warn("MessageDispatcher#dispatch, current offset is too 
large, " +
+                        "topic: {}, queueId: {}, offset={}-{}, current={}",
+                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, 
currentOffset);
+                return CompletableFuture.completedFuture(false);
+            }
+
+            long interval = 
TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval());
+            if (flatFile.rollingFile(interval)) {
+                log.warn("MessageDispatcher#dispatch, rolling file, " +
+                        "topic: {}, queueId: {}, offset={}-{}, current={}",
+                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, 
currentOffset);
+            }
+
+            long bufferSize = 0L;
+            long groupCommitSize = storeConfig.getTieredStoreGroupCommitSize();
+            long groupCommitCount = 
storeConfig.getTieredStoreGroupCommitCount();
+            long targetOffset = Math.min(currentOffset + groupCommitCount, 
maxOffsetInQueue);
+
+            ConsumeQueueInterface consumeQueue = 
defaultStore.getConsumeQueue(topic, queueId);
+            CqUnit cqUnit = consumeQueue.get(currentOffset);
+            SelectMappedBufferResult message =
+                defaultStore.selectOneMessageByOffset(cqUnit.getPos(), 
cqUnit.getSize());
+            boolean timeout = 
MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
+                TimeUnit.SECONDS.toMillis(30) < System.currentTimeMillis();

Review Comment:
   谢谢，另外想再多问一句：dfs 在这里指的是什么？



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to