This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new f088fdeb28 PHOENIX-7797 Fixing flapper test 
HAGroupStoreClientIT.testHAGroupStoreClientWithMultiThreadedUpdates (#2402)
f088fdeb28 is described below

commit f088fdeb28e1dea7dd156b5090f44439089ad074
Author: ritegarg <[email protected]>
AuthorDate: Fri Apr 10 12:46:24 2026 -0700

    PHOENIX-7797 Fixing flapper test 
HAGroupStoreClientIT.testHAGroupStoreClientWithMultiThreadedUpdates (#2402)
---
 .../apache/phoenix/jdbc/HAGroupStoreClientIT.java  | 63 +++++++++++++---------
 1 file changed, 39 insertions(+), 24 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index d5b1406c1a..234c67c8bd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -21,9 +21,11 @@ import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_SESSIO
 import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -32,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
@@ -393,67 +396,79 @@ public class HAGroupStoreClientIT extends BaseTest {
   }
 
   /**
-   * This test verifies that the updates coming via PathChildrenCacheListener 
are in order in which
-   * updates are sent to ZK
+   * Verifies that events received via PathChildrenCacheListener preserve the 
ordering of
+   * multi-threaded updates written to ZK. PathChildrenCache may coalesce 
rapid updates to the same
+   * node, so fewer events than updates may arrive, but those received must be 
in order.
    */
   @Test
   public void testHAGroupStoreClientWithMultiThreadedUpdates() throws 
Exception {
     // Number of threads to execute
     int threadCount = 5;
 
-    // Capture versions of crr in a list(crrEventVersions) in order they are 
received.
-    List<Integer> crrEventVersions = new ArrayList<>();
-    CountDownLatch eventsLatch = new CountDownLatch(threadCount);
+    // Track received event versions and validate ordering inline as events 
arrive.
+    List<Integer> crrEventVersions = Collections.synchronizedList(new 
ArrayList<>());
+    AtomicInteger lastReceivedVersion = new AtomicInteger(0);
+    List<String> orderingErrors = Collections.synchronizedList(new 
ArrayList<>());
+    CountDownLatch finalEventLatch = new CountDownLatch(1);
     PathChildrenCacheListener pathChildrenCacheListener = (client, event) -> {
       if (
         event.getData() != null && event.getData().getData() != null
           && ClusterRoleRecord.fromJson(event.getData().getData()).isPresent()
       ) {
         ClusterRoleRecord crr = 
ClusterRoleRecord.fromJson(event.getData().getData()).get();
-        crrEventVersions.add((int) crr.getVersion());
-        eventsLatch.countDown();
+        int version = (int) crr.getVersion();
+        int prev = lastReceivedVersion.getAndSet(version);
+        if (version <= prev) {
+          orderingErrors.add("Event version " + version + " received after 
version " + prev);
+        }
+        crrEventVersions.add(version);
+        if (version == threadCount) {
+          finalEventLatch.countDown();
+        }
       }
     };
 
     // Start a new HAGroupStoreClient.
-    new HAGroupStoreClient(config, pathChildrenCacheListener);
 
-    // Create multiple threads for update to ZK.
+    HAGroupStoreClient storeClient = new HAGroupStoreClient(config, 
pathChildrenCacheListener);
+
+    // Create multiple threads for updating ZK.
     final CountDownLatch updateLatch = new CountDownLatch(threadCount);
     ExecutorService executor = Executors.newFixedThreadPool(threadCount);
-
-    // List captures the order of events that are sent.
-    List<Integer> updateList = new ArrayList<>();
-
-    // Create a queue which can be polled to send updates to ZK.
     ConcurrentLinkedQueue<ClusterRoleRecord> updateQueue = new 
ConcurrentLinkedQueue<>();
     for (int i = 0; i < threadCount; i++) {
       updateQueue.add(createCRR(i + 1));
-      updateList.add(i + 1);
     }
 
     // Submit updates to ZK.
+    List<Exception> exceptions = Collections.synchronizedList(new 
ArrayList<>());
     for (int i = 0; i < threadCount; i++) {
       executor.submit(() -> {
         try {
           synchronized (HAGroupStoreClientIT.class) {
             
haAdmin.createOrUpdateDataOnZookeeper(Objects.requireNonNull(updateQueue.poll()));
           }
-          updateLatch.countDown();
         } catch (Exception e) {
-          throw new RuntimeException(e);
+          exceptions.add(e);
+        } finally {
+          updateLatch.countDown();
         }
       });
     }
 
-    // Check if updates are sent and updates are received.
-    assert eventsLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS * 
threadCount,
-      TimeUnit.MILLISECONDS);
-    assert updateLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS * 
threadCount,
-      TimeUnit.MILLISECONDS);
+    // Wait for all updates to complete and the final event to be received.
+    assertTrue("Update Latch value is " + updateLatch.getCount(), updateLatch
+      .await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS * threadCount, 
TimeUnit.MILLISECONDS));
+    assertTrue("Unexpected exceptions in update threads: " + exceptions, 
exceptions.isEmpty());
+    assertTrue("Final event (version " + threadCount + ") was not received", 
finalEventLatch
+      .await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS * threadCount, 
TimeUnit.MILLISECONDS));
+
+    // Verify events were received in strictly ascending version order.
+    assertTrue("Events received out of order: " + orderingErrors + ", versions 
received: "
+      + crrEventVersions, orderingErrors.isEmpty());
 
-    // Assert that the order of updates is same as order of events.
-    assert updateList.equals(crrEventVersions);
+    executor.shutdown();
+    storeClient.close();
   }
 
   private ClusterRoleRecord createCRR(Integer version) {

Reply via email to