This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c0850eed7a108db2b61cae9a682e366d815935c3
Author: fengyubiao <[email protected]>
AuthorDate: Wed May 7 23:46:54 2025 +0800

    [fix][broker]Fix incorrect priority between topic policies and global topic policies (#24254)
    
    (cherry picked from commit 86857aedefd0c7025a58b450cdf8a4de451a799e)
---
 .../pulsar/broker/service/AbstractTopic.java       |  63 ++++----
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 170 +++++++++++++++++++++
 .../common/policies/data/PolicyHierarchyValue.java |  16 +-
 .../policies/data/PolicyHierarchyValueTest.java    |   4 +-
 4 files changed, 224 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 366bc7225c4..902e07f0cb8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -217,47 +217,58 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
     }
 
     protected void updateTopicPolicy(TopicPolicies data) {
+        boolean isGlobalPolicies = data.isGlobalPolicies();
         if (!isSystemTopic()) {
             // Only use namespace level setting for system topic.
-            
topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters());
+            
topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters(),
 isGlobalPolicies);
             topicPolicies.getSchemaCompatibilityStrategy()
-                    
.updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()));
+                    
.updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()),
+                            isGlobalPolicies);
         }
-        
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
+        
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies(),
 isGlobalPolicies);
         topicPolicies.getMaxSubscriptionsPerTopic()
-                
.updateTopicValue(normalizeValue(data.getMaxSubscriptionsPerTopic()));
+                
.updateTopicValue(normalizeValue(data.getMaxSubscriptionsPerTopic()), 
isGlobalPolicies);
         topicPolicies.getMaxUnackedMessagesOnConsumer()
-                
.updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnConsumer()));
+                
.updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnConsumer()), 
isGlobalPolicies);
         topicPolicies.getMaxUnackedMessagesOnSubscription()
-                
.updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnSubscription()));
-        
topicPolicies.getMaxProducersPerTopic().updateTopicValue(normalizeValue(data.getMaxProducerPerTopic()));
-        
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(normalizeValue(data.getMaxConsumerPerTopic()));
+                
.updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnSubscription()), 
isGlobalPolicies);
+        
topicPolicies.getMaxProducersPerTopic().updateTopicValue(normalizeValue(data.getMaxProducerPerTopic()),
+                isGlobalPolicies);
+        
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(normalizeValue(data.getMaxConsumerPerTopic()),
+                isGlobalPolicies);
         topicPolicies.getMaxConsumersPerSubscription()
-                
.updateTopicValue(normalizeValue(data.getMaxConsumersPerSubscription()));
-        
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
-        
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
+                
.updateTopicValue(normalizeValue(data.getMaxConsumersPerSubscription()), 
isGlobalPolicies);
+        
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies(),
 isGlobalPolicies);
+        
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled(),
 isGlobalPolicies);
         
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateTopicValue(
-                data.getDeduplicationSnapshotIntervalSeconds());
+                data.getDeduplicationSnapshotIntervalSeconds(), 
isGlobalPolicies);
         topicPolicies.getSubscriptionTypesEnabled().updateTopicValue(
                 CollectionUtils.isEmpty(data.getSubscriptionTypesEnabled()) ? 
null :
-                        EnumSet.copyOf(data.getSubscriptionTypesEnabled()));
+                        EnumSet.copyOf(data.getSubscriptionTypesEnabled()), 
isGlobalPolicies);
         Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
                 
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
-                        data.getBackLogQuotaMap() == null ? null : 
data.getBackLogQuotaMap().get(type.toString())));
-        
topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()));
-        
topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()));
-        
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
-        
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
+                        data.getBackLogQuotaMap() == null ? null : 
data.getBackLogQuotaMap().get(type.toString()),
+                        isGlobalPolicies));
+        
topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()),
+                isGlobalPolicies);
+        
topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()),
+                isGlobalPolicies);
+        
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()),
 isGlobalPolicies);
+        
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled(),
 isGlobalPolicies);
         topicPolicies.getReplicatorDispatchRate().updateTopicValue(
-            DispatchRateImpl.normalize(data.getReplicatorDispatchRate()));
-        
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
-        
topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
+            DispatchRateImpl.normalize(data.getReplicatorDispatchRate()), 
isGlobalPolicies);
+        
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis(),
+                isGlobalPolicies);
+        topicPolicies.getSubscribeRate().updateTopicValue(
+                SubscribeRate.normalize(data.getSubscribeRate()), 
isGlobalPolicies);
         topicPolicies.getSubscriptionDispatchRate().updateTopicValue(
-            DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()));
-        
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
-        
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
-        
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
-        
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters());
+            DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()), 
isGlobalPolicies);
+        
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold(),
 isGlobalPolicies);
+        
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()),
+                isGlobalPolicies);
+        
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced(),
+                isGlobalPolicies);
+        
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters(), 
isGlobalPolicies);
         this.subscriptionPolicies = data.getSubscriptionPolicies();
 
         updateEntryFilters();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 3b1693ac860..1d860fbc3fc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -439,6 +439,176 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
+    @DataProvider(name = "clientRequestType")
+    public Object[][] clientRequestType() {
+        return new Object[][]{
+            {"PULSAR_ADMIN"},
+            {"HTTP"}
+        };
+    }
+
+    @Test(dataProvider = "clientRequestType")
+    public void testPriorityOfGlobalPolicies(String clientRequestType) throws 
Exception {
+        final SystemTopicBasedTopicPoliciesService topicPoliciesService
+                = (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+        final JerseyClient httpClient = JerseyClientBuilder.createClient();
+        // create topic and load it up.
+        final String namespace = myNamespace;
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace + "/tp");
+        final TopicName topicName = TopicName.get(topic);
+        final String hostAndPort = pulsar.getWebServiceAddress();
+        final String httpPath = "/admin/v2/persistent/" + namespace + "/" + 
TopicName.get(topic).getLocalName()
+                + "/maxConsumers";
+        admin.topics().createNonPartitionedTopic(topic);
+        Producer producer = pulsarClient.newProducer().topic(topic).create();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+
+        // Set non global policy.
+        // Verify: it affects.
+        if ("PULSAR_ADMIN".equals(clientRequestType)) {
+            admin.topicPolicies(false).setMaxConsumers(topic, 10);
+        } else {
+            Response res = httpClient.target(hostAndPort).path(httpPath)
+                    .queryParam("isGlobal", "false")
+                    .request()
+                    .header("Content-Type", "application/json")
+                    .post(Entity.json(10));
+            assertTrue(res.getStatus() == 200 || res.getStatus() == 204);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
false).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+        });
+
+        // Set global policy.
+        // Verify: topic policies has higher priority than global policies.
+        if ("PULSAR_ADMIN".equals(clientRequestType)) {
+            admin.topicPolicies(true).setMaxConsumers(topic, 20);
+        } else {
+            Response globalRes = httpClient.target(hostAndPort).path(httpPath)
+                    .queryParam("isGlobal", "true")
+                    .request()
+                    .header("Content-Type", "application/json")
+                    .post(Entity.json(20));
+            assertTrue(globalRes.getStatus() == 200 || globalRes.getStatus() 
== 204);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
false).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
true).join().get()
+                    .getMaxConsumerPerTopic(), 20);
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+        });
+
+        // Remove non-global policy.
+        // Verify: global policy affects.
+        if ("PULSAR_ADMIN".equals(clientRequestType)) {
+            admin.topicPolicies(false).removeMaxConsumers(topic);
+        } else {
+            Response removeRes = httpClient.target(hostAndPort).path(httpPath)
+                    .queryParam("isGlobal", "false")
+                    .request()
+                    .header("Content-Type", "application/json")
+                    .delete();
+            assertTrue(removeRes.getStatus() == 200 || removeRes.getStatus() 
== 204);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
true).join().get()
+                    .getMaxConsumerPerTopic(), 20);
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 20);
+        });
+
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topic, false);
+    }
+
+    @Test(dataProvider = "clientRequestType")
+    public void testPriorityOfGlobalPolicies2(String clientRequestType) throws 
Exception {
+        final SystemTopicBasedTopicPoliciesService topicPoliciesService
+                = (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+        final JerseyClient httpClient = JerseyClientBuilder.createClient();
+        // create topic and load it up.
+        final String namespace = myNamespace;
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace + "/tp");
+        final TopicName topicName = TopicName.get(topic);
+        final String hostAndPort = pulsar.getWebServiceAddress();
+        final String httpPath = "/admin/v2/persistent/" + namespace + "/" + 
TopicName.get(topic).getLocalName()
+                + "/maxConsumers";
+        admin.topics().createNonPartitionedTopic(topic);
+        Producer producer = pulsarClient.newProducer().topic(topic).create();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopics().get(topic).get().get();
+
+        // Set global policy.
+        // Verify: it affects.
+        if ("PULSAR_ADMIN".equals(clientRequestType)) {
+            admin.topicPolicies(true).setMaxConsumers(topic, 20);
+        } else {
+            Response globalRes = httpClient.target(hostAndPort).path(httpPath)
+                    .queryParam("isGlobal", "true")
+                    .request()
+                    .header("Content-Type", "application/json")
+                    .post(Entity.json(20));
+            assertTrue(globalRes.getStatus() == 200 || globalRes.getStatus() 
== 204);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
true).join().get()
+                    .getMaxConsumerPerTopic(), 20);
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 20);
+        });
+
+        // Set non global policy.
+        // Verify: topic policies has higher priority than global policies.
+        if ("PULSAR_ADMIN".equals(clientRequestType)) {
+            admin.topicPolicies(false).setMaxConsumers(topic, 10);
+        } else {
+            Response res = httpClient.target(hostAndPort).path(httpPath)
+                    .queryParam("isGlobal", "false")
+                    .request()
+                    .header("Content-Type", "application/json")
+                    .post(Entity.json(10));
+            assertTrue(res.getStatus() == 200 || res.getStatus() == 204);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
false).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
true).join().get()
+                    .getMaxConsumerPerTopic(), 20);
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+        });
+
+        // Remove global policy.
+        // Verify: non-global policy affects.
+        if ("PULSAR_ADMIN".equals(clientRequestType)) {
+            admin.topicPolicies(true).removeMaxConsumers(topic);
+        } else {
+            Response removeRes = httpClient.target(hostAndPort).path(httpPath)
+                    .queryParam("isGlobal", "true")
+                    .request()
+                    .header("Content-Type", "application/json")
+                    .delete();
+            assertTrue(removeRes.getStatus() == 200 || removeRes.getStatus() 
== 204);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(topicPoliciesService.getTopicPoliciesAsync(topicName, 
false).join().get()
+                    .getMaxConsumerPerTopic(), 10);
+            HierarchyTopicPolicies hierarchyTopicPolicies = 
persistentTopic.getHierarchyTopicPolicies();
+            
assertEquals(hierarchyTopicPolicies.getMaxConsumerPerTopic().get(), 10);
+        });
+
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topic, false);
+    }
+
     @Test(timeOut = 20000)
     public void testGetSizeBasedBacklogQuotaApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValue.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValue.java
index a23ef9c8780..d416f012b7f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValue.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValue.java
@@ -38,6 +38,9 @@ public class PolicyHierarchyValue<T> {
     @Getter
     private volatile T topicValue;
 
+    @Getter
+    private volatile T topicGlobalValue;
+
     private volatile T value;
 
     public PolicyHierarchyValue() {
@@ -53,8 +56,17 @@ public class PolicyHierarchyValue<T> {
         updateValue();
     }
 
+    @Deprecated
     public void updateTopicValue(T topicValue) {
-        this.topicValue = topicValue;
+        updateTopicValue(topicValue, false);
+    }
+
+    public void updateTopicValue(T topicValue, boolean isGlobalPolicy) {
+        if (isGlobalPolicy) {
+            this.topicGlobalValue = topicValue;
+        } else {
+            this.topicValue = topicValue;
+        }
         updateValue();
     }
 
@@ -62,6 +74,8 @@ public class PolicyHierarchyValue<T> {
         VALUE_UPDATER.updateAndGet(this, (preValue) -> {
             if (topicValue != null) {
                 return topicValue;
+            } else if (topicGlobalValue != null) {
+                return topicGlobalValue;
             } else if (namespaceValue != null) {
                 return namespaceValue;
             } else {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValueTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValueTest.java
index 2d6e803050c..7a2298ddc23 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValueTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PolicyHierarchyValueTest.java
@@ -33,13 +33,13 @@ public class PolicyHierarchyValueTest {
         value.updateNamespaceValue(2);
         Assert.assertEquals(value.get(), Integer.valueOf(2));
 
-        value.updateTopicValue(3);
+        value.updateTopicValue(3, false);
         Assert.assertEquals(value.get(), Integer.valueOf(3));
 
         value.updateNamespaceValue(null);
         Assert.assertEquals(value.get(), Integer.valueOf(3));
 
-        value.updateTopicValue(null);
+        value.updateTopicValue(null, false);
         Assert.assertEquals(value.get(), Integer.valueOf(1));
     }
 }
\ No newline at end of file

Reply via email to