This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 66b69ad647a [fix][broker]User topic failed to delete after removed cluster because of failed delete data from transaction buffer topic (#24648)
66b69ad647a is described below

commit 66b69ad647a4fad304a3d8a0abeb81be8414f140
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 29 18:33:03 2025 +0800

    [fix][broker]User topic failed to delete after removed cluster because of failed delete data from transaction buffer topic (#24648)
---
 .../broker/service/persistent/PersistentTopic.java | 32 ++++++++++++++++------
 .../broker/service/persistent/SystemTopic.java     |  4 +++
 .../NamespaceEventsSystemTopicFactory.java         | 21 ++++++++++++++
 .../SingleSnapshotAbortedTxnProcessorImpl.java     | 23 ++++++++++++----
 .../SnapshotSegmentAbortedTxnProcessorImpl.java    | 31 +++++++++++++++++----
 .../service/OneWayReplicatorUsingGlobalZKTest.java | 14 ++++++++--
 .../pulsar/common/naming/SystemTopicNames.java     | 11 ++++++++
 7 files changed, 115 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7ed459daafb..cc6cbb348f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -410,6 +410,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         TopicName topicName = TopicName.get(topic);
         if 
(brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
                 && !isEventSystemTopic(topicName)
+                && !SystemTopicNames.isTransactionInternalName(topicName)
+                && 
!SystemTopicNames.isTransactionBufferOrPendingAckSystemTopicName(topicName)
                 && 
!NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())
                 && !ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             this.transactionBuffer = brokerService.getPulsar()
@@ -1934,13 +1936,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         final String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
 
-        return checkAllowedCluster(localCluster).thenCompose(success -> {
-            if (!success) {
-                // if local cluster is removed from global namespace 
cluster-list : then delete topic forcefully
-                // because pulsar doesn't serve global topic without local 
repl-cluster configured.
-                return deleteForcefully().thenCompose(ignore -> {
-                    return deleteSchemaAndPoliciesIfClusterRemoved();
-                });
+        return removeTopicIfLocalClusterNotAllowed().thenCompose(topicRemoved 
-> {
+            if (topicRemoved) {
+                return CompletableFuture.completedFuture(null);
             }
 
             int newMessageTTLInSeconds = 
topicPolicies.getMessageTTLInSeconds().get();
@@ -2066,7 +2064,25 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             });
     }
 
-    private CompletableFuture<Boolean> checkAllowedCluster(String 
localCluster) {
+    /**
+     * Remove the topic if local cluster is not allowed to serve the topic.
+     * @return whether the topic was removed.
+     */
+    protected CompletableFuture<Boolean> removeTopicIfLocalClusterNotAllowed() 
{
+        final String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
+        return checkAllowedCluster(localCluster).thenCompose(isAllowed -> {
+            if (!isAllowed) {
+                // if local cluster is removed from global namespace 
cluster-list : then delete topic forcefully
+                // because pulsar doesn't serve global topic without local 
repl-cluster configured.
+                return deleteForcefully().thenCompose(ignore -> {
+                    return 
deleteSchemaAndPoliciesIfClusterRemoved().thenApply(__ -> true);
+                });
+            }
+            return CompletableFuture.completedFuture(false);
+        });
+    }
+
+    protected CompletableFuture<Boolean> checkAllowedCluster(String 
localCluster) {
         List<String> replicationClusters = 
topicPolicies.getReplicationClusters().get();
         return 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
                 
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional
 -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 98711dab9b0..cd99d3805c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -75,6 +75,10 @@ public class SystemTopic extends PersistentTopic {
         if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
             return super.checkReplication();
         }
+        // Since the txn system topic is not allowed to access anymore, we 
should delete data.
+        if 
(SystemTopicNames.isTransactionBufferOrPendingAckSystemTopicName(TopicName.get(topic)))
 {
+            return super.removeTopicIfLocalClusterNotAllowed().thenAccept(__ 
-> {});
+        }
         return CompletableFuture.completedFuture(null);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
index 199026bc4c4..0eabe0f2cc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.systopic;
 
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.events.EventType;
@@ -47,6 +49,25 @@ public class NamespaceEventsSystemTopicFactory {
                 SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
     }
 
+    public static CompletableFuture<Boolean> 
checkSystemTopicExists(NamespaceName namespaceName, EventType eventType,
+                                                             PulsarService 
pulsar) {
+        // To check whether partitioned topic exists.
+        // Instead of checking partitioned metadata, we check the first 
partition, because there is a case
+        // does not work if we choose checking partitioned metadata.
+        // The case's details:
+        // 1. Start 2 clusters: c1 and c2.
+        // 2. Enable replication between c1 and c2 with a global ZK.
+        // 3. The partitioned metadata was shared using by c1 and c2.
+        // 4. Pulsar only delete partitions when the topic is deleting from 
c1, because c2 is still using
+        //    partitioned metadata.
+        TopicName topicName = getSystemTopicName(namespaceName, eventType);
+        CompletableFuture<Boolean> nonPartitionedExists =
+                
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName);
+        CompletableFuture<Boolean> partition0Exists =
+                
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName.getPartition(0));
+        return nonPartitionedExists.thenCombine(partition0Exists, (a, b) -> a 
| b);
+    }
+
     public <T> TransactionBufferSnapshotBaseSystemTopicClient<T> 
createTransactionBufferSystemTopicClient(
             TopicName systemTopicName, SystemTopicTxnBufferSnapshotService<T>
             systemTopicTxnBufferSnapshotService, Class<T> schemaType) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 86ae3ea4824..c737da2ed0e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -25,12 +25,16 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 
@@ -108,11 +112,20 @@ public class SingleSnapshotAbortedTxnProcessorImpl 
implements AbortedTxnProcesso
 
     @Override
     public CompletableFuture<Void> clearAbortedTxnSnapshot() {
-        return this.takeSnapshotWriter.getFuture().thenCompose(writer -> {
-            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
-            snapshot.setTopicName(topic.getName());
-            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
-        }).thenRun(() -> log.info("[{}] Successes to delete the aborted 
transaction snapshot", this.topic));
+        NamespaceName namespaceName = 
TopicName.get(topic.getName()).getNamespaceObject();
+        PulsarService pulsar = topic.getBrokerService().getPulsar();
+        return NamespaceEventsSystemTopicFactory.checkSystemTopicExists(
+                    namespaceName, EventType.TRANSACTION_BUFFER_SNAPSHOT, 
pulsar)
+            .thenCompose(exists -> {
+                if (exists) {
+                    return 
this.takeSnapshotWriter.getFuture().thenCompose(writer -> {
+                        TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+                        snapshot.setTopicName(topic.getName());
+                        return writer.deleteAsync(snapshot.getTopicName(), 
snapshot);
+                    }).thenRun(() -> log.info("[{}] Successes to delete the 
aborted transaction snapshot", this.topic));
+                }
+                return CompletableFuture.completedFuture(null);
+            });
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 1325ef482f3..98d8a40eb36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -42,9 +42,11 @@ import org.apache.bookkeeper.mledger.ReadOnlyManagedLedger;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
@@ -57,6 +59,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -730,12 +733,28 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
         }
 
         private CompletableFuture<Void> clearSnapshotSegmentAndIndexes() {
-            CompletableFuture<Void> res = 
persistentWorker.clearAllSnapshotSegments()
-                    .thenCompose((ignore) -> snapshotIndexWriter.getFuture()
-                            .thenCompose(indexesWriter -> 
indexesWriter.writeAsync(topic.getName(), null)))
-                    .thenRun(() ->
-                            log.debug("Successes to clear the snapshot segment 
and indexes for the topic [{}]",
-                                    topic.getName()));
+            NamespaceName namespaceName = 
TopicName.get(topic.getName()).getNamespaceObject();
+            PulsarService pulsar = topic.getBrokerService().getPulsar();
+            CompletableFuture<Void> deleteSegmentFuture = 
NamespaceEventsSystemTopicFactory.checkSystemTopicExists(
+                            namespaceName, 
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS, pulsar)
+                    .thenCompose(exists -> {
+                        if (exists) {
+                            return persistentWorker.clearAllSnapshotSegments();
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
+            CompletableFuture<Void> res = 
deleteSegmentFuture.thenCompose(ignore ->
+                    NamespaceEventsSystemTopicFactory.checkSystemTopicExists(
+                            namespaceName, 
EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, pulsar)
+                    .thenCompose(exists -> {
+                        if (exists) {
+                            return snapshotIndexWriter.getFuture()
+                                .thenCompose(writer -> 
writer.writeAsync(topic.getName(), null))
+                                .thenRun(() -> log.debug("Successes to clear 
the snapshot segment and indexes for"
+                                        + " the topic [{}]", topic.getName()));
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    }));
             res.exceptionally(e -> {
                 log.error("Failed to clear the snapshot segment and indexes 
for the topic [{}]",
                         topic.getName(), e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 1db132eb47a..7e20bdb8780 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -43,6 +44,8 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -65,7 +68,14 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         super.cleanup();
     }
 
-    @Test(enabled = false)
+    protected void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
+                                     LocalBookkeeperEnsemble 
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+        super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, 
brokerConfigZk);
+        config.setTransactionCoordinatorEnabled(true);
+    }
+
+
+        @Test(enabled = false)
     public void testReplicatorProducerStatInTopic() throws Exception {
         super.testReplicatorProducerStatInTopic();
     }
@@ -484,7 +494,7 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         // The topics under the namespace of the cluster-1 will be deleted.
         // Verify the result.
         admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster2)));
-        
Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(()
 -> {
+        
Awaitility.await().atMost(Duration.ofSeconds(120)).ignoreExceptions().untilAsserted(()
 -> {
             Map<String, CompletableFuture<Optional<Topic>>> tps = 
pulsar1.getBrokerService().getTopics();
             assertFalse(tps.containsKey(topic));
             assertFalse(tps.containsKey(topicChangeEvents));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index 9a3689912c9..df8cd6116cd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -89,6 +89,9 @@ public class SystemTopicNames {
         return 
TopicName.getPartitionedTopicName(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
     }
 
+    /**
+     * These topics can not be created manually by users.
+     */
     public static boolean isTransactionInternalName(TopicName topicName) {
         String topic = topicName.toString();
         return topic.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString())
@@ -96,6 +99,14 @@ public class SystemTopicNames {
                 || topic.endsWith(PENDING_ACK_STORE_SUFFIX);
     }
 
+    public static boolean 
isTransactionBufferOrPendingAckSystemTopicName(TopicName topicName) {
+        String topic = topicName.getPartitionedTopicName();
+        return topic.endsWith(TRANSACTION_BUFFER_SNAPSHOT.toString())
+                || 
topic.endsWith(TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS.toString())
+                || topic.endsWith(TRANSACTION_BUFFER_SNAPSHOT_INDEXES)
+                || topic.endsWith(PENDING_ACK_STORE_SUFFIX);
+    }
+
     public static boolean isSystemTopic(TopicName topicName) {
         TopicName nonPartitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
         return isEventSystemTopic(nonPartitionedTopicName) || 
isTransactionInternalName(nonPartitionedTopicName);

Reply via email to