This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new b3ad3beb CASSANALYTICS-99: Fixed bulk reader node availability
comparator ordering (#151)
b3ad3beb is described below
commit b3ad3bebe0aea654901d69ff3720da9f728271d0
Author: Sudipta <[email protected]>
AuthorDate: Thu Oct 30 11:05:06 2025 -0700
CASSANALYTICS-99: Fixed bulk reader node availability comparator ordering
(#151)
Patch by Sudipta Laha; reviewed by Yifan Cai, Saranya Krishnakumar,
Francisco Guerrero for CASSANALYTICS-99
---
CHANGES.txt | 1 +
.../cassandra/spark/data/PartitionedDataLayer.java | 2 +-
.../spark/data/PartitionedDataLayerTests.java | 51 +++++++++++++++++++---
3 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 5463af35..96b619ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
+ * Fix bulk reader node availability comparator ordering (CASSANALYTICS-99)
* Remove not needed buffer flips (CASSANALYTICS-95)
* Refactor BulkWriterContext broadcasting to use immutable config class
(CASSANALYTICS-89)
* Bump sidecar dependency to 0.2.0 (CASSANALYTICS-93)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
index fd0de1f8..d3200166 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
@@ -86,7 +86,7 @@ public abstract class PartitionedDataLayer extends DataLayer
}
public static final Comparator<AvailabilityHint>
AVAILABILITY_HINT_COMPARATOR =
- Comparator.comparingInt((AvailabilityHint other) ->
other.priority).reversed();
+ Comparator.comparingInt((AvailabilityHint other) ->
other.priority);
public static AvailabilityHint fromState(String status, String state)
{
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
index de3ad5d5..b1169963 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
@@ -154,14 +154,14 @@ public class PartitionedDataLayerTests extends VersionRunner
@Test
public void testAvailabilityHintComparator()
{
- assertThat(AVAILABILITY_HINT_COMPARATOR.compare(UP,
MOVING)).isEqualTo(1);
+ assertThat(AVAILABILITY_HINT_COMPARATOR.compare(UP,
MOVING)).isEqualTo(-1);
assertThat(AVAILABILITY_HINT_COMPARATOR.compare(LEAVING,
MOVING)).isEqualTo(0);
- assertThat(AVAILABILITY_HINT_COMPARATOR.compare(UNKNOWN,
MOVING)).isEqualTo(-1);
- assertThat(AVAILABILITY_HINT_COMPARATOR.compare(LEAVING,
UNKNOWN)).isEqualTo(1);
+ assertThat(AVAILABILITY_HINT_COMPARATOR.compare(UNKNOWN,
MOVING)).isEqualTo(1);
+ assertThat(AVAILABILITY_HINT_COMPARATOR.compare(LEAVING,
UNKNOWN)).isEqualTo(-1);
assertThat(AVAILABILITY_HINT_COMPARATOR.compare(DOWN,
UNKNOWN)).isEqualTo(0);
assertThat(AVAILABILITY_HINT_COMPARATOR.compare(JOINING,
DOWN)).isEqualTo(0);
- assertThat(AVAILABILITY_HINT_COMPARATOR.compare(UP,
DOWN)).isEqualTo(1);
- assertThat(AVAILABILITY_HINT_COMPARATOR.compare(JOINING,
UP)).isEqualTo(-1);
+ assertThat(AVAILABILITY_HINT_COMPARATOR.compare(UP,
DOWN)).isEqualTo(-1);
+ assertThat(AVAILABILITY_HINT_COMPARATOR.compare(JOINING,
UP)).isEqualTo(1);
}
@Test
@@ -392,4 +392,45 @@ public class PartitionedDataLayerTests extends VersionRunner
assertThat(replicaSet.primary().size() +
replicaSet.backup().size()).isEqualTo(replicas.size());
}
}
+
+
+ /**
+ * Tests that the AvailabilityHint comparator correctly orders Cassandra
nodes by availability priority:
+ * UP nodes first, then MOVING/LEAVING nodes, and finally
DOWN/UNKNOWN/JOINING nodes last.
+ */
+ @Test
+ public void testSortingByAvailabilityHintComparator()
+ {
+ List<PartitionedDataLayer.AvailabilityHint> hints = Arrays.asList(UP,
MOVING, LEAVING, UNKNOWN, JOINING, DOWN);
+
+ for (int i = 0; i < 5; i++)
+ {
+ validateHintsSequence(hints, 1, 3);
+ }
+
+ hints = Arrays.asList(UP, UP, UP, MOVING, MOVING, LEAVING, UNKNOWN,
UNKNOWN, UNKNOWN, JOINING, DOWN, DOWN, DOWN, DOWN, DOWN, DOWN);
+
+ for (int i = 0; i < 5; i++)
+ {
+ validateHintsSequence(hints, 3, 6);
+ }
+ }
+
+ private static void
validateHintsSequence(List<PartitionedDataLayer.AvailabilityHint> hints, int
upCount, int movingOrLeavingCount)
+ {
+ List<PartitionedDataLayer.AvailabilityHint> shuffledHints = new
ArrayList<>(hints);
+ Collections.shuffle(shuffledHints);
+ // Test expected ordering: UP > MOVING/LEAVING > UNKNOWN/JOINING/DOWN
+ List<PartitionedDataLayer.AvailabilityHint> sorted = new
ArrayList<>(shuffledHints);
+ sorted.sort(AVAILABILITY_HINT_COMPARATOR);
+
+ // Verify UP comes first (highest priority)
+ assertThat(sorted.subList(0,
upCount)).contains(UP).doesNotContain(MOVING, LEAVING, UNKNOWN, JOINING, DOWN);
+
+ // Verify MOVING, LEAVING are in the middle
+ assertThat(sorted.subList(upCount,
movingOrLeavingCount)).contains(MOVING, LEAVING).doesNotContain(UP, DOWN,
UNKNOWN, JOINING);
+
+ // Verify DOWN, UNKNOWN, JOINING come last (lowest priority)
+ assertThat(sorted.subList(movingOrLeavingCount,
sorted.size())).contains(DOWN, UNKNOWN, JOINING).doesNotContain(UP, MOVING,
LEAVING);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]