This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new d0722a95a87 [fix][misc] Fix compareTo contract violation for
NamespaceBundleStats, TimeAverageMessageData and ResourceUnitRanking (#24772)
d0722a95a87 is described below
commit d0722a95a879b52a68713a2b1ea3367402f15c63
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Sep 23 01:48:44 2025 +0300
[fix][misc] Fix compareTo contract violation for NamespaceBundleStats,
TimeAverageMessageData and ResourceUnitRanking (#24772)
---
.../loadbalance/impl/SimpleLoadManagerImpl.java | 2 +-
.../extensions/models/TopKBundlesTest.java | 25 +++++++
.../data/loadbalancer/NamespaceBundleStats.java | 50 ++++++-------
.../loadbalancer/NamespaceBundleStatsTest.java | 48 ++++++++++++
.../org/apache/pulsar/common/util/CompareUtil.java | 61 +++++++++++++++
.../data/loadbalancer/ResourceUnitRanking.java | 10 ++-
.../data/loadbalancer/TimeAverageMessageData.java | 24 ++----
.../apache/pulsar/common/util/CompareUtilTest.java | 87 ++++++++++++++++++++++
.../loadbalancer/TimeAverageMessageDataTest.java | 44 +++++++++++
9 files changed, 304 insertions(+), 47 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 30a7359ce0e..b577e0f6280 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -813,7 +813,7 @@ public class SimpleLoadManagerImpl implements LoadManager,
Consumer<Notification
minLoadPercentage = loadPercentage;
} else {
if ((unboundedRanks ?
ranking.compareMessageRateTo(selectedRanking)
- : ranking.compareTo(selectedRanking)) < 0) {
+ : ranking.compareToOtherRanking(selectedRanking))
< 0) {
minLoadPercentage = loadPercentage;
selectedRU = candidate;
selectedRanking = ranking;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
index 42c15165579..2be3108c638 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
@@ -319,4 +319,29 @@ public class TopKBundlesTest {
}
}
}
+
+ // Issue https://github.com/apache/pulsar/issues/24754
+ @Test
+ public void testPartitionSortCompareToContractViolationIssue() {
+ Random rnd = new Random(0);
+ ArrayList<NamespaceBundleStats> stats = new ArrayList<>();
+ for (int i = 0; i < 1000; ++i) {
+ NamespaceBundleStats s = new NamespaceBundleStats();
+ s.msgThroughputIn = 4 * 75000 * rnd.nextDouble(); // Just above
threshold (1e5)
+ s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble()));
+ s.msgRateIn = 4 * 75 * rnd.nextDouble();
+ s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble());
+ s.topics = i;
+ s.consumerCount = i;
+ s.producerCount = 4 * rnd.nextInt(375);
+ s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000));
+ stats.add(s);
+ }
+ List<Map.Entry<String, ? extends Comparable>> bundleEntries = new
ArrayList<>();
+
+ for (NamespaceBundleStats s : stats) {
+ bundleEntries.add(Map.entry("bundle-" + s.msgThroughputIn, s));
+ }
+ TopKBundles.partitionSort(bundleEntries, 100);
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
index cd364990842..d3c894524bf 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java
@@ -37,14 +37,14 @@ public class NamespaceBundleStats implements
Comparable<NamespaceBundleStats>, S
public long topics;
public long cacheSize;
- // Consider the throughput equal if difference is less than 100 KB/s
- private static final double throughputDifferenceThreshold = 1e5;
- // Consider the msgRate equal if the difference is less than 100
- private static final double msgRateDifferenceThreshold = 100;
- // Consider the total topics/producers/consumers equal if the difference
is less than 500
- private static final long topicConnectionDifferenceThreshold = 500;
- // Consider the cache size equal if the difference is less than 100 kb
- private static final long cacheSizeDifferenceThreshold = 100000;
+ // When comparing throughput, uses a resolution of 100 KB/s, effectively
rounding values before comparison
+ private static final double throughputComparisonResolution = 1e5;
+ // When comparing message rate, uses a resolution of 100, effectively
rounding values before comparison
+ private static final double msgRateComparisonResolution = 100;
+ // When comparing total topics/producers/consumers, uses a
resolution/rounding of 500
+ private static final long topicConnectionComparisonResolution = 500;
+ // When comparing cache size, uses a resolution/rounding of 100kB
+ private static final long cacheSizeComparisonResolution = 100000;
public NamespaceBundleStats() {
reset();
@@ -89,39 +89,33 @@ public class NamespaceBundleStats implements
Comparable<NamespaceBundleStats>, S
public int compareByMsgRate(NamespaceBundleStats other) {
double thisMsgRate = this.msgRateIn + this.msgRateOut;
double otherMsgRate = other.msgRateIn + other.msgRateOut;
- if (Math.abs(thisMsgRate - otherMsgRate) > msgRateDifferenceThreshold)
{
- return Double.compare(thisMsgRate, otherMsgRate);
- }
- return 0;
+ return compareDoubleWithResolution(thisMsgRate, otherMsgRate,
msgRateComparisonResolution);
+ }
+
+ private static int compareDoubleWithResolution(double v1, double v2,
double resolution) {
+ return Long.compare(Math.round(v1 / resolution), Math.round(v2 /
resolution));
+ }
+
+ private static int compareLongWithResolution(long v1, long v2, long
resolution) {
+ return Long.compare(v1 / resolution, v2 / resolution);
}
public int compareByTopicConnections(NamespaceBundleStats other) {
long thisTopicsAndConnections = this.topics + this.consumerCount +
this.producerCount;
long otherTopicsAndConnections = other.topics + other.consumerCount +
other.producerCount;
- if (Math.abs(thisTopicsAndConnections - otherTopicsAndConnections) >
topicConnectionDifferenceThreshold) {
- return Long.compare(thisTopicsAndConnections,
otherTopicsAndConnections);
- }
- return 0;
+ return compareLongWithResolution(thisTopicsAndConnections,
otherTopicsAndConnections,
+ topicConnectionComparisonResolution);
}
public int compareByCacheSize(NamespaceBundleStats other) {
- if (Math.abs(this.cacheSize - other.cacheSize) >
cacheSizeDifferenceThreshold) {
- return Long.compare(this.cacheSize, other.cacheSize);
- }
- return 0;
+ return compareLongWithResolution(cacheSize, other.cacheSize,
cacheSizeComparisonResolution);
}
public int compareByBandwidthIn(NamespaceBundleStats other) {
- if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) >
throughputDifferenceThreshold) {
- return Double.compare(this.msgThroughputIn, other.msgThroughputIn);
- }
- return 0;
+ return compareDoubleWithResolution(msgThroughputIn,
other.msgThroughputIn, throughputComparisonResolution);
}
public int compareByBandwidthOut(NamespaceBundleStats other) {
- if (Math.abs(this.msgThroughputOut - other.msgThroughputOut) >
throughputDifferenceThreshold) {
- return Double.compare(this.msgThroughputOut,
other.msgThroughputOut);
- }
- return 0;
+ return compareDoubleWithResolution(msgThroughputOut,
other.msgThroughputOut, throughputComparisonResolution);
}
}
diff --git
a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStatsTest.java
b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStatsTest.java
new file mode 100644
index 00000000000..7b8d120e629
--- /dev/null
+++
b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStatsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.policies.data.loadbalancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.testng.annotations.Test;
+
+public class NamespaceBundleStatsTest {
+
+ @Test
+ public void testCompareToContract() {
+ Random rnd = new Random();
+ List<NamespaceBundleStats> stats = new ArrayList<>();
+ for (int i = 0; i < 1000; ++i) {
+ NamespaceBundleStats s = new NamespaceBundleStats();
+ s.msgThroughputIn = 4 * 75000 * rnd.nextDouble();
+ s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble()));
+ s.msgRateIn = 4 * 75 * rnd.nextDouble();
+ s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble());
+ s.topics = i;
+ s.consumerCount = i;
+ s.producerCount = 4 * rnd.nextInt(375);
+ s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000));
+ stats.add(s);
+ }
+ // this would throw "java.lang.IllegalArgumentException: Comparison
method violates its general contract!"
+ // if compareTo() is not implemented correctly.
+ stats.sort(NamespaceBundleStats::compareTo);
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompareUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompareUtil.java
new file mode 100644
index 00000000000..b43c42ff1c2
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompareUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import lombok.experimental.UtilityClass;
+
+/**
+ * Utility class for comparing values.
+ */
+@UtilityClass
+public class CompareUtil {
+
+ /**
+ * Compare two double values with a given resolution.
+ * @param v1 first value to compare
+ * @param v2 second value to compare
+ * @param resolution resolution to compare with
+ * @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
+ */
+ public static int compareDoubleWithResolution(double v1, double v2, double
resolution) {
+ return Long.compare((long) (v1 / resolution), (long) (v2 /
resolution));
+ }
+
+ /**
+ * Compare two long values with a given resolution.
+ * @param v1 first value to compare
+ * @param v2 second value to compare
+ * @param resolution resolution to compare with
+ * @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
+ */
+ public static int compareLongWithResolution(long v1, long v2, long
resolution) {
+ return Long.compare(v1 / resolution, v2 / resolution);
+ }
+
+ /**
+ * Compare two int values with a given resolution.
+ * @param v1 first value to compare
+ * @param v2 second value to compare
+ * @param resolution resolution to compare with
+ * @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
+ */
+ public static int compareIntegerWithResolution(int v1, int v2, int
resolution) {
+ return Integer.compare(v1 / resolution, v2 / resolution);
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
index 3a9d7de09e1..9b83be08205 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.policies.data.ResourceQuota;
* The class containing information about system resources, allocated quota,
and loaded bundles.
*/
@EqualsAndHashCode
-public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
+public class ResourceUnitRanking {
private static final long KBITS_TO_BYTES = 1024 / 8;
private static final double PERCENTAGE_DIFFERENCE_THRESHOLD = 5.0;
@@ -129,7 +129,13 @@ public class ResourceUnitRanking implements
Comparable<ResourceUnitRanking> {
}
- public int compareTo(ResourceUnitRanking other) {
+ /**
+ * Compares to another ranking. Please note that this cannot be used to
sort the rankings since the results
+ * of this method don't satify the contract of {@link
Comparable#compareTo(Object)}
+ * @param other other ranking to compare to
+ * @return negative if this is less than other, 0 if they are equal,
positive if this is greater than other
+ */
+ public int compareToOtherRanking(ResourceUnitRanking other) {
if (Math.abs(this.estimatedLoadPercentage -
other.estimatedLoadPercentage) > PERCENTAGE_DIFFERENCE_THRESHOLD) {
return Double.compare(this.estimatedLoadPercentage,
other.estimatedLoadPercentage);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java
index b9c7a43c3a7..5264c5058fb 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;
+import static
org.apache.pulsar.common.util.CompareUtil.compareDoubleWithResolution;
import lombok.EqualsAndHashCode;
/**
@@ -44,10 +45,10 @@ public class TimeAverageMessageData implements
Comparable<TimeAverageMessageData
// The average message rate out per second.
private double msgRateOut;
- // Consider the throughput equal if difference is less than 100 KB/s
- private static final double throughputDifferenceThreshold = 1e5;
- // Consider the msgRate equal if the difference is less than 100
- private static final double msgRateDifferenceThreshold = 100;
+ // When comparing throughput, uses a resolution of 100 KB/s, effectively
rounding values before comparison
+ private static final double throughputComparisonResolution = 1e5;
+ // When comparing message rate, uses a resolution of 100, effectively
rounding values before comparison
+ private static final double msgRateComparisonResolution = 100;
// For JSON only.
public TimeAverageMessageData() {
@@ -202,23 +203,14 @@ public class TimeAverageMessageData implements
Comparable<TimeAverageMessageData
public int compareByMsgRate(TimeAverageMessageData other) {
double thisMsgRate = this.msgRateIn + this.msgRateOut;
double otherMsgRate = other.msgRateIn + other.msgRateOut;
- if (Math.abs(thisMsgRate - otherMsgRate) > msgRateDifferenceThreshold)
{
- return Double.compare(thisMsgRate, otherMsgRate);
- }
- return 0;
+ return compareDoubleWithResolution(thisMsgRate, otherMsgRate,
msgRateComparisonResolution);
}
public int compareByBandwidthIn(TimeAverageMessageData other) {
- if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) >
throughputDifferenceThreshold) {
- return Double.compare(this.msgThroughputIn, other.msgThroughputIn);
- }
- return 0;
+ return compareDoubleWithResolution(msgThroughputIn,
other.msgThroughputIn, throughputComparisonResolution);
}
public int compareByBandwidthOut(TimeAverageMessageData other) {
- if (Math.abs(this.msgThroughputOut - other.msgThroughputOut) >
throughputDifferenceThreshold) {
- return Double.compare(this.msgThroughputOut,
other.msgThroughputOut);
- }
- return 0;
+ return compareDoubleWithResolution(msgThroughputOut,
other.msgThroughputOut, throughputComparisonResolution);
}
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompareUtilTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompareUtilTest.java
new file mode 100644
index 00000000000..1c08ee4b2d0
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompareUtilTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import static org.testng.Assert.assertEquals;
+import org.testng.annotations.Test;
+
+public class CompareUtilTest {
+ @Test
+ public void testCompareDoubleWithResolution_bucketedComparison() {
+ // Same truncated bucket when dividing by resolution
+ assertEquals(CompareUtil.compareDoubleWithResolution(0.01, 0.49, 0.5),
0);
+ assertEquals(CompareUtil.compareDoubleWithResolution(0.51, 0.99, 0.5),
0);
+
+ // Different truncated buckets
+ assertEquals(CompareUtil.compareDoubleWithResolution(0.51, 0.49, 0.5),
1);
+ assertEquals(CompareUtil.compareDoubleWithResolution(0.49, 0.51, 0.5),
-1);
+ assertEquals(CompareUtil.compareDoubleWithResolution(1.01, 0.49, 0.5),
1);
+
+ // Larger numbers
+ assertEquals(CompareUtil.compareDoubleWithResolution(19.99, 10.01,
10.0), 0);
+ assertEquals(CompareUtil.compareDoubleWithResolution(19.99, 20.01,
10.0), -1);
+ assertEquals(CompareUtil.compareDoubleWithResolution(10.00, 9.99,
10.0), 1);
+ }
+
+ @Test
+ public void testCompareLongWithResolution_exactComparison() {
+ // resolution = 1 -> behave like Long.compare
+ assertEquals(CompareUtil.compareLongWithResolution(1L, 2L, 1L), -1);
+ assertEquals(CompareUtil.compareLongWithResolution(2L, 1L, 1L), 1);
+ assertEquals(CompareUtil.compareLongWithResolution(2L, 2L, 1L), 0);
+ }
+
+ @Test
+ public void testCompareLongWithResolution_bucketedComparison() {
+ // Same bucket when divided by resolution
+ assertEquals(CompareUtil.compareLongWithResolution(8L, 9L, 10L), 0);
+ assertEquals(CompareUtil.compareLongWithResolution(10L, 19L, 10L), 0);
+
+ // Different buckets
+ assertEquals(CompareUtil.compareLongWithResolution(9L, 20L, 10L), -1);
+ assertEquals(CompareUtil.compareLongWithResolution(21L, 10L, 10L), 1);
+
+ // Larger resolution
+ assertEquals(CompareUtil.compareLongWithResolution(100L, 175L, 100L),
0);
+ assertEquals(CompareUtil.compareLongWithResolution(199L, 201L, 100L),
-1);
+ }
+
+ @Test
+ public void testCompareIntegerWithResolution_exactComparison() {
+ // resolution = 1 -> behave like Integer.compare
+ assertEquals(CompareUtil.compareIntegerWithResolution(1, 2, 1), -1);
+ assertEquals(CompareUtil.compareIntegerWithResolution(2, 1, 1), 1);
+ assertEquals(CompareUtil.compareIntegerWithResolution(2, 2, 1), 0);
+ }
+
+ @Test
+ public void testCompareIntegerWithResolution_bucketedComparison() {
+ // Same bucket
+ assertEquals(CompareUtil.compareIntegerWithResolution(3, 4, 5), 0);
+ assertEquals(CompareUtil.compareIntegerWithResolution(5, 9, 5), 0);
+
+ // Different buckets
+ assertEquals(CompareUtil.compareIntegerWithResolution(4, 10, 5), -1);
+ assertEquals(CompareUtil.compareIntegerWithResolution(11, 5, 5), 1);
+
+ // Larger resolution
+ assertEquals(CompareUtil.compareIntegerWithResolution(51, 75, 50), 0);
+ assertEquals(CompareUtil.compareIntegerWithResolution(49, 101, 50),
-1);
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageDataTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageDataTest.java
new file mode 100644
index 00000000000..ff7f3083014
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageDataTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.policies.data.loadbalancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.testng.annotations.Test;
+
+public class TimeAverageMessageDataTest {
+ @Test
+ public void testCompareToContract() {
+ Random rnd = new Random();
+ List<TimeAverageMessageData> list = new ArrayList<>();
+ for (int i = 0; i < 1000; ++i) {
+ TimeAverageMessageData data = new TimeAverageMessageData(1);
+ double msgThroughputIn = 4 * 75000 * rnd.nextDouble();
+ double msgThroughputOut = 75000000 - (4 * (75000 *
rnd.nextDouble()));
+ double msgRateIn = 4 * 75 * rnd.nextDouble();
+ double msgRateOut = 75000 - (4 * 75 * rnd.nextDouble());
+ data.update(msgThroughputIn, msgThroughputOut, msgRateIn,
msgRateOut);
+ list.add(data);
+ }
+ // this would throw "java.lang.IllegalArgumentException: Comparison
method violates its general contract!"
+ // if compareTo() is not implemented correctly.
+ list.sort(TimeAverageMessageData::compareTo);
+ }
+}
\ No newline at end of file