lizhimins commented on code in PR #9256:
URL: https://github.com/apache/rocketmq/pull/9256#discussion_r2113361236


##########
store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java:
##########
@@ -117,4 +103,43 @@ public long getStoreTime(CqUnit cqUnit) {
         }
         return -1;
     }
+
+    /**
+     * get max physic offset in consumeQueue
+     *
+     * @return the max physic offset in consumeQueue
+     * @throws RocksDBException only in rocksdb mode
+     */
+    public abstract long getMaxPhyOffsetInConsumeQueue() throws RocksDBException;
+
+    /**
+     * destroy the specific consumeQueue
+     *
+     * @param consumeQueue consumeQueue to be destroyed
+     * @throws RocksDBException only in rocksdb mode
+     */
+    protected abstract void destroy(ConsumeQueueInterface consumeQueue) throws RocksDBException;
+
+    @Override
+    public boolean deleteTopic(String topic) {
+        ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = this.consumeQueueTable.get(topic);
+
+        if (queueTable == null || queueTable.isEmpty()) {
+            return false;
+        }
+
+        for (ConsumeQueueInterface cq : queueTable.values()) {
+            try {
+                destroy(cq);
+            } catch (RocksDBException e) {
+                log.error("DeleteTopic: ConsumeQueue cleans error!, topic={}, queueId={}", cq.getTopic(), cq.getQueueId(), e);

Review Comment:
   If destroy fails here, dirty data may be left behind.
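
   One possible way to address this, sketched under assumptions rather than taken from the PR: record which queues failed to be destroyed, keep them registered so a later retry can find them, and drop the topic entry only once every queue is gone. All names below (`DeleteTopicSketch`, `StoreException`, `QueueDestroyer`) are hypothetical stand-ins, not RocketMQ classes; `StoreException` plays the role of `RocksDBException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; none of these types are the real RocketMQ classes.
class DeleteTopicSketch {

    /** Stand-in for a store-specific failure such as RocksDBException. */
    static class StoreException extends Exception {
        StoreException(String message) {
            super(message);
        }
    }

    /** Stand-in for the abstract destroy(ConsumeQueueInterface) hook. */
    interface QueueDestroyer {
        void destroy(String topic, int queueId) throws StoreException;
    }

    /**
     * Destroys every queue of a topic and returns the ids that could not be destroyed.
     * Destroyed queues are removed from the table; failed ones stay registered,
     * so the caller can retry instead of silently leaving orphaned data.
     */
    static List<Integer> deleteTopic(ConcurrentMap<String, ConcurrentMap<Integer, String>> table,
                                     String topic,
                                     QueueDestroyer destroyer) {
        List<Integer> failed = new ArrayList<>();
        ConcurrentMap<Integer, String> queues = table.get(topic);
        if (queues == null || queues.isEmpty()) {
            return failed;
        }
        // Removing entries while iterating is safe for concurrent maps
        // such as ConcurrentHashMap, whose iterators are weakly consistent.
        for (Integer queueId : queues.keySet()) {
            try {
                destroyer.destroy(topic, queueId);
                queues.remove(queueId);
            } catch (StoreException e) {
                failed.add(queueId);
            }
        }
        // Drop the topic entry only when nothing is left to clean up.
        if (queues.isEmpty()) {
            table.remove(topic);
        }
        return failed;
    }
}
```

   With this shape the caller can tell a clean delete (empty list) from a partial one and schedule a retry for the remaining queue ids. Whether a retry or a rollback fits better here depends on how the RocksDB-backed store behaves on failure.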


