This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5f1c540d762 [fix][broker]User topic failed to delete after removed
cluster because of failed delete data from transaction buffer topic (#24648)
5f1c540d762 is described below
commit 5f1c540d76221ff149b9f48d8950cfedad51787f
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 29 18:33:03 2025 +0800
[fix][broker]User topic failed to delete after removed cluster because of
failed delete data from transaction buffer topic (#24648)
---
.../broker/service/persistent/PersistentTopic.java | 32 ++++++++++++++++------
.../broker/service/persistent/SystemTopic.java | 4 +++
.../NamespaceEventsSystemTopicFactory.java | 21 ++++++++++++++
.../SingleSnapshotAbortedTxnProcessorImpl.java | 23 ++++++++++++----
.../SnapshotSegmentAbortedTxnProcessorImpl.java | 31 +++++++++++++++++----
.../service/OneWayReplicatorUsingGlobalZKTest.java | 14 ++++++++--
.../pulsar/common/naming/SystemTopicNames.java | 11 ++++++++
7 files changed, 115 insertions(+), 21 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 061cbb1b13e..d0424ec912f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -409,6 +409,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
TopicName topicName = TopicName.get(topic);
if
(brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(topicName)
+ && !SystemTopicNames.isTransactionInternalName(topicName)
+ &&
!SystemTopicNames.isTransactionBufferOrPendingAckSystemTopicName(topicName)
&&
!NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())
&& !ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
this.transactionBuffer = brokerService.getPulsar()
@@ -1930,13 +1932,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
final String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
- return checkAllowedCluster(localCluster).thenCompose(success -> {
- if (!success) {
- // if local cluster is removed from global namespace
cluster-list : then delete topic forcefully
- // because pulsar doesn't serve global topic without local
repl-cluster configured.
- return deleteForcefully().thenCompose(ignore -> {
- return deleteSchemaAndPoliciesIfClusterRemoved();
- });
+ return removeTopicIfLocalClusterNotAllowed().thenCompose(topicRemoved
-> {
+ if (topicRemoved) {
+ return CompletableFuture.completedFuture(null);
}
int newMessageTTLInSeconds =
topicPolicies.getMessageTTLInSeconds().get();
@@ -2062,7 +2060,25 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
});
}
- private CompletableFuture<Boolean> checkAllowedCluster(String
localCluster) {
+ /**
+ * Remove the topic if local cluster is not allowed to serve the topic.
+ * @return whether the topic was removed.
+ */
+ protected CompletableFuture<Boolean> removeTopicIfLocalClusterNotAllowed()
{
+ final String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
+ return checkAllowedCluster(localCluster).thenCompose(isAllowed -> {
+ if (!isAllowed) {
+ // If the local cluster has been removed from the global namespace's
cluster list, delete the topic forcefully,
+ // because Pulsar does not serve a global topic without the local
replication cluster configured.
+ return deleteForcefully().thenCompose(ignore -> {
+ return
deleteSchemaAndPoliciesIfClusterRemoved().thenApply(__ -> true);
+ });
+ }
+ return CompletableFuture.completedFuture(false);
+ });
+ }
+
+ protected CompletableFuture<Boolean> checkAllowedCluster(String
localCluster) {
List<String> replicationClusters =
topicPolicies.getReplicationClusters().get();
return
brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional
-> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 98711dab9b0..cd99d3805c7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -75,6 +75,10 @@ public class SystemTopic extends PersistentTopic {
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
return super.checkReplication();
}
+ // The txn system topic can no longer be accessed when the local cluster
is not allowed to serve it, so its data should be deleted.
+ if
(SystemTopicNames.isTransactionBufferOrPendingAckSystemTopicName(TopicName.get(topic)))
{
+ return super.removeTopicIfLocalClusterNotAllowed().thenAccept(__
-> {});
+ }
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
index 199026bc4c4..0eabe0f2cc3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.systopic;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.events.EventType;
@@ -47,6 +49,25 @@ public class NamespaceEventsSystemTopicFactory {
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
}
+ public static CompletableFuture<Boolean>
checkSystemTopicExists(NamespaceName namespaceName, EventType eventType,
+ PulsarService
pulsar) {
+ // Check whether the (possibly partitioned) system topic exists.
+ // Instead of checking the partitioned metadata, we check the first partition,
+ // because there is a case where checking the partitioned metadata does not work.
+ // Details of the case:
+ // 1. Start 2 clusters: c1 and c2.
+ // 2. Enable replication between c1 and c2 with a global ZK.
+ // 3. The partitioned metadata is shared by c1 and c2.
+ // 4. Pulsar only deletes the partitions when the topic is deleted from c1,
+ // because c2 is still using the partitioned metadata.
+ TopicName topicName = getSystemTopicName(namespaceName, eventType);
+ CompletableFuture<Boolean> nonPartitionedExists =
+
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName);
+ CompletableFuture<Boolean> partition0Exists =
+
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName.getPartition(0));
+ return nonPartitionedExists.thenCombine(partition0Exists, (a, b) -> a
| b);
+ }
+
public <T> TransactionBufferSnapshotBaseSystemTopicClient<T>
createTransactionBufferSystemTopicClient(
TopicName systemTopicName, SystemTopicTxnBufferSnapshotService<T>
systemTopicTxnBufferSnapshotService, Class<T> schemaType) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 86ae3ea4824..c737da2ed0e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -25,12 +25,16 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
@@ -108,11 +112,20 @@ public class SingleSnapshotAbortedTxnProcessorImpl
implements AbortedTxnProcesso
@Override
public CompletableFuture<Void> clearAbortedTxnSnapshot() {
- return this.takeSnapshotWriter.getFuture().thenCompose(writer -> {
- TransactionBufferSnapshot snapshot = new
TransactionBufferSnapshot();
- snapshot.setTopicName(topic.getName());
- return writer.deleteAsync(snapshot.getTopicName(), snapshot);
- }).thenRun(() -> log.info("[{}] Successes to delete the aborted
transaction snapshot", this.topic));
+ NamespaceName namespaceName =
TopicName.get(topic.getName()).getNamespaceObject();
+ PulsarService pulsar = topic.getBrokerService().getPulsar();
+ return NamespaceEventsSystemTopicFactory.checkSystemTopicExists(
+ namespaceName, EventType.TRANSACTION_BUFFER_SNAPSHOT,
pulsar)
+ .thenCompose(exists -> {
+ if (exists) {
+ return
this.takeSnapshotWriter.getFuture().thenCompose(writer -> {
+ TransactionBufferSnapshot snapshot = new
TransactionBufferSnapshot();
+ snapshot.setTopicName(topic.getName());
+ return writer.deleteAsync(snapshot.getTopicName(),
snapshot);
+ }).thenRun(() -> log.info("[{}] Successes to delete the
aborted transaction snapshot", this.topic));
+ }
+ return CompletableFuture.completedFuture(null);
+ });
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 1325ef482f3..98d8a40eb36 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -42,9 +42,11 @@ import org.apache.bookkeeper.mledger.ReadOnlyManagedLedger;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
@@ -57,6 +59,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -730,12 +733,28 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
}
private CompletableFuture<Void> clearSnapshotSegmentAndIndexes() {
- CompletableFuture<Void> res =
persistentWorker.clearAllSnapshotSegments()
- .thenCompose((ignore) -> snapshotIndexWriter.getFuture()
- .thenCompose(indexesWriter ->
indexesWriter.writeAsync(topic.getName(), null)))
- .thenRun(() ->
- log.debug("Successes to clear the snapshot segment
and indexes for the topic [{}]",
- topic.getName()));
+ NamespaceName namespaceName =
TopicName.get(topic.getName()).getNamespaceObject();
+ PulsarService pulsar = topic.getBrokerService().getPulsar();
+ CompletableFuture<Void> deleteSegmentFuture =
NamespaceEventsSystemTopicFactory.checkSystemTopicExists(
+ namespaceName,
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS, pulsar)
+ .thenCompose(exists -> {
+ if (exists) {
+ return persistentWorker.clearAllSnapshotSegments();
+ }
+ return CompletableFuture.completedFuture(null);
+ });
+ CompletableFuture<Void> res =
deleteSegmentFuture.thenCompose(ignore ->
+ NamespaceEventsSystemTopicFactory.checkSystemTopicExists(
+ namespaceName,
EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, pulsar)
+ .thenCompose(exists -> {
+ if (exists) {
+ return snapshotIndexWriter.getFuture()
+ .thenCompose(writer ->
writer.writeAsync(topic.getName(), null))
+ .thenRun(() -> log.debug("Successes to clear
the snapshot segment and indexes for"
+ + " the topic [{}]", topic.getName()));
+ }
+ return CompletableFuture.completedFuture(null);
+ }));
res.exceptionally(e -> {
log.error("Failed to clear the snapshot segment and indexes
for the topic [{}]",
topic.getName(), e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 1db132eb47a..7e20bdb8780 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -43,6 +44,8 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -65,7 +68,14 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
super.cleanup();
}
- @Test(enabled = false)
+ protected void setConfigDefaults(ServiceConfiguration config, String
clusterName,
+ LocalBookkeeperEnsemble
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+ super.setConfigDefaults(config, clusterName, bookkeeperEnsemble,
brokerConfigZk);
+ config.setTransactionCoordinatorEnabled(true);
+ }
+
+
+ @Test(enabled = false)
public void testReplicatorProducerStatInTopic() throws Exception {
super.testReplicatorProducerStatInTopic();
}
@@ -484,7 +494,7 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
// The topics under the namespace of the cluster-1 will be deleted.
// Verify the result.
admin1.namespaces().setNamespaceReplicationClusters(ns1, new
HashSet<>(Arrays.asList(cluster2)));
-
Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(()
-> {
+
Awaitility.await().atMost(Duration.ofSeconds(120)).ignoreExceptions().untilAsserted(()
-> {
Map<String, CompletableFuture<Optional<Topic>>> tps =
pulsar1.getBrokerService().getTopics();
assertFalse(tps.containsKey(topic));
assertFalse(tps.containsKey(topicChangeEvents));
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index 9a3689912c9..df8cd6116cd 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -89,6 +89,9 @@ public class SystemTopicNames {
return
TopicName.getPartitionedTopicName(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
}
+ /**
+ * These topics can not be created manually by users.
+ */
public static boolean isTransactionInternalName(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString())
@@ -96,6 +99,14 @@ public class SystemTopicNames {
|| topic.endsWith(PENDING_ACK_STORE_SUFFIX);
}
+ public static boolean
isTransactionBufferOrPendingAckSystemTopicName(TopicName topicName) {
+ String topic = topicName.getPartitionedTopicName();
+ return topic.endsWith(TRANSACTION_BUFFER_SNAPSHOT.toString())
+ ||
topic.endsWith(TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS.toString())
+ || topic.endsWith(TRANSACTION_BUFFER_SNAPSHOT_INDEXES)
+ || topic.endsWith(PENDING_ACK_STORE_SUFFIX);
+ }
+
public static boolean isSystemTopic(TopicName topicName) {
TopicName nonPartitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
return isEventSystemTopic(nonPartitionedTopicName) ||
isTransactionInternalName(nonPartitionedTopicName);