This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4fd61ab585a [improve][broker] Optimize and clean up aggregation of 
topic stats (#21361)
4fd61ab585a is described below

commit 4fd61ab585a93372ea62cf6f417d0232ba9e9343
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Oct 14 13:31:27 2023 +0300

    [improve][broker] Optimize and clean up aggregation of topic stats (#21361)
    
    (cherry picked from commit d6a56ad3094e9da89fa5f7cf00e0b7bbb36e6563)
---
 .../data/stats/NonPersistentTopicStatsImpl.java    | 57 +++++++---------------
 .../common/policies/data/stats/TopicStatsImpl.java | 49 +++++++------------
 2 files changed, 35 insertions(+), 71 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index f2905486adb..f3107f77442 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -153,8 +153,9 @@ public class NonPersistentTopicStatsImpl extends 
TopicStatsImpl implements NonPe
         Objects.requireNonNull(stats);
         super.add(stats);
         this.msgDropRate += stats.msgDropRate;
-        for (int index = 0; index < stats.getNonPersistentPublishers().size(); 
index++) {
-            NonPersistentPublisherStats s = 
stats.getNonPersistentPublishers().get(index);
+        List<NonPersistentPublisherStats> publisherStats = 
stats.getNonPersistentPublishers();
+        for (int index = 0; index < publisherStats.size(); index++) {
+            NonPersistentPublisherStats s = publisherStats.get(index);
             if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
                 ((NonPersistentPublisherStatsImpl) 
this.nonPersistentPublishersMap
                         .computeIfAbsent(s.getProducerName(), key -> {
@@ -179,46 +180,24 @@ public class NonPersistentTopicStatsImpl extends 
TopicStatsImpl implements NonPe
             }
         }
 
-        if (this.getNonPersistentSubscriptions().size() != 
stats.getNonPersistentSubscriptions().size()) {
-            for (String subscription : 
stats.getNonPersistentSubscriptions().keySet()) {
-                NonPersistentSubscriptionStatsImpl subscriptionStats = new 
NonPersistentSubscriptionStatsImpl();
-                this.getNonPersistentSubscriptions().put(subscription, 
subscriptionStats
-                        .add((NonPersistentSubscriptionStatsImpl)
-                                
stats.getNonPersistentSubscriptions().get(subscription)));
-            }
-        } else {
-            for (String subscription : 
stats.getNonPersistentSubscriptions().keySet()) {
-                if (this.getNonPersistentSubscriptions().get(subscription) != 
null) {
-                    ((NonPersistentSubscriptionStatsImpl) 
this.getNonPersistentSubscriptions().get(subscription))
-                          .add((NonPersistentSubscriptionStatsImpl)
-                                  
stats.getNonPersistentSubscriptions().get(subscription));
-                } else {
-                    NonPersistentSubscriptionStatsImpl subscriptionStats = new 
NonPersistentSubscriptionStatsImpl();
-                    this.getNonPersistentSubscriptions().put(subscription, 
subscriptionStats
-                         .add((NonPersistentSubscriptionStatsImpl)
-                                 
stats.getNonPersistentSubscriptions().get(subscription)));
-                }
-            }
+        for (Map.Entry<String, NonPersistentSubscriptionStats> entry : 
stats.getNonPersistentSubscriptions()
+                .entrySet()) {
+            NonPersistentSubscriptionStatsImpl subscriptionStats =
+                    (NonPersistentSubscriptionStatsImpl) 
this.getNonPersistentSubscriptions()
+                            .computeIfAbsent(entry.getKey(), k -> new 
NonPersistentSubscriptionStatsImpl());
+            subscriptionStats.add(
+                    (NonPersistentSubscriptionStatsImpl) entry.getValue());
         }
 
-        if (this.getNonPersistentReplicators().size() != 
stats.getNonPersistentReplicators().size()) {
-            for (String repl : stats.getNonPersistentReplicators().keySet()) {
-                NonPersistentReplicatorStatsImpl replStats = new 
NonPersistentReplicatorStatsImpl();
-                this.getNonPersistentReplicators().put(repl, replStats
-                        .add((NonPersistentReplicatorStatsImpl) 
stats.getNonPersistentReplicators().get(repl)));
-            }
-        } else {
-            for (String repl : stats.getNonPersistentReplicators().keySet()) {
-                if (this.getNonPersistentReplicators().get(repl) != null) {
-                    ((NonPersistentReplicatorStatsImpl) 
this.getNonPersistentReplicators().get(repl))
-                            .add((NonPersistentReplicatorStatsImpl) 
stats.getNonPersistentReplicators().get(repl));
-                } else {
-                    NonPersistentReplicatorStatsImpl replStats = new 
NonPersistentReplicatorStatsImpl();
-                    this.getNonPersistentReplicators().put(repl, replStats
-                            .add((NonPersistentReplicatorStatsImpl) 
stats.getNonPersistentReplicators().get(repl)));
-                }
-            }
+        for (Map.Entry<String, NonPersistentReplicatorStats> entry : 
stats.getNonPersistentReplicators().entrySet()) {
+            NonPersistentReplicatorStatsImpl replStats = 
(NonPersistentReplicatorStatsImpl)
+                    
this.getNonPersistentReplicators().computeIfAbsent(entry.getKey(), k -> {
+                        NonPersistentReplicatorStatsImpl r = new 
NonPersistentReplicatorStatsImpl();
+                        return r;
+                    });
+            replStats.add((NonPersistentReplicatorStatsImpl) entry.getValue());
         }
+
         return this;
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 0f2b6455157..dc4f154799b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -294,8 +294,9 @@ public class TopicStatsImpl implements TopicStats {
             topicMetricBean.value += v.value;
         });
 
-        for (int index = 0; index < stats.getPublishers().size(); index++) {
-           PublisherStats s = stats.getPublishers().get(index);
+        List<? extends PublisherStats> publisherStats = stats.getPublishers();
+        for (int index = 0; index < publisherStats.size(); index++) {
+           PublisherStats s = publisherStats.get(index);
            if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
                this.publishersMap.computeIfAbsent(s.getProducerName(), key -> {
                    final PublisherStatsImpl newStats = new 
PublisherStatsImpl();
@@ -319,38 +320,22 @@ public class TopicStatsImpl implements TopicStats {
            }
         }
 
-        if (this.subscriptions.size() != stats.subscriptions.size()) {
-            for (String subscription : stats.subscriptions.keySet()) {
-                SubscriptionStatsImpl subscriptionStats = new 
SubscriptionStatsImpl();
-                this.subscriptions.put(subscription, 
subscriptionStats.add(stats.subscriptions.get(subscription)));
-            }
-        } else {
-            for (String subscription : stats.subscriptions.keySet()) {
-                if (this.subscriptions.get(subscription) != null) {
-                    
this.subscriptions.get(subscription).add(stats.subscriptions.get(subscription));
-                } else {
-                    SubscriptionStatsImpl subscriptionStats = new 
SubscriptionStatsImpl();
-                    this.subscriptions.put(subscription, 
subscriptionStats.add(stats.subscriptions.get(subscription)));
-                }
-            }
+        for (Map.Entry<String, SubscriptionStatsImpl> entry : 
stats.subscriptions.entrySet()) {
+            SubscriptionStatsImpl subscriptionStats =
+                    this.subscriptions.computeIfAbsent(entry.getKey(), k -> 
new SubscriptionStatsImpl());
+            subscriptionStats.add(entry.getValue());
         }
-        if (this.replication.size() != stats.replication.size()) {
-            for (String repl : stats.replication.keySet()) {
-                ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
-                replStats.setConnected(true);
-                this.replication.put(repl, 
replStats.add(stats.replication.get(repl)));
-            }
-        } else {
-            for (String repl : stats.replication.keySet()) {
-                if (this.replication.get(repl) != null) {
-                    
this.replication.get(repl).add(stats.replication.get(repl));
-                } else {
-                    ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
-                    replStats.setConnected(true);
-                    this.replication.put(repl, 
replStats.add(stats.replication.get(repl)));
-                }
-            }
+
+        for (Map.Entry<String, ReplicatorStatsImpl> entry : 
stats.replication.entrySet()) {
+            ReplicatorStatsImpl replStats =
+                    this.replication.computeIfAbsent(entry.getKey(), k -> {
+                        ReplicatorStatsImpl r = new ReplicatorStatsImpl();
+                        r.setConnected(true);
+                        return r;
+                    });
+            replStats.add(entry.getValue());
         }
+
         if (earliestMsgPublishTimeInBacklogs != 0 && ((TopicStatsImpl) 
ts).earliestMsgPublishTimeInBacklogs != 0) {
             earliestMsgPublishTimeInBacklogs = Math.min(
                     earliestMsgPublishTimeInBacklogs,

Reply via email to