This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new 657ed2a42f PHOENIX-7797 Fixing flapper test HAGroupStoreClientIT.testHAGroupStoreClientWithMultiThreadedUpdates (#2402)
657ed2a42f is described below
commit 657ed2a42fbe82ebd1ffd5e14efdc6aacb164223
Author: ritegarg <[email protected]>
AuthorDate: Fri Apr 10 12:46:24 2026 -0700
PHOENIX-7797 Fixing flapper test HAGroupStoreClientIT.testHAGroupStoreClientWithMultiThreadedUpdates (#2402)
---
.../apache/phoenix/jdbc/HAGroupStoreClientIT.java | 63 +++++++++++++---------
1 file changed, 39 insertions(+), 24 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index d5b1406c1a..234c67c8bd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -21,9 +21,11 @@ import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_SESSIO
import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -32,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
@@ -393,67 +396,79 @@ public class HAGroupStoreClientIT extends BaseTest {
}
/**
- * This test verifies that the updates coming via PathChildrenCacheListener are in order in which
- * updates are sent to ZK
+ * Verifies that events received via PathChildrenCacheListener preserve the ordering of
+ * multi-threaded updates written to ZK. PathChildrenCache may coalesce rapid updates to the same
+ * node, so fewer events than updates may arrive, but those received must be in order.
*/
@Test
public void testHAGroupStoreClientWithMultiThreadedUpdates() throws
Exception {
// Number of threads to execute
int threadCount = 5;
- // Capture versions of crr in a list(crrEventVersions) in order they are
received.
- List<Integer> crrEventVersions = new ArrayList<>();
- CountDownLatch eventsLatch = new CountDownLatch(threadCount);
+ // Track received event versions and validate ordering inline as events
arrive.
+ List<Integer> crrEventVersions = Collections.synchronizedList(new
ArrayList<>());
+ AtomicInteger lastReceivedVersion = new AtomicInteger(0);
+ List<String> orderingErrors = Collections.synchronizedList(new
ArrayList<>());
+ CountDownLatch finalEventLatch = new CountDownLatch(1);
PathChildrenCacheListener pathChildrenCacheListener = (client, event) -> {
if (
event.getData() != null && event.getData().getData() != null
&& ClusterRoleRecord.fromJson(event.getData().getData()).isPresent()
) {
ClusterRoleRecord crr =
ClusterRoleRecord.fromJson(event.getData().getData()).get();
- crrEventVersions.add((int) crr.getVersion());
- eventsLatch.countDown();
+ int version = (int) crr.getVersion();
+ int prev = lastReceivedVersion.getAndSet(version);
+ if (version <= prev) {
+ orderingErrors.add("Event version " + version + " received after
version " + prev);
+ }
+ crrEventVersions.add(version);
+ if (version == threadCount) {
+ finalEventLatch.countDown();
+ }
}
};
// Start a new HAGroupStoreClient.
- new HAGroupStoreClient(config, pathChildrenCacheListener);
- // Create multiple threads for update to ZK.
+ HAGroupStoreClient storeClient = new HAGroupStoreClient(config,
pathChildrenCacheListener);
+
+ // Create multiple threads for updating ZK.
final CountDownLatch updateLatch = new CountDownLatch(threadCount);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
-
- // List captures the order of events that are sent.
- List<Integer> updateList = new ArrayList<>();
-
- // Create a queue which can be polled to send updates to ZK.
ConcurrentLinkedQueue<ClusterRoleRecord> updateQueue = new
ConcurrentLinkedQueue<>();
for (int i = 0; i < threadCount; i++) {
updateQueue.add(createCRR(i + 1));
- updateList.add(i + 1);
}
// Submit updates to ZK.
+ List<Exception> exceptions = Collections.synchronizedList(new
ArrayList<>());
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
synchronized (HAGroupStoreClientIT.class) {
haAdmin.createOrUpdateDataOnZookeeper(Objects.requireNonNull(updateQueue.poll()));
}
- updateLatch.countDown();
} catch (Exception e) {
- throw new RuntimeException(e);
+ exceptions.add(e);
+ } finally {
+ updateLatch.countDown();
}
});
}
- // Check if updates are sent and updates are received.
- assert eventsLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS *
threadCount,
- TimeUnit.MILLISECONDS);
- assert updateLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS *
threadCount,
- TimeUnit.MILLISECONDS);
+ // Wait for all updates to complete and the final event to be received.
+ assertTrue("Update Latch value is " + updateLatch.getCount(), updateLatch
+ .await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS * threadCount,
TimeUnit.MILLISECONDS));
+ assertTrue("Unexpected exceptions in update threads: " + exceptions,
exceptions.isEmpty());
+ assertTrue("Final event (version " + threadCount + ") was not received",
finalEventLatch
+ .await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS * threadCount,
TimeUnit.MILLISECONDS));
+
+ // Verify events were received in strictly ascending version order.
+ assertTrue("Events received out of order: " + orderingErrors + ", versions
received: "
+ + crrEventVersions, orderingErrors.isEmpty());
- // Assert that the order of updates is same as order of events.
- assert updateList.equals(crrEventVersions);
+ executor.shutdown();
+ storeClient.close();
}
private ClusterRoleRecord createCRR(Integer version) {