This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new be87cd9bfe3 [fix][broker]system topic was created with different 
partitions acrossing clusters after enabled namespace-level replication (#25312)
be87cd9bfe3 is described below

commit be87cd9bfe39c35f99689691ac96b50cade84c54
Author: fengyubiao <[email protected]>
AuthorDate: Tue Mar 17 17:43:41 2026 +0800

    [fix][broker]system topic was created with different partitions acrossing 
clusters after enabled namespace-level replication (#25312)
---
 .../pulsar/broker/service/BrokerService.java       | 123 +++++++++++++++---
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   6 +
 .../service/OneWayReplicatorUsingGlobalZKTest.java | 137 +++++++++++++++++++++
 3 files changed, 251 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 79a26ec56ae..d6226254f3c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -114,6 +114,7 @@ import 
org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
 import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
 import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -144,8 +145,10 @@ import 
org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
 import org.apache.pulsar.broker.validator.BindAddressValidator;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -3386,20 +3389,32 @@ public class BrokerService implements Closeable {
                             return CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0));
                         }
 
-                        // Allow auto create non-partitioned topic.
-                        boolean autoCreatePartitionedTopic = 
pulsar.getBrokerService()
-                                .isDefaultTopicTypePartitioned(topicName, 
policies);
-                        if (!autoCreatePartitionedTopic || 
topicName.isPartitioned()) {
-                            return CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0));
-                        }
+                        return 
getRemotePartitionedTopicMetadataForAutoCreation(topicName, policies)
+                            .thenCompose(remoteTopicExistsInfo -> {
+                                // If remote topic exists, prioritize topic 
shape from remote clusters.
+                                if (remoteTopicExistsInfo.isExists()) {
+                                    if (remoteTopicExistsInfo.getTopicType() 
== TopicType.PARTITIONED) {
+                                        return 
createPartitionedTopicMetadataAsync(topicName,
+                                            
remoteTopicExistsInfo.getPartitions());
+                                    }
+                                    return 
CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
+                                }
 
-                        // Create partitioned metadata.
-                        return 
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName, 
policies)
-                            .exceptionallyCompose(ex -> {
+                                // Allow auto create non-partitioned topic.
+                                boolean autoCreatePartitionedTopic = 
pulsar.getBrokerService()
+                                        
.isDefaultTopicTypePartitioned(topicName, policies);
+                                if (!autoCreatePartitionedTopic || 
topicName.isPartitioned()) {
+                                    return 
CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
+                                }
+
+                                // Create partitioned metadata.
+                                return 
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName,
+                                        policies);
+                            }).exceptionallyCompose(ex -> {
                                 // The partitioned topic might be created 
concurrently.
                                 if (ex.getCause() instanceof 
MetadataStoreException.AlreadyExistsException) {
-                                    log.info("[{}] The partitioned topic is 
already created, try to refresh the cache"
-                                            + " and read again.", topicName);
+                                    log.info("[{}] The partitioned topic is 
already created, try to refresh "
+                                            + "the cache and read again.", 
topicName);
                                     
CompletableFuture<PartitionedTopicMetadata> recheckFuture =
                                             
fetchPartitionedTopicMetadataAsync(topicName, true);
                                     recheckFuture.exceptionally(ex2 -> {
@@ -3428,20 +3443,25 @@ public class BrokerService implements Closeable {
     private CompletableFuture<PartitionedTopicMetadata> 
createDefaultPartitionedTopicAsync(TopicName topicName,
                                                                                
         Optional<Policies> policies) {
         final int defaultNumPartitions = 
pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies);
+        return createPartitionedTopicMetadataAsync(topicName, 
defaultNumPartitions);
+    }
+
+    private CompletableFuture<PartitionedTopicMetadata> 
createPartitionedTopicMetadataAsync(TopicName topicName,
+                                                                               
             int numPartitions) {
         final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        if (defaultNumPartitions <= 0) {
+        if (numPartitions <= 0) {
             return FutureUtil.failedFuture(
                     new IllegalArgumentException("Default number of partitions 
should be more than 0"));
         }
-        if (maxPartitions > 0 && defaultNumPartitions > maxPartitions) {
+        if (maxPartitions > 0 && numPartitions > maxPartitions) {
             return FutureUtil.failedFuture(
                     new IllegalArgumentException("Number of partitions should 
be less than or equal to "
                             + maxPartitions));
         }
 
-        PartitionedTopicMetadata configMetadata = new 
PartitionedTopicMetadata(defaultNumPartitions);
+        PartitionedTopicMetadata configMetadata = new 
PartitionedTopicMetadata(numPartitions);
 
-        return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions, 
true)
+        return checkMaxTopicsPerNamespace(topicName, numPartitions, true)
                 .thenCompose(__ -> {
                     PartitionedTopicResources partitionResources = 
pulsar.getPulsarResources().getNamespaceResources()
                             .getPartitionedTopicResources();
@@ -3453,6 +3473,79 @@ public class BrokerService implements Closeable {
                 });
     }
 
+    private CompletableFuture<TopicExistsInfo> 
getRemotePartitionedTopicMetadataForAutoCreation(
+            TopicName topicName, Optional<Policies> policies) {
+        if (!pulsar.getConfig().isCreateTopicToRemoteClusterForReplication()) {
+            return 
CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists());
+        }
+        if (topicName.isPartitioned() || !topicName.isPersistent() || 
policies.isEmpty()) {
+            return 
CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists());
+        }
+        Set<String> replicationClusters = policies.get().replication_clusters;
+        if (replicationClusters == null || replicationClusters.isEmpty()) {
+            return 
CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists());
+        }
+        String localCluster = pulsar.getConfiguration().getClusterName();
+        if (!replicationClusters.contains(localCluster) || 
replicationClusters.size() <= 1) {
+            return 
CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists());
+        }
+        List<String> remoteClusters = replicationClusters.stream()
+                .filter(cluster -> !cluster.equals(localCluster))
+                .sorted()
+                .toList();
+        return findRemoteTopicMetadataForAutoCreation(topicName, 
remoteClusters, 0, null);
+    }
+
+    private CompletableFuture<TopicExistsInfo> 
findRemoteTopicMetadataForAutoCreation(
+            TopicName topicName, List<String> remoteClusters, int index, 
Throwable errOccurred) {
+        if (index >= remoteClusters.size()) {
+            if (errOccurred != null) {
+                log.error("[{}] Failed to check remote topic partitioned 
metadata on cluster {}. Fallback to "
+                    + "default auto topic creation policy.",
+                    topicName, remoteClusters, errOccurred);
+            }
+            return 
CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists());
+        }
+        final String remoteCluster = remoteClusters.get(index);
+        return 
pulsar.getPulsarResources().getClusterResources().getClusterAsync(remoteCluster)
+            .thenCompose(clusterData -> {
+                if (clusterData.isEmpty()) {
+                    log.warn("[{}] Skip checking remote cluster {} because 
cluster data is missing",
+                            topicName, remoteCluster);
+                    return findRemoteTopicMetadataForAutoCreation(topicName, 
remoteClusters, index + 1, null);
+                }
+                PulsarClient client = getReplicationClient(remoteCluster, 
clusterData);
+                CompletableFuture<TopicExistsInfo> future = new 
CompletableFuture<>();
+                client.getPartitionsForTopic(topicName.toString(), 
false).handle((topics, t) -> {
+                    if (t != null) {
+                        Throwable actEx = 
FutureUtil.unwrapCompletionException(t);
+                        if (actEx instanceof 
PulsarClientException.NotFoundException
+                            | actEx instanceof 
PulsarClientException.TopicDoesNotExistException
+                            | actEx instanceof 
PulsarAdminException.NotFoundException) {
+                            
future.complete(TopicExistsInfo.newTopicNotExists());
+                        } else {
+                            FutureUtil.completeAfter(future,
+                                
findRemoteTopicMetadataForAutoCreation(topicName, remoteClusters, index + 1, 
actEx));
+                        }
+                        return null;
+                    }
+                    if (topics.isEmpty()) {
+                        future.complete(TopicExistsInfo.newTopicNotExists());
+                    } else if (topics.size() == 1 && 
!TopicName.get(topics.get(0)).isPartitioned()) {
+                        
future.complete(TopicExistsInfo.newNonPartitionedTopicExists());
+                    } else {
+                        int maxPartitionNum = 0;
+                        for (String topic : topics) {
+                            maxPartitionNum = Math.max(maxPartitionNum, 
TopicName.get(topic).getPartitionIndex());
+                        }
+                        
future.complete(TopicExistsInfo.newPartitionedTopicExists(maxPartitionNum + 1));
+                    }
+                    return null;
+                });
+                return future;
+            });
+    }
+
     public CompletableFuture<PartitionedTopicMetadata> 
fetchPartitionedTopicMetadataAsync(TopicName topicName) {
         return fetchPartitionedTopicMetadataAsync(topicName, false);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 7ed05939045..1f80d9a1925 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -138,6 +138,12 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         uriInfo = mock(UriInfo.class);
     }
 
+    @Override
+    protected void doInitConf() throws Exception {
+        configureInitialConfig(conf);
+        conf.setCreateTopicToRemoteClusterForReplication(false);
+    }
+
     @Override
     @BeforeMethod
     protected void setup() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 0ae33d247f1..aa8d0aabcb6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -37,10 +37,13 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.namespace.TopicExistsInfo;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
@@ -50,14 +53,18 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.policies.data.TopicType;
+import 
org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -669,6 +676,136 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         }
     }
 
+    @DataProvider
+    public Object[][] localSystemTopicPartitions() {
+        return new Object[][] {
+                {0},
+                {3}
+        };
+    }
+
+    @Test(dataProvider = "localSystemTopicPartitions")
+    public void testSystemTopicCreationWithDifferentTopicCreationRule(int 
localSystemTopicPartitions) throws Exception {
+        String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns");
+        Predicate<String> topicNameFilter = t -> 
TopicName.get(t).getNamespace().equals(ns);
+        String systemTopic = "persistent://" + ns + "/__change_events";
+        admin1.namespaces().createNamespace(ns);
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new 
HashSet<>(Arrays.asList(cluster1)), false);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(admin1.namespaces().getNamespaceReplicationClusters(ns).size(), 1);
+            
assertEquals(admin2.namespaces().getNamespaceReplicationClusters(ns).size(), 1);
+        });
+
+        // Trigger system topic creation on cluster1, following {@param 
localSystemTopicPartitions}.
+        AutoTopicCreationOverride autoTopicCreation1 = null;
+        if (localSystemTopicPartitions == 0) {
+            autoTopicCreation1 = 
AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                    .topicType("non-partitioned").build();
+        } else {
+            autoTopicCreation1 = 
AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                    
.topicType("partitioned").defaultNumPartitions(localSystemTopicPartitions).build();
+        }
+        admin1.namespaces().setAutoTopicCreation(ns, autoTopicCreation1);
+        Awaitility.await().untilAsserted(() -> {
+            AutoTopicCreationOverride autoTopicCreationOverride =
+                    admin1.namespaces().getAutoTopicCreationAsync(ns).get(3, 
TimeUnit.SECONDS);
+            assertNotNull(autoTopicCreationOverride);
+            if (localSystemTopicPartitions == 0) {
+                
assertTrue("non-partitioned".equalsIgnoreCase(autoTopicCreationOverride.getTopicType()));
+            } else {
+                
assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(), 
localSystemTopicPartitions);
+            }
+        });
+        // Use a topic loading to trigger system topic creation.
+        String topicUsedToTriggerSystemTopic = 
BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
+        
admin1.topics().createNonPartitionedTopic(topicUsedToTriggerSystemTopic);
+        admin1.topics().delete(topicUsedToTriggerSystemTopic, false);
+        // Verify: the system topic was created as expected.
+        Awaitility.await().untilAsserted(() -> {
+            TopicExistsInfo existsInfo = pulsar1.getNamespaceService()
+                    .checkTopicExistsAsync(TopicName.get(systemTopic)).get(3, 
TimeUnit.SECONDS);
+            assertTrue(existsInfo.isExists());
+            if (localSystemTopicPartitions == 0) {
+                assertEquals(existsInfo.getTopicType(), 
TopicType.NON_PARTITIONED);
+            } else {
+                assertEquals(existsInfo.getTopicType(), TopicType.PARTITIONED);
+                assertEquals(existsInfo.getPartitions(), 
localSystemTopicPartitions);
+            }
+        });
+
+        // Enable replication.
+        // Set topic auto-creation rule to "partitions: 2".
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp");
+        final Set<String> clusters = new HashSet<>(Arrays.asList(cluster1, 
cluster2));
+        admin1.namespaces().setNamespaceReplicationClusters(ns, clusters, 
true);
+        AutoTopicCreationOverride autoTopicCreation2 =
+                
AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                        
.topicType("partitioned").defaultNumPartitions(2).build();
+        admin1.namespaces().setAutoTopicCreation(ns, autoTopicCreation2);
+        admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation2);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(admin1.namespaces().getAutoTopicCreationAsync(ns).join()
+                    .getDefaultNumPartitions(), 2);
+            
assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join()
+                    .getDefaultNumPartitions(), 2);
+        });
+
+        admin2.topics().createNonPartitionedTopic(tp);
+        Producer<String> p2 = 
client2.newProducer(Schema.STRING).topic(tp).create();
+        p2.send("msg-1");
+        p2.close();
+        Producer<String> p1 = 
client1.newProducer(Schema.STRING).topic(tp).create();
+        p1.send("msg-1");
+        p1.close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic1 = (PersistentTopic) 
broker1.getTopic(tp, false).join().get();
+            assertFalse(persistentTopic1.getReplicators().isEmpty());
+            PersistentTopic persistentTopic2 = (PersistentTopic) 
broker2.getTopic(tp, false).join().get();
+            assertFalse(persistentTopic2.getReplicators().isEmpty());
+        });
+
+        // Verify: the topics are the same between two clusters.
+        Awaitility.await().untilAsserted(() -> {
+            List<String> topics1 = 
pulsar1.getBrokerService().getTopics().keySet()
+                    
.stream().filter(topicNameFilter).collect(Collectors.toList());
+            List<String> topics2 = 
pulsar2.getBrokerService().getTopics().keySet()
+                    
.stream().filter(topicNameFilter).collect(Collectors.toList());
+            Collections.sort(topics1);
+            Collections.sort(topics2);
+            boolean systemTopicCreated1 = false;
+            for (String tp1 : topics1) {
+                if (tp1.contains("__change_events")) {
+                    systemTopicCreated1 = true;
+                    break;
+                }
+            }
+            boolean systemTopicCreated2 = false;
+            for (String tp2 : topics2) {
+                if (tp2.contains("__change_events")) {
+                    systemTopicCreated2 = true;
+                    break;
+                }
+            }
+            log.info("topics1: {}", topics1);
+            log.info("topics2: {}", topics2);
+            assertTrue(systemTopicCreated1);
+            assertTrue(systemTopicCreated2);
+            assertEquals(topics1, topics2);
+        });
+
+        // cleanup.
+        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1));
+        admin2.topics().setReplicationClusters(tp, Arrays.asList(cluster2));
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic1 = (PersistentTopic) 
broker1.getTopic(tp, false).join().get();
+            assertTrue(persistentTopic1.getReplicators().isEmpty());
+            PersistentTopic persistentTopic2 = (PersistentTopic) 
broker2.getTopic(tp, false).join().get();
+            assertTrue(persistentTopic2.getReplicators().isEmpty());
+        });
+        admin1.topics().delete(tp, false);
+        admin2.topics().delete(tp, false);
+    }
+
     @Test
     public void testUpdateNamespacePolicies() throws Exception {
         // Create a namespace and allow both clusters to access.

Reply via email to