This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch tmp-ec
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/tmp-ec by this push:
new cc964191b9 better use of parentProgressPauseMs
cc964191b9 is described below
commit cc964191b9f0d678182533b3288a295ed613bd49
Author: Viraj Jasani <[email protected]>
AuthorDate: Mon Apr 6 17:09:40 2026 -0700
better use of parentProgressPauseMs
---
.../org/apache/phoenix/hbase/index/IndexCDCConsumer.java | 15 ++++++++++-----
.../ConcurrentMutationsCoveredEventualGenerateIT.java | 2 ++
.../phoenix/end2end/ConcurrentMutationsExtendedIT.java | 2 ++
.../end2end/ConcurrentMutationsExtendedIndexIT.java | 2 ++
.../ConcurrentMutationsUncoveredEventualGenerateIT.java | 2 ++
5 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index 1ed4e5f26d..9eecbe2e0e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -808,11 +808,16 @@ public class IndexCDCConsumer implements Runnable {
}
long otherProgress = getParentProgress(partitionId);
if (otherProgress > currentLastProcessedTimestamp) {
- sleepIfNotStopped(parentProgressPauseMs);
- if (isPartitionCompleted(partitionId)) {
- return;
- }
- currentLastProcessedTimestamp = getParentProgress(partitionId);
+ long previousOtherProgress;
+ do {
+ previousOtherProgress = otherProgress;
+ sleepIfNotStopped(parentProgressPauseMs);
+ if (isPartitionCompleted(partitionId)) {
+ return;
+ }
+ otherProgress = getParentProgress(partitionId);
+ } while (!stopped && otherProgress > previousOtherProgress);
+ currentLastProcessedTimestamp = otherProgress;
}
}
long newTimestamp;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
index 1bf63c7579..0088a38a64 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.end2end;
import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS;
import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
import static
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
@@ -61,6 +62,7 @@ public class ConcurrentMutationsCoveredEventualGenerateIT
props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+ props.put(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS,
Integer.toString(1000));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index bda94a3c74..4e10284a32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable;
import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS;
import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
import static
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
import static org.junit.Assert.assertEquals;
@@ -100,6 +101,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(1000));
props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString());
+ props.put(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS,
Integer.toString(1000));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
index 54ad81e0f3..35fcbf1713 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
@@ -136,6 +136,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT extends ParallelStatsDi
splitThread.join(10000);
LOGGER.info(
"Total upsert time in ms : " +
(EnvironmentEdgeManager.currentTimeMillis() - startTime));
+ Thread.sleep(20000);
List<String> allIndexes =
new ArrayList<>(Arrays.asList(indexName1, indexName2, indexName3,
indexName4, indexName5));
verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows);
@@ -257,6 +258,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT extends ParallelStatsDi
assertTrue("Ran out of time", doneSignal.await(1500, TimeUnit.SECONDS));
LOGGER.info(
"Total upsert time in ms : " +
(EnvironmentEdgeManager.currentTimeMillis() - startTime));
+ Thread.sleep(20000);
List<String> allIndexes = new ArrayList<>(
Arrays.asList(indexName1, indexName2, indexName3, indexName4,
indexName5, indexName6));
verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
index 5ecfc1c036..24511b46db 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.end2end;
import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS;
import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
import static
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
@@ -61,6 +62,7 @@ public class ConcurrentMutationsUncoveredEventualGenerateIT
props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+ props.put(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS,
Integer.toString(1000));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}