This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 82af2991c6a [improve][broker] Deny removing local cluster from topic
level replicated cluster policy (#24351)
82af2991c6a is described below
commit 82af2991c6a5c0f58c651bfb135b462389f19d47
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jun 18 00:34:47 2025 +0800
[improve][broker] Deny removing local cluster from topic level replicated
cluster policy (#24351)
---
.../apache/pulsar/broker/admin/AdminResource.java | 10 +++
.../broker/admin/impl/PersistentTopicsBase.java | 2 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 23 +++++-
.../broker/service/persistent/PersistentTopic.java | 12 ++-
.../pulsar/broker/admin/TopicPoliciesTest.java | 32 ++++++++
.../broker/service/OneWayReplicatorTest.java | 85 ++++++++++++++++++++++
.../broker/service/OneWayReplicatorTestBase.java | 41 +++++++++++
...OneWayReplicatorUsingGlobalPartitionedTest.java | 35 +++++++++
.../service/OneWayReplicatorUsingGlobalZKTest.java | 6 ++
9 files changed, 241 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 257780690d6..84d55430f8f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -962,6 +962,12 @@ public abstract class AdminResource extends
PulsarWebResource {
== Status.NOT_FOUND.getStatusCode();
}
+ /**
+  * Tells whether {@code ex}, after being passed through
+  * {@code FutureUtil.unwrapCompletionException}, is a {@link WebApplicationException}
+  * whose response status belongs to the 4xx family.
+  *
+  * @param ex the throwable to inspect.
+  * @return {@code true} if the unwrapped cause carries a status in the range 400-499.
+  */
+ protected static boolean is4xxRestException(Throwable ex) {
+     Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+     // Integer division selects the status family. A modulo would only match codes ending in "04"
+     // (e.g. 404) and would miss 400, 409, 412, ...
+     return realCause instanceof WebApplicationException
+             && (((WebApplicationException) realCause).getResponse().getStatus() / 100 == 4);
+ }
+
protected static boolean isConflictException(Throwable ex) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
return realCause instanceof WebApplicationException
@@ -984,6 +990,10 @@ public abstract class AdminResource extends
PulsarWebResource {
return !isRedirectException(ex) && !isNotFoundException(ex) &&
!isBadRequest(ex);
}
+ /** Returns {@code true} only when {@code ex} is neither a redirect nor a 4xx REST exception. */
+ protected static boolean isNot307And4xxException(Throwable ex) {
+     return !(isRedirectException(ex) || is4xxRestException(ex));
+ }
+
protected static String getTopicNotFoundErrorMessage(String topic) {
return String.format("Topic %s not found", topic);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 978b72697cc..bb19dc1ad8f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4990,7 +4990,7 @@ public class PersistentTopicsBase extends AdminResource {
protected void handleTopicPolicyException(String methodName, Throwable
thr, AsyncResponse asyncResponse) {
Throwable cause = thr.getCause();
- if (isNot307And404And400Exception(cause)) {
+ if (isNot307And4xxException(cause)) {
log.error("[{}] Failed to perform {} on topic {}",
clientAppId(), methodName, topicName, cause);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6138e3e0124..f154e4fdaa5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
@@ -46,6 +47,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -2344,7 +2346,26 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "List of replication clusters", required = true)
List<String> clusterIds) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION,
PolicyOperation.WRITE)
- .thenCompose(__ -> preValidation(authoritative))
+ .thenCompose(__ ->
preValidation(authoritative)).thenCompose(__ -> {
+ // Setting topic-level replicated clusters that do not contain the local cluster is not meaningful,
+ // except in the following scenario: the user has two clusters with Geo-Replication enabled through
+ // a global metadata store, so the partitioned topic metadata and the namespace-level
+ // "replicated clusters" are shared between the clusters. Pulsar can hardly delete a specific
+ // partitioned topic. To support this use case, the following steps can implement it:
+ // 1. Set global topic-level replicated clusters that do not contain the local cluster.
+ // 2. The local cluster will remove the subtopics automatically, and remove the schemas and local
+ //    topic policies. Only the global topic policies are left there, which prevents the
+ //    namespace-level replicated clusters policy from taking effect.
+ // TODO But the API "pulsar-admin topics set-replication-clusters" does not support global policy,
+ // to support this scenario, a PIP is needed.
+ boolean clustersDoesNotContainsLocal =
CollectionUtils.isEmpty(clusterIds)
+ ||
!clusterIds.contains(pulsar().getConfig().getClusterName());
+ if (clustersDoesNotContainsLocal) {
+ return FutureUtil.failedFuture(new
RestException(Response.Status.PRECONDITION_FAILED,
+ "Can not remove local cluster from the topic-level
replication clusters policy"));
+ }
+ return CompletableFuture.completedFuture(null);
+ })
.thenCompose(__ -> internalSetReplicationClusters(clusterIds))
.thenRun(() ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1fdad8294a3..2c653424913 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2016,9 +2016,15 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
});
- // TODO regarding the topic level
policies, it will be deleted at a seperate PR.
- // Because there is an issue related to
Global policies has not been solved so
- // far.
+ // There is only one case in which the local cluster will be removed: when using a
+ // global metadata store, namespaces share policies across multiple clusters,
+ // including "replicated clusters" and "partitioned topic metadata", so we can
+ // hardly delete a partitioned topic from one cluster and keep it in another.
+ // Removing the local cluster from the namespace-level "replicated clusters" can do this.
+ // TODO: there is no way to delete a specific partitioned topic if users have enabled
+ // Geo-Replication with a global metadata store, a PIP is needed.
+ // Since the system topic "__change_events" under the namespace will also be
+ // deleted, we can skip deleting topic-level policies.
}
}
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 7a0605b6a2d..dc412eef802 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -3851,4 +3852,35 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
.ratePeriodInSecond(10)
.build());
}
+
+ /** Supplies one row per topic type so that a test runs once for each of them. */
+ @DataProvider
+ public Object[][] topicTypes() {
+     return new Object[][] {
+         new Object[] {TopicType.PARTITIONED},
+         new Object[] {TopicType.NON_PARTITIONED},
+     };
+ }
+
+ /**
+  * Verifies that setting topic-level replication clusters that do not include the local cluster
+  * is rejected with a "precondition failed" error, for the topic type supplied by the provider.
+  */
+ @Test(dataProvider = "topicTypes")
+ public void testRemoveLocalCluster(TopicType topicType) throws Exception {
+     String topic = "persistent://" + myNamespace + "/testSetSubRateWithSub";
+     boolean partitioned = TopicType.PARTITIONED.equals(topicType);
+     // Create the kind of topic the data provider asked for (the branches used to be inverted).
+     if (partitioned) {
+         admin.topics().createPartitionedTopic(topic, 2);
+     } else {
+         admin.topics().createNonPartitionedTopic(topic);
+     }
+     try {
+         admin.topics().setReplicationClusters(topic, Arrays.asList("not-local-cluster"));
+         fail("Can not remove local cluster from the topic-level replication clusters policy");
+     } catch (PulsarAdminException.PreconditionFailedException e) {
+         assertTrue(e.getMessage().contains("Can not remove local cluster from the topic-level replication clusters"
+                 + " policy"));
+     } finally {
+         // cleanup, also when the assertion above fails, using the deletion call matching the topic type.
+         if (partitioned) {
+             admin.topics().deletePartitionedTopic(topic, false);
+         } else {
+             admin.topics().delete(topic, false);
+         }
+     }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 3c7edaec44d..1244300378a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -91,6 +91,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -1422,4 +1423,88 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
SchemaCompatibilityStrategy.FORWARD);
}
+
+ /**
+  * Manually modifying topic policies through the REST API.
+  * - Global topic level policies:
+  *   - Add: replicate
+  *   - Update: replicate
+  *   - Delete a single policy (equivalent to a specific update): delete both local and remote policies.
+  * - Local topic level policies:
+  *   - Add: never replicate
+  *   - Update: never replicate
+  *   - Delete a single policy (equivalent to a specific update): delete local policies only.
+  * Deleting a topic triggers the deletion of both local and global policies in the local cluster, but
+  * does not delete the remote cluster's global policies. That case is covered by
+  * "OneWayReplicatorUsingGlobalPartitionedTest.testRemoveCluster".
+  */
+ @Test
+ public void testTopicPoliciesReplicationRule() throws Exception {
+     // Init Pulsar resources.
+     final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+     final TopicName topicNameObj = TopicName.get(topicName);
+     final String subscriptionName = "s1";
+     admin1.topics().createNonPartitionedTopic(topicName);
+     Producer<byte[]> producer1 = client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
+     waitReplicatorStarted(topicName);
+     producer1.close();
+     assertTrue(pulsar2.getPulsarResources().getTopicResources().persistentTopicExists(topicNameObj).join());
+     // The argument order is (topic, subscriptionName, messageId). The former calls passed the
+     // subscription name as the topic, so they did not touch the topic under test.
+     admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+     admin2.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+
+     // Case 1: Global topic level policies -> Add: replicate.
+     PublishRate publishRateAddGlobal = new PublishRate(100, 10000);
+     admin1.topicPolicies(true).setPublishRate(topicName, publishRateAddGlobal);
+     // Case 4: Local topic level policies -> Add: never replicate.
+     PublishRate publishRateAddLocal = new PublishRate(200, 20000);
+     admin1.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal);
+     waitChangeEventsReplicated(replicatedNamespace);
+     Thread.sleep(2000);
+     Awaitility.await().untilAsserted(() -> {
+         PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
+         assertEquals(valueGlobal, publishRateAddGlobal);
+         PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
+         assertNull(valueLocal);
+     });
+
+     // Case 2: Global topic level policies -> Update: replicate.
+     PublishRate publishRateUpdateGlobal = new PublishRate(300, 30000);
+     admin1.topicPolicies(true).setPublishRate(topicName, publishRateUpdateGlobal);
+     // Case 5: Local topic level policies -> Update: never replicate.
+     PublishRate publishRateUpdateLocal = new PublishRate(400, 40000);
+     admin1.topicPolicies(false).setPublishRate(topicName, publishRateUpdateLocal);
+     waitChangeEventsReplicated(replicatedNamespace);
+     Thread.sleep(2000);
+     Awaitility.await().untilAsserted(() -> {
+         PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
+         assertEquals(valueGlobal, publishRateUpdateGlobal);
+         PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
+         assertNull(valueLocal);
+     });
+
+     // Case 3: Global topic level policies -> Delete: delete both local and remote policies.
+     admin1.topicPolicies(true).removePublishRate(topicName);
+     waitChangeEventsReplicated(replicatedNamespace);
+     Thread.sleep(2000);
+     Awaitility.await().untilAsserted(() -> {
+         PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
+         assertNull(valueGlobal);
+     });
+
+     // Case 6: Local topic level policies -> Delete: never replicate.
+     // Give the remote cluster its own local policy first, so that a replicated deletion would be visible.
+     PublishRate publishRateAddLocal2 = new PublishRate(500, 50000);
+     admin2.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal2);
+     Awaitility.await().untilAsserted(() -> {
+         PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
+         assertEquals(valueLocal, publishRateAddLocal2);
+     });
+     admin1.topicPolicies(false).removePublishRate(topicName);
+     waitChangeEventsReplicated(replicatedNamespace);
+     Thread.sleep(2000);
+     Awaitility.await().untilAsserted(() -> {
+         PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
+         assertEquals(valueLocal, publishRateAddLocal2);
+     });
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 534da8bbc49..d222abde3a3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -36,9 +36,12 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -506,4 +510,41 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
||
!persistentTopic1.getReplicators().get(targetCluster.getConfig().getClusterName()).isConnected());
});
}
+
+ /**
+  * Waits until, on {@code pulsar1}, every replicator of the namespace's
+  * {@code SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME} topic that targets the remote cluster has no
+  * backlog, i.e. its cursor's mark-deleted position is not behind the managed ledger's last
+  * confirmed entry. Replicators targeting any other cluster are ignored.
+  *
+  * @param ns the namespace whose events topic is checked.
+  */
+ protected void waitChangeEventsReplicated(String ns) {
+     String topicName = "persistent://" + ns + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+     TopicName topicNameObj = TopicName.get(topicName);
+     Optional<PartitionedTopicMetadata> metadata = pulsar1.getPulsarResources().getNamespaceResources()
+             .getPartitionedTopicResources()
+             .getPartitionedTopicMetadataAsync(topicNameObj).join();
+     // Take the remote cluster name from its own configuration rather than a hard-coded literal: if
+     // the literal matches no replicator, every check below passes trivially.
+     // NOTE(review): assumes "pulsar2" is the remote broker declared by this base class - confirm.
+     String remoteCluster = pulsar2.getConfig().getClusterName();
+     Function<Replicator, Boolean> ensureNoBacklog = replicator -> {
+         if (!replicator.getRemoteCluster().equals(remoteCluster)) {
+             return true;
+         }
+         PersistentReplicator persistentReplicator = (PersistentReplicator) replicator;
+         Position lac = persistentReplicator.getCursor().getManagedLedger().getLastConfirmedEntry();
+         Position mdPos = persistentReplicator.getCursor().getMarkDeletedPosition();
+         return mdPos.compareTo(lac) >= 0;
+     };
+     // Replication is asynchronous, so poll until the assertion holds instead of checking only once.
+     Awaitility.await().untilAsserted(() -> {
+         if (metadata.isPresent()) {
+             // Partitioned events topic: check each partition.
+             for (int index = 0; index < metadata.get().partitions; index++) {
+                 String partitionName = topicNameObj.getPartition(index).toString();
+                 PersistentTopic persistentTopic =
+                         (PersistentTopic) pulsar1.getBrokerService().getTopic(partitionName, false).join().get();
+                 persistentTopic.getReplicators().values().forEach(replicator -> {
+                     assertTrue(ensureNoBacklog.apply(replicator));
+                 });
+             }
+         } else {
+             PersistentTopic persistentTopic =
+                     (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+             persistentTopic.getReplicators().values().forEach(replicator -> {
+                 assertTrue(ensureNoBacklog.apply(replicator));
+             });
+         }
+     });
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 585fe6ececa..a97dbfa4efd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service;
+import static
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.GLOBAL_ONLY;
+import static
org.apache.pulsar.broker.service.TopicPoliciesService.GetType.LOCAL_ONLY;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -34,6 +36,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -181,6 +185,12 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
admin1.namespaces().createNamespace(ns1);
admin1.namespaces().setNamespaceReplicationClusters(ns1, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
admin1.topics().createPartitionedTopic(topic, 2);
+ PublishRate publishRateAddGlobal = new PublishRate(100, 10000);
+ admin1.topicPolicies(true).setPublishRate(topic, publishRateAddGlobal);
+ PublishRate publishRateAddLocal1 = new PublishRate(200, 20000);
+ admin1.topicPolicies(false).setPublishRate(topic,
publishRateAddLocal1);
+ PublishRate publishRateAddLocal2 = new PublishRate(300, 30000);
+ admin2.topicPolicies(false).setPublishRate(topic,
publishRateAddLocal2);
Awaitility.await().untilAsserted(() -> {
assertTrue(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.partitionedTopicExists(TopicName.get(topic)));
@@ -190,6 +200,10 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
List<CompletableFuture<StoredSchema>> schemaList21
=
pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
assertEquals(schemaList21.size(), 0);
+ PublishRate valueGlobal =
admin2.topicPolicies(true).getPublishRate(topic);
+ assertEquals(valueGlobal, publishRateAddGlobal);
+ PublishRate valueLocal =
admin2.topicPolicies(false).getPublishRate(topic);
+ assertEquals(valueLocal, publishRateAddLocal2);
});
// Wait for copying messages.
@@ -201,6 +215,10 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
assertTrue(tps.containsKey(topicP0));
assertTrue(tps.containsKey(topicP1));
assertTrue(tps.containsKey(topicChangeEvents));
+ Map<String, CompletableFuture<Optional<Topic>>> tps2 =
pulsar2.getBrokerService().getTopics();
+ assertTrue(tps2.containsKey(topicP0));
+ assertTrue(tps2.containsKey(topicP1));
+ assertTrue(tps2.containsKey(topicChangeEvents));
List<CompletableFuture<StoredSchema>> schemaList12
=
pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
assertEquals(schemaList12.size(), 1);
@@ -227,6 +245,17 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
List<CompletableFuture<StoredSchema>> schemaList23
=
pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
assertEquals(schemaList23.size(), 1);
+ // Verify: the topic policies will be removed in local cluster,
but remote cluster will not.
+ Optional<TopicPolicies> globalPolicies2 =
pulsar2.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(topic),
GLOBAL_ONLY).join();
+ assertTrue(globalPolicies2.isPresent(), "Remote cluster should
have global policies.");
+ assertEquals(globalPolicies2.get().getPublishRate(),
publishRateAddGlobal,
+ "Remote cluster should have global policies: publish rate.");
+ Optional<TopicPolicies> localPolicies2 =
pulsar2.getTopicPoliciesService()
+ .getTopicPoliciesAsync(TopicName.get(topic),
LOCAL_ONLY).join();
+ assertTrue(localPolicies2.isPresent(), "Remote cluster should have
local policies.");
+ assertEquals(localPolicies2.get().getPublishRate(),
publishRateAddLocal2,
+ "Remote cluster should have local policies: publish rate.");
});
// cleanup.
@@ -239,4 +268,10 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
public void testIncompatibleMultiVersionSchema(boolean
enableDeduplication) throws Exception {
super.testIncompatibleMultiVersionSchema(enableDeduplication);
}
+
+ @Override
+ @Test
+ public void testTopicPoliciesReplicationRule() throws Exception {
+ // Delegates unchanged to the parent test; presumably re-declared so that it is run for this
+ // subclass's setup - confirm.
+ super.testTopicPoliciesReplicationRule();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 5cbea8df129..450370a60d0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -229,4 +229,10 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
public void testIncompatibleMultiVersionSchema(boolean
enableDeduplication) throws Exception {
super.testIncompatibleMultiVersionSchema(enableDeduplication);
}
+
+ @Override
+ @Test
+ public void testTopicPoliciesReplicationRule() throws Exception {
+ // Delegates unchanged to the parent test; presumably re-declared so that it is run for this
+ // subclass's setup - confirm.
+ super.testTopicPoliciesReplicationRule();
+ }
}