This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c0850eed7a108db2b61cae9a682e366d815935c3 Author: fengyubiao <[email protected]> AuthorDate: Wed May 7 23:46:54 2025 +0800 [fix][broker]Fix incorrect priority between topic policies and global topic policies (#24254) (cherry picked from commit 86857aedefd0c7025a58b450cdf8a4de451a799e) --- .../pulsar/broker/service/AbstractTopic.java | 63 ++++---- .../pulsar/broker/admin/TopicPoliciesTest.java | 170 +++++++++++++++++++++ .../common/policies/data/PolicyHierarchyValue.java | 16 +- .../policies/data/PolicyHierarchyValueTest.java | 4 +- 4 files changed, 224 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 366bc7225c4..902e07f0cb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -217,47 +217,58 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP } protected void updateTopicPolicy(TopicPolicies data) { + boolean isGlobalPolicies = data.isGlobalPolicies(); if (!isSystemTopic()) { // Only use namespace level setting for system topic. - topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters()); + topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters(), isGlobalPolicies); topicPolicies.getSchemaCompatibilityStrategy() - .updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy())); + .updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()), + isGlobalPolicies); } - topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies()); + topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies(), isGlobalPolicies); topicPolicies.getMaxSubscriptionsPerTopic() - .updateTopicValue(normalizeValue(data.getMaxSubscriptionsPerTopic())); + .updateTopicValue(normalizeValue(data.getMaxSubscriptionsPerTopic()), isGlobalPolicies); topicPolicies.getMaxUnackedMessagesOnConsumer() - .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnConsumer())); + .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnConsumer()), isGlobalPolicies); topicPolicies.getMaxUnackedMessagesOnSubscription() - .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnSubscription())); - topicPolicies.getMaxProducersPerTopic().updateTopicValue(normalizeValue(data.getMaxProducerPerTopic())); - topicPolicies.getMaxConsumerPerTopic().updateTopicValue(normalizeValue(data.getMaxConsumerPerTopic())); + .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnSubscription()), isGlobalPolicies); + topicPolicies.getMaxProducersPerTopic().updateTopicValue(normalizeValue(data.getMaxProducerPerTopic()), + isGlobalPolicies); + topicPolicies.getMaxConsumerPerTopic().updateTopicValue(normalizeValue(data.getMaxConsumerPerTopic()), + isGlobalPolicies); topicPolicies.getMaxConsumersPerSubscription() - .updateTopicValue(normalizeValue(data.getMaxConsumersPerSubscription())); - topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies()); - topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled()); + .updateTopicValue(normalizeValue(data.getMaxConsumersPerSubscription()), isGlobalPolicies); + topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies(), isGlobalPolicies); + topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled(), isGlobalPolicies); topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateTopicValue( - data.getDeduplicationSnapshotIntervalSeconds()); + data.getDeduplicationSnapshotIntervalSeconds(), isGlobalPolicies); topicPolicies.getSubscriptionTypesEnabled().updateTopicValue( CollectionUtils.isEmpty(data.getSubscriptionTypesEnabled()) ? null : - EnumSet.copyOf(data.getSubscriptionTypesEnabled())); + EnumSet.copyOf(data.getSubscriptionTypesEnabled()), isGlobalPolicies); Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type -> this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue( - data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString()))); - topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize())); - topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds())); - topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate())); - topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled()); + data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString()), + isGlobalPolicies)); + topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()), + isGlobalPolicies); + topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()), + isGlobalPolicies); + topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()), isGlobalPolicies); + topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled(), isGlobalPolicies); topicPolicies.getReplicatorDispatchRate().updateTopicValue( - DispatchRateImpl.normalize(data.getReplicatorDispatchRate())); - topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis()); - topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate())); + DispatchRateImpl.normalize(data.getReplicatorDispatchRate()), isGlobalPolicies); + topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis(), + isGlobalPolicies); + topicPolicies.getSubscribeRate().updateTopicValue( + SubscribeRate.normalize(data.getSubscribeRate()), isGlobalPolicies); topicPolicies.getSubscriptionDispatchRate().updateTopicValue( - DispatchRateImpl.normalize(data.getSubscriptionDispatchRate())); - topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold()); - topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate())); - topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced()); - topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters()); + DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()), isGlobalPolicies); + topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold(), isGlobalPolicies); + topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()), + isGlobalPolicies); + topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced(), + isGlobalPolicies); + topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters(), isGlobalPolicies); this.subscriptionPolicies = data.getSubscriptionPolicies(); updateEntryFilters(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 3b1693ac860..1d860fbc3fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -439,6 +439,176 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { admin.topics().deletePartitionedTopic(testTopic, true); } + @DataProvider(name = "clientRequestType") + public Object[][] clientRequestType() { + return new Object[][]{ + {"PULSAR_ADMIN"}, + {"HTTP"} + }; + } + + @Test(dataProvider = "clientRequestType") + public void testPriorityOfGlobalPolicies(String clientRequestType) throws Exception { + final SystemTopicBasedTopicPoliciesService topicPoliciesService + = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + final JerseyClient httpClient = JerseyClientBuilder.createClient(); + // create topic and load it up. + final String namespace = myNamespace; + final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp"); + final TopicName topicName = TopicName.get(topic); + final String hostAndPort = pulsar.getWebServiceAddress(); + final String httpPath = "/admin/v2/persistent/" + namespace + "/" + TopicName.get(topic).getLocalName() + + "/maxConsumers"; + admin.topics().createNonPartitionedTopic(topic); + Producer producer = pulsarClient.newProducer().topic(topic).create(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopics().get(topic).get().get(); + + // Set non global policy. + // Verify: it affects. + if ("PULSAR_ADMIN".equals(clientRequestType)) { + admin.topicPolicies(false).setMaxConsumers(topic, 10); + } else { + Response res = httpClient.target(hostAndPort).path(httpPath) + .queryParam("isGlobal", "false") + .request() + .header("Content-Type", "application/json") + .post(Entity.json(10)); + assertTrue(res.getStatus() == 200 || res.getStatus() == 204); + } + Awaitility.await().untilAsserted(() -> { + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, false).join().get() + .getMaxConsumerPerTopic(), 10); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10); + }); + + // Set global policy. + // Verify: topic policies has higher priority than global policies. + if ("PULSAR_ADMIN".equals(clientRequestType)) { + admin.topicPolicies(true).setMaxConsumers(topic, 20); + } else { + Response globalRes = httpClient.target(hostAndPort).path(httpPath) + .queryParam("isGlobal", "true") + .request() + .header("Content-Type", "application/json") + .post(Entity.json(20)); + assertTrue(globalRes.getStatus() == 200 || globalRes.getStatus() == 204); + } + Awaitility.await().untilAsserted(() -> { + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, false).join().get() + .getMaxConsumerPerTopic(), 10); + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, true).join().get() + .getMaxConsumerPerTopic(), 20); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10); + }); + + // Remove non-global policy. + // Verify: global policy affects. + if ("PULSAR_ADMIN".equals(clientRequestType)) { + admin.topicPolicies(false).removeMaxConsumers(topic); + } else { + Response removeRes = httpClient.target(hostAndPort).path(httpPath) + .queryParam("isGlobal", "false") + .request() + .header("Content-Type", "application/json") + .delete(); + assertTrue(removeRes.getStatus() == 200 || removeRes.getStatus() == 204); + } + Awaitility.await().untilAsserted(() -> { + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, true).join().get() + .getMaxConsumerPerTopic(), 20); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 20); + }); + + // cleanup. + producer.close(); + admin.topics().delete(topic, false); + } + + @Test(dataProvider = "clientRequestType") + public void testPriorityOfGlobalPolicies2(String clientRequestType) throws Exception { + final SystemTopicBasedTopicPoliciesService topicPoliciesService + = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + final JerseyClient httpClient = JerseyClientBuilder.createClient(); + // create topic and load it up. + final String namespace = myNamespace; + final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp"); + final TopicName topicName = TopicName.get(topic); + final String hostAndPort = pulsar.getWebServiceAddress(); + final String httpPath = "/admin/v2/persistent/" + namespace + "/" + TopicName.get(topic).getLocalName() + + "/maxConsumers"; + admin.topics().createNonPartitionedTopic(topic); + Producer producer = pulsarClient.newProducer().topic(topic).create(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopics().get(topic).get().get(); + + // Set global policy. + // Verify: it affects. + if ("PULSAR_ADMIN".equals(clientRequestType)) { + admin.topicPolicies(true).setMaxConsumers(topic, 20); + } else { + Response globalRes = httpClient.target(hostAndPort).path(httpPath) + .queryParam("isGlobal", "true") + .request() + .header("Content-Type", "application/json") + .post(Entity.json(20)); + assertTrue(globalRes.getStatus() == 200 || globalRes.getStatus() == 204); + } + Awaitility.await().untilAsserted(() -> { + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, true).join().get() + .getMaxConsumerPerTopic(), 20); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 20); + }); + + // Set non global policy. + // Verify: topic policies has higher priority than global policies. + if ("PULSAR_ADMIN".equals(clientRequestType)) { + admin.topicPolicies(false).setMaxConsumers(topic, 10); + } else { + Response res = httpClient.target(hostAndPort).path(httpPath) + .queryParam("isGlobal", "false") + .request() + .header("Content-Type", "application/json") + .post(Entity.json(10)); + assertTrue(res.getStatus() == 200 || res.getStatus() == 204); + } + Awaitility.await().untilAsserted(() -> { + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, false).join().get() + .getMaxConsumerPerTopic(), 10); + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, true).join().get() + .getMaxConsumerPerTopic(), 20); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10); + }); + + // Remove global policy. + // Verify: non-global policy affects. + if ("PULSAR_ADMIN".equals(clientRequestType)) { + admin.topicPolicies(true).removeMaxConsumers(topic); + } else { + Response removeRes = httpClient.target(hostAndPort).path(httpPath) + .queryParam("isGlobal", "true") + .request() + .header("Content-Type", "application/json") + .delete(); + assertTrue(removeRes.getStatus() == 200 || removeRes.getStatus() == 204); + } + Awaitility.await().untilAsserted(() -> { + assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, false).join().get() + .getMaxConsumerPerTopic(), 10); + HierarchyTopicPolicies hierarchyTopicPolicies = persistentTopic.getHierarchyTopicPolicies(); + assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10); + }); + + // cleanup. + producer.close(); + admin.topics().delete(topic, false); + } + @Test(timeOut = 20000) public void testGetSizeBasedBacklogQuotaApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValue.java index a23ef9c8780..d416f012b7f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValue.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValue.java @@ -38,6 +38,9 @@ public class PolicyHierarchyValue<T> { @Getter private volatile T topicValue; + @Getter + private volatile T topicGlobalValue; + private volatile T value; public PolicyHierarchyValue() { @@ -53,8 +56,17 @@ public class PolicyHierarchyValue<T> { updateValue(); } + @Deprecated public void updateTopicValue(T topicValue) { - this.topicValue = topicValue; + updateTopicValue(topicValue, false); + } + + public void updateTopicValue(T topicValue, boolean isGlobalPolicy) { + if (isGlobalPolicy) { + this.topicGlobalValue = topicValue; + } else { + this.topicValue = topicValue; + } updateValue(); } @@ -62,6 +74,8 @@ public class PolicyHierarchyValue<T> { VALUE_UPDATER.updateAndGet(this, (preValue) -> { if (topicValue != null) { return topicValue; + } else if (topicGlobalValue != null) { + return topicGlobalValue; } else if (namespaceValue != null) { return namespaceValue; } else { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValueTest.java index 2d6e803050c..7a2298ddc23 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValueTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValueTest.java @@ -33,13 +33,13 @@ public class PolicyHierarchyValueTest { value.updateNamespaceValue(2); Assert.assertEquals(value.get(), Integer.valueOf(2)); - value.updateTopicValue(3); + value.updateTopicValue(3, false); Assert.assertEquals(value.get(), Integer.valueOf(3)); value.updateNamespaceValue(null); Assert.assertEquals(value.get(), Integer.valueOf(3)); - value.updateTopicValue(null); + value.updateTopicValue(null, false); Assert.assertEquals(value.get(), Integer.valueOf(1)); } } \ No newline at end of file
