This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 82af2991c6a [improve][broker] Deny removing local cluster from topic 
level replicated cluster policy (#24351)
82af2991c6a is described below

commit 82af2991c6a5c0f58c651bfb135b462389f19d47
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jun 18 00:34:47 2025 +0800

    [improve][broker] Deny removing local cluster from topic level replicated 
cluster policy (#24351)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 10 +++
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 23 +++++-
 .../broker/service/persistent/PersistentTopic.java | 12 ++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 32 ++++++++
 .../broker/service/OneWayReplicatorTest.java       | 85 ++++++++++++++++++++++
 .../broker/service/OneWayReplicatorTestBase.java   | 41 +++++++++++
 ...OneWayReplicatorUsingGlobalPartitionedTest.java | 35 +++++++++
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  6 ++
 9 files changed, 241 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 257780690d6..84d55430f8f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -962,6 +962,12 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 == Status.NOT_FOUND.getStatusCode();
     }
 
+    protected static boolean is4xxRestException(Throwable ex) { // true iff the REST status is in 400..499
+        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+        return realCause instanceof WebApplicationException
+                // Integer division yields the status class; "% 100" would only match codes ending in 04.
+                && ((WebApplicationException) realCause).getResponse().getStatus() / 100 == 4;
+    }
+
     protected static boolean isConflictException(Throwable ex) {
         Throwable realCause = FutureUtil.unwrapCompletionException(ex);
         return realCause instanceof WebApplicationException
@@ -984,6 +990,10 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return !isRedirectException(ex) && !isNotFoundException(ex) && 
!isBadRequest(ex);
     }
 
+    protected static boolean isNot307And4xxException(Throwable ex) { // neither a redirect nor a 4xx error
+        return !(isRedirectException(ex) || is4xxRestException(ex));
+    }
+
     protected static String getTopicNotFoundErrorMessage(String topic) {
         return String.format("Topic %s not found", topic);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 978b72697cc..bb19dc1ad8f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4990,7 +4990,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void handleTopicPolicyException(String methodName, Throwable 
thr, AsyncResponse asyncResponse) {
         Throwable cause = thr.getCause();
-        if (isNot307And404And400Exception(cause)) {
+        if (isNot307And4xxException(cause)) {
             log.error("[{}] Failed to perform {} on topic {}",
                     clientAppId(), methodName, topicName, cause);
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6138e3e0124..f154e4fdaa5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
@@ -46,6 +47,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -2344,7 +2346,26 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "List of replication clusters", required = true) 
List<String> clusterIds) {
         validateTopicName(tenant, namespace, encodedTopic);
         validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, 
PolicyOperation.WRITE)
-                .thenCompose(__ -> preValidation(authoritative))
+                .thenCompose(__ -> 
preValidation(authoritative)).thenCompose(__ -> {
+                    // Setting topic-level replicated clusters that do not contain the local cluster is not
+                    // meaningful, except in the following scenario: the user has two clusters with Geo-Replication
+                    // enabled through a global metadata store, so the partitioned topic metadata and the
+                    // namespace-level "replicated clusters" are shared between the clusters, and Pulsar can hardly
+                    // delete a specific partitioned topic. This use case can be supported with the following steps:
+                    // 1. Set global topic-level replicated clusters that do not contain the local cluster.
+                    // 2. The local cluster will remove the subtopics automatically, and remove the schemas and
+                    //    local topic policies. Only the global topic policies are left in place, which prevents the
+                    //    namespace-level replicated clusters policy from taking effect.
+                    // TODO: the API "pulsar-admin topics set-replication-clusters" does not support global policies,
+                    //   so a PIP is needed to support this scenario.
+                    boolean clustersDoesNotContainsLocal = 
CollectionUtils.isEmpty(clusterIds)
+                            || 
!clusterIds.contains(pulsar().getConfig().getClusterName());
+                    if (clustersDoesNotContainsLocal) {
+                        return FutureUtil.failedFuture(new 
RestException(Response.Status.PRECONDITION_FAILED,
+                            "Can not remove local cluster from the topic-level 
replication clusters policy"));
+                    }
+                    return CompletableFuture.completedFuture(null);
+                })
                 .thenCompose(__ -> internalSetReplicationClusters(clusterIds))
                 .thenRun(() -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1fdad8294a3..2c653424913 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2016,9 +2016,15 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                             }
 
                                     });
-                                    // TODO regarding the topic level 
policies, it will be deleted at a seperate PR.
-                                    //   Because there is an issue related to 
Global policies has not been solved so
-                                    //   far.
+                                    // There is only one case in which the local cluster is removed: with a global
+                                    // metadata store, namespaces share policies across clusters, including
+                                    // "replicated clusters" and "partitioned topic metadata", so we can hardly delete
+                                    // a partitioned topic from one cluster while keeping it in another. Removing the
+                                    // local cluster from the namespace-level "replicated clusters" achieves this.
+                                    // TODO: there is no way to delete a specific partitioned topic if users have
+                                    //  enabled Geo-Replication with a global metadata store; a PIP is needed.
+                                    // Since the system topic "__change_events" under the namespace will also be
+                                    // deleted, deleting the topic-level policies can be skipped.
                                 }
                             }
                         });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 7a0605b6a2d..dc412eef802 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.fail;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -3851,4 +3852,35 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 .ratePeriodInSecond(10)
                 .build());
     }
+
+    @DataProvider
+    public Object[][] topicTypes() {
+        // One row per topic type; each row is the argument list of one parameterized test invocation.
+        final Object[] partitionedRow = {TopicType.PARTITIONED};
+        final Object[] nonPartitionedRow = {TopicType.NON_PARTITIONED};
+        return new Object[][]{partitionedRow, nonPartitionedRow};
+    }
+
+    @Test(dataProvider = "topicTypes")
+    public void testRemoveLocalCluster(TopicType topicType) throws Exception {
+        String topic = "persistent://" + myNamespace + "/testSetSubRateWithSub";
+        if (TopicType.PARTITIONED.equals(topicType)) {
+            admin.topics().createPartitionedTopic(topic, 2);
+        } else {
+            admin.topics().createNonPartitionedTopic(topic);
+        }
+        try { // the list below omits the local cluster, so the broker must answer with PreconditionFailed
+            admin.topics().setReplicationClusters(topic, Arrays.asList("not-local-cluster"));
+            fail("Can not remove local cluster from the topic-level replication clusters policy");
+        } catch (PulsarAdminException.PreconditionFailedException e) {
+            assertTrue(e.getMessage().contains("Can not remove local cluster from the topic-level replication clusters"
+                + " policy"));
+        } finally { // cleanup, even on failure, because both data-provider rows reuse the same topic name
+            if (TopicType.PARTITIONED.equals(topicType)) {
+                admin.topics().deletePartitionedTopic(topic, false);
+            } else {
+                admin.topics().delete(topic, false);
+            }
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 3c7edaec44d..1244300378a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -91,6 +91,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -1422,4 +1423,88 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
                 SchemaCompatibilityStrategy.FORWARD);
     }
+
+    /**
+     * Verifies the replication rules of topic policies that are modified manually through the REST API.
+     *   - Global topic level policies:
+     *     - Add: replicate
+     *     - Update: replicate
+     *     - Delete a single policy (equivalent to a specific update): delete both local and remote policies.
+     *   - Local topic level policies:
+     *     - Add: never replicate
+     *     - Update: never replicate
+     *     - Delete a single policy (equivalent to a specific update): delete local policies only.
+     * Deleting a topic deletes both the local and the global policies in the local cluster, but does not delete
+     * the remote cluster's global policies. That case is covered by
+     * "OneWayReplicatorUsingGlobalPartitionedTest.testRemoveCluster".
+     */
+    @Test
+    public void testTopicPoliciesReplicationRule() throws Exception {
+        // Init Pulsar resources.
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        final TopicName topicNameObj = TopicName.get(topicName);
+        final String subscriptionName = "s1";
+        admin1.topics().createNonPartitionedTopic(topicName);
+        Producer<byte[]> producer1 = client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
+        waitReplicatorStarted(topicName);
+        producer1.close();
+        assertTrue(pulsar2.getPulsarResources().getTopicResources().persistentTopicExists(topicNameObj).join());
+        admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+        // Same subscription on the remote cluster; the argument order is (topic, subscription, position).
+        admin2.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+
+        // Case 1: Global topic level policies -> Add: replicate.
+        PublishRate publishRateAddGlobal = new PublishRate(100, 10000);
+        admin1.topicPolicies(true).setPublishRate(topicName, publishRateAddGlobal);
+        // Case 4: Local topic level policies -> Add: never replicate.
+        PublishRate publishRateAddLocal = new PublishRate(200, 20000);
+        admin1.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal);
+        waitChangeEventsReplicated(replicatedNamespace);
+        Thread.sleep(2000);
+        Awaitility.await().untilAsserted(() -> {
+            PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
+            assertEquals(valueGlobal, publishRateAddGlobal);
+            PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
+            assertNull(valueLocal);
+        });
+
+        // Case 2: Global topic level policies -> Update: replicate.
+        PublishRate publishRateUpdateGlobal = new PublishRate(300, 30000);
+        admin1.topicPolicies(true).setPublishRate(topicName, publishRateUpdateGlobal);
+        // Case 5: Local topic level policies -> Update: never replicate.
+        PublishRate publishRateUpdateLocal = new PublishRate(400, 40000);
+        admin1.topicPolicies(false).setPublishRate(topicName, publishRateUpdateLocal);
+        waitChangeEventsReplicated(replicatedNamespace);
+        Thread.sleep(2000);
+        Awaitility.await().untilAsserted(() -> {
+            PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
+            assertEquals(valueGlobal, publishRateUpdateGlobal);
+            PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
+            assertNull(valueLocal);
+        });
+
+        // Case 3: Global topic level policies -> Delete: delete both local and remote policies.
+        admin1.topicPolicies(true).removePublishRate(topicName);
+        waitChangeEventsReplicated(replicatedNamespace);
+        Thread.sleep(2000);
+        Awaitility.await().untilAsserted(() -> {
+            PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
+            assertNull(valueGlobal);
+        });
+
+        // Case 6: Local topic level policies -> Delete: never replicate.
+        PublishRate publishRateAddLocal2 = new PublishRate(500, 50000);
+        admin2.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal2);
+        Awaitility.await().untilAsserted(() -> {
+            PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
+            assertEquals(valueLocal, publishRateAddLocal2);
+        });
+        admin1.topicPolicies(false).removePublishRate(topicName);
+        waitChangeEventsReplicated(replicatedNamespace);
+        Thread.sleep(2000); // presumably leaves time for an unwanted replication to show up before the check
+        Awaitility.await().untilAsserted(() -> {
+            PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
+            assertEquals(valueLocal, publishRateAddLocal2);
+        });
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 534da8bbc49..d222abde3a3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -36,9 +36,12 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -506,4 +510,41 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
                     || 
!persistentTopic1.getReplicators().get(targetCluster.getConfig().getClusterName()).isConnected());
         });
     }
+
+    protected void waitChangeEventsReplicated(String ns) {
+        // Asserts once (no polling) that the namespace's events topic has no replication backlog towards "c2".
+        final String eventsTopic = "persistent://" + ns + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        final TopicName eventsTopicName = TopicName.get(eventsTopic);
+        final Optional<PartitionedTopicMetadata> partitionedMetadata = pulsar1.getPulsarResources()
+                .getNamespaceResources().getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(eventsTopicName).join();
+        // A replicator has no backlog once its cursor's mark-delete position has reached the managed ledger's
+        // last confirmed entry. Only the replicator towards "c2" is checked; all others pass unconditionally.
+        final Function<Replicator, Boolean> hasNoBacklog = replicator -> {
+            if (!replicator.getRemoteCluster().equals("c2")) {
+                return true;
+            }
+            PersistentReplicator persistent = (PersistentReplicator) replicator;
+            Position lastConfirmed = persistent.getCursor().getManagedLedger().getLastConfirmedEntry();
+            Position markDeleted = persistent.getCursor().getMarkDeletedPosition();
+            return markDeleted.compareTo(lastConfirmed) >= 0;
+        };
+        if (!partitionedMetadata.isPresent()) { // non-partitioned: there is a single topic to check
+            final PersistentTopic topic =
+                    (PersistentTopic) pulsar1.getBrokerService().getTopic(eventsTopic, false).join().get();
+            for (Replicator replicator : topic.getReplicators().values()) {
+                assertTrue(hasNoBacklog.apply(replicator));
+            }
+            return;
+        }
+        final int partitionCount = partitionedMetadata.get().partitions;
+        for (int partition = 0; partition < partitionCount; partition++) { // partitioned: check every partition
+            final String partitionName = eventsTopicName.getPartition(partition).toString();
+            final PersistentTopic topic =
+                    (PersistentTopic) pulsar1.getBrokerService().getTopic(partitionName, false).join().get();
+            for (Replicator replicator : topic.getReplicators().values()) {
+                assertTrue(hasNoBacklog.apply(replicator));
+            }
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 585fe6ececa..a97dbfa4efd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.GLOBAL_ONLY;
+import static 
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.LOCAL_ONLY;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -34,6 +36,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -181,6 +185,12 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         admin1.namespaces().createNamespace(ns1);
         admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
         admin1.topics().createPartitionedTopic(topic, 2);
+        PublishRate publishRateAddGlobal = new PublishRate(100, 10000);
+        admin1.topicPolicies(true).setPublishRate(topic, publishRateAddGlobal);
+        PublishRate publishRateAddLocal1 = new PublishRate(200, 20000);
+        admin1.topicPolicies(false).setPublishRate(topic, 
publishRateAddLocal1);
+        PublishRate publishRateAddLocal2 = new PublishRate(300, 30000);
+        admin2.topicPolicies(false).setPublishRate(topic, 
publishRateAddLocal2);
         Awaitility.await().untilAsserted(() -> {
            
assertTrue(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                    .partitionedTopicExists(TopicName.get(topic)));
@@ -190,6 +200,10 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
             List<CompletableFuture<StoredSchema>> schemaList21
                     = 
pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
             assertEquals(schemaList21.size(), 0);
+            PublishRate valueGlobal = 
admin2.topicPolicies(true).getPublishRate(topic);
+            assertEquals(valueGlobal, publishRateAddGlobal);
+            PublishRate valueLocal = 
admin2.topicPolicies(false).getPublishRate(topic);
+            assertEquals(valueLocal, publishRateAddLocal2);
         });
 
         // Wait for copying messages.
@@ -201,6 +215,10 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
             assertTrue(tps.containsKey(topicP0));
             assertTrue(tps.containsKey(topicP1));
             assertTrue(tps.containsKey(topicChangeEvents));
+            Map<String, CompletableFuture<Optional<Topic>>> tps2 = 
pulsar2.getBrokerService().getTopics();
+            assertTrue(tps2.containsKey(topicP0));
+            assertTrue(tps2.containsKey(topicP1));
+            assertTrue(tps2.containsKey(topicChangeEvents));
             List<CompletableFuture<StoredSchema>> schemaList12
                     = 
pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
             assertEquals(schemaList12.size(), 1);
@@ -227,6 +245,17 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
             List<CompletableFuture<StoredSchema>> schemaList23
                     = 
pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
             assertEquals(schemaList23.size(), 1);
+            // Verify: the topic policies will be removed in local cluster, 
but remote cluster will not.
+            Optional<TopicPolicies> globalPolicies2 = 
pulsar2.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(topic), 
GLOBAL_ONLY).join();
+            assertTrue(globalPolicies2.isPresent(), "Remote cluster should 
have global policies.");
+            assertEquals(globalPolicies2.get().getPublishRate(), 
publishRateAddGlobal,
+                "Remote cluster should have global policies: publish rate.");
+            Optional<TopicPolicies> localPolicies2 = 
pulsar2.getTopicPoliciesService()
+                    .getTopicPoliciesAsync(TopicName.get(topic), 
LOCAL_ONLY).join();
+            assertTrue(localPolicies2.isPresent(), "Remote cluster should have 
local policies.");
+            assertEquals(localPolicies2.get().getPublishRate(), 
publishRateAddLocal2,
+                "Remote cluster should have local policies: publish rate.");
         });
 
         // cleanup.
@@ -239,4 +268,10 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
     public void testIncompatibleMultiVersionSchema(boolean 
enableDeduplication) throws Exception {
         super.testIncompatibleMultiVersionSchema(enableDeduplication);
     }
+
+    @Override
+    @Test // NOTE(review): presumably re-annotated so the inherited test also runs with this class's setup
+    public void testTopicPoliciesReplicationRule() throws Exception {
+        super.testTopicPoliciesReplicationRule(); // pure delegation: no extra behavior is added here
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 5cbea8df129..450370a60d0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -229,4 +229,10 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
     public void testIncompatibleMultiVersionSchema(boolean 
enableDeduplication) throws Exception {
         super.testIncompatibleMultiVersionSchema(enableDeduplication);
     }
+
+    @Override
+    @Test // NOTE(review): presumably re-annotated so the inherited test also runs with this class's setup
+    public void testTopicPoliciesReplicationRule() throws Exception {
+        super.testTopicPoliciesReplicationRule(); // pure delegation: no extra behavior is added here
+    }
 }

Reply via email to