This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch tmp-ec
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/tmp-ec by this push:
     new cc964191b9 better use of parentProgressPauseMs
cc964191b9 is described below

commit cc964191b9f0d678182533b3288a295ed613bd49
Author: Viraj Jasani <[email protected]>
AuthorDate: Mon Apr 6 17:09:40 2026 -0700

    better use of parentProgressPauseMs
---
 .../org/apache/phoenix/hbase/index/IndexCDCConsumer.java  | 15 ++++++++++-----
 .../ConcurrentMutationsCoveredEventualGenerateIT.java     |  2 ++
 .../phoenix/end2end/ConcurrentMutationsExtendedIT.java    |  2 ++
 .../end2end/ConcurrentMutationsExtendedIndexIT.java       |  2 ++
 .../ConcurrentMutationsUncoveredEventualGenerateIT.java   |  2 ++
 5 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index 1ed4e5f26d..9eecbe2e0e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -808,11 +808,16 @@ public class IndexCDCConsumer implements Runnable {
           }
           long otherProgress = getParentProgress(partitionId);
           if (otherProgress > currentLastProcessedTimestamp) {
-            sleepIfNotStopped(parentProgressPauseMs);
-            if (isPartitionCompleted(partitionId)) {
-              return;
-            }
-            currentLastProcessedTimestamp = getParentProgress(partitionId);
+            long previousOtherProgress;
+            do {
+              previousOtherProgress = otherProgress;
+              sleepIfNotStopped(parentProgressPauseMs);
+              if (isPartitionCompleted(partitionId)) {
+                return;
+              }
+              otherProgress = getParentProgress(partitionId);
+            } while (!stopped && otherProgress > previousOtherProgress);
+            currentLastProcessedTimestamp = otherProgress;
           }
         }
         long newTimestamp;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
index 1bf63c7579..0088a38a64 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS;
 import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
 import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
@@ -61,6 +62,7 @@ public class ConcurrentMutationsCoveredEventualGenerateIT
     props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
     props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName());
     props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+    props.put(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS, Integer.toString(1000));
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index bda94a3c74..4e10284a32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable;
 import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS;
 import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
 import static org.junit.Assert.assertEquals;
@@ -100,6 +101,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
     props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(1000));
     props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName());
     props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString());
+    props.put(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS, Integer.toString(1000));
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
index 54ad81e0f3..35fcbf1713 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
@@ -136,6 +136,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT extends ParallelStatsDi
     splitThread.join(10000);
     LOGGER.info(
       "Total upsert time in ms : " + (EnvironmentEdgeManager.currentTimeMillis() - startTime));
+    Thread.sleep(20000);
     List<String> allIndexes =
       new ArrayList<>(Arrays.asList(indexName1, indexName2, indexName3, indexName4, indexName5));
     verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows);
@@ -257,6 +258,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT extends ParallelStatsDi
     assertTrue("Ran out of time", doneSignal.await(1500, TimeUnit.SECONDS));
     LOGGER.info(
       "Total upsert time in ms : " + (EnvironmentEdgeManager.currentTimeMillis() - startTime));
+    Thread.sleep(20000);
     List<String> allIndexes = new ArrayList<>(
       Arrays.asList(indexName1, indexName2, indexName3, indexName4, indexName5, indexName6));
     verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
index 5ecfc1c036..24511b46db 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS;
 import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
 import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
@@ -61,6 +62,7 @@ public class ConcurrentMutationsUncoveredEventualGenerateIT
     props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
     props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName());
     props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+    props.put(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS, Integer.toString(1000));
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 

Reply via email to