This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new 8a7a3a6c29 PHOENIX-7884 : Refactor tracking of IndexCdcConsumer lag
(#2506) (#2542)
8a7a3a6c29 is described below
commit 8a7a3a6c29a4bf1de5f90ef8fbf9d94cf131ee1d
Author: Palash Chauhan <[email protected]>
AuthorDate: Tue Jun 23 21:54:06 2026 -0700
PHOENIX-7884 : Refactor tracking of IndexCdcConsumer lag (#2506) (#2542)
---
.../metrics/MetricsIndexCDCConsumerSource.java | 6 +-
.../phoenix/hbase/index/IndexCDCConsumer.java | 117 ++++++++++++-----
.../hbase/index/IndexCDCConsumerProgress.java | 92 +++++++++++++
phoenix-core/pom.xml | 5 +
.../phoenix/end2end/IndexCDCConsumerLagIT.java | 146 +++++++++++++++++++++
.../hbase/index/IndexCDCConsumerProgressTest.java | 121 +++++++++++++++++
6 files changed, 455 insertions(+), 32 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
index c278c1fa4e..cb39b5b38a 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
@@ -52,7 +52,11 @@ public interface MetricsIndexCDCConsumerSource extends
BaseSource {
String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag";
String CDC_INDEX_UPDATE_LAG_DESC =
- "Histogram for the lag in milliseconds between current time and the last
processed CDC event";
+ "Histogram of current time minus the consumer's effective freshness
watermark, in "
+ + "milliseconds. The watermark advances on successful own-partition
batches AND on empty "
+ + "polls (which prove caught-up to queryStart - timestampBufferMs). Idle
steady state is "
+ + "≈ timestampBufferMs; grows during sustained failure, parent-region
replay, or cold "
+ + "start (where it is floored at now - consumerStartTime).";
/**
* Updates the CDC batch processing time histogram.
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index 6c4ceab789..412e380b66 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -112,7 +112,7 @@ public class IndexCDCConsumer implements Runnable {
*/
public static final String INDEX_CDC_CONSUMER_POLL_INTERVAL_MS =
"phoenix.index.cdc.consumer.poll.interval.ms";
- private static final long DEFAULT_POLL_INTERVAL_MS = 1000;
+ private static final long DEFAULT_POLL_INTERVAL_MS = 500;
/**
* The time buffer in milliseconds subtracted from current time when
querying CDC mutations to
@@ -130,16 +130,25 @@ public class IndexCDCConsumer implements Runnable {
*/
public static final String INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES =
"phoenix.index.cdc.consumer.max.data.visibility.retries";
- private static final int DEFAULT_MAX_DATA_VISIBILITY_RETRIES = 10;
+ private static final int DEFAULT_MAX_DATA_VISIBILITY_RETRIES = 15;
public static final String INDEX_CDC_CONSUMER_RETRY_PAUSE_MS =
"phoenix.index.cdc.consumer.retry.pause.ms";
- private static final long DEFAULT_RETRY_PAUSE_MS = 2000;
+ private static final long DEFAULT_RETRY_PAUSE_MS = 200;
public static final String INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS =
"phoenix.index.cdc.consumer.parent.progress.pause.ms";
private static final long DEFAULT_PARENT_PROGRESS_PAUSE_MS = 15000;
+ /**
+ * Interval between {@code cdcIndexUpdateLag} samples emitted while the
consumer is sleeping (idle
+ * poll, backoff, parent-progress wait, etc.). Clamped to at least 50ms.
+ */
+ public static final String INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS =
+ "phoenix.index.cdc.consumer.lag.sample.interval.ms";
+ private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 1000L;
+ private static final long MIN_LAG_SAMPLE_INTERVAL_MS = 50L;
+
private final RegionCoprocessorEnvironment env;
private final String dataTableName;
private final String encodedRegionName;
@@ -154,6 +163,12 @@ public class IndexCDCConsumer implements Runnable {
private final Configuration config;
private final boolean serializeCDCMutations;
private final MetricsIndexCDCConsumerSource metricSource;
+ private final long lagSampleIntervalMs;
+ private final IndexCDCConsumerProgress progress;
+ // Flipped true once hasEventuallyConsistentIndexes() confirms this region actually has an EC
+ // index. Until then sleepWithLagSampling does not emit, so tables that exit early for lack of
+ // an EC index add no cold-start lag samples to the global / per-table histograms.
+ private volatile boolean lagEmissionEnabled = false;
private volatile boolean stopped = false;
private Thread consumerThread;
private boolean hasParentPartitions = false;
@@ -228,7 +243,11 @@ public class IndexCDCConsumer implements Runnable {
DEFAULT_MAX_DATA_VISIBILITY_RETRIES);
this.parentProgressPauseMs =
config.getLong(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS,
DEFAULT_PARENT_PROGRESS_PAUSE_MS);
+ this.lagSampleIntervalMs = Math.max(MIN_LAG_SAMPLE_INTERVAL_MS,
+ config.getLong(INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS,
DEFAULT_LAG_SAMPLE_INTERVAL_MS));
this.metricSource =
MetricsIndexerSourceFactory.getInstance().getIndexCDCConsumerSource();
+ this.progress = new
IndexCDCConsumerProgress(EnvironmentEdgeManager.currentTimeMillis(),
+ this.timestampBufferMs);
DelegateRegionCoprocessorEnvironment indexWriterEnv =
new DelegateRegionCoprocessorEnvironment(env,
ConnectionType.INDEX_WRITER_CONNECTION);
this.indexWriter =
@@ -259,13 +278,23 @@ public class IndexCDCConsumer implements Runnable {
}
/**
- * Sleeps for the specified duration if the consumer has not been stopped.
- * @param millis the duration to sleep in milliseconds.
- * @throws InterruptedException if the thread is interrupted while sleeping.
+ * Sleeps for up to {@code totalMillis}, emitting a {@code cdcIndexUpdateLag} sample at the start
+ * of each {@code lagSampleIntervalMs} slice once {@link #lagEmissionEnabled} is set. The stop
+ * flag is re-checked between slices. Used for all consumer-thread sleeps so the lag metric stays
+ * non-silent across idle, failure, post-startup, and parent-replay phases.
*/
- private void sleepIfNotStopped(long millis) throws InterruptedException {
- if (!stopped) {
- Thread.sleep(millis);
+ private void sleepWithLagSampling(long totalMillis) throws
InterruptedException {
+ long deadline = EnvironmentEdgeManager.currentTimeMillis() + totalMillis;
+ while (!stopped) {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ if (lagEmissionEnabled) {
+ metricSource.updateCdcLag(dataTableName, progress.currentLagMs(now));
+ }
+ long remaining = deadline - now;
+ if (remaining <= 0) {
+ return;
+ }
+ Thread.sleep(Math.min(remaining, lagSampleIntervalMs));
}
}
@@ -374,7 +403,7 @@ public class IndexCDCConsumer implements Runnable {
"Error while retrieving partition keys from CDC_STREAM for partition
{} table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
partitionId, dataTableName, retryCount, sleepTime, e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
return null;
@@ -405,7 +434,7 @@ public class IndexCDCConsumer implements Runnable {
public void run() {
try {
if (startupDelayMs > 0 && getCDCStreamNumPartitions() <= 1) {
- sleepIfNotStopped(startupDelayMs);
+ sleepWithLagSampling(startupDelayMs);
}
if (stopped) {
return;
@@ -415,6 +444,9 @@ public class IndexCDCConsumer implements Runnable {
dataTableName);
return;
}
+ // Only enable lag sampling once we've confirmed this table actually has
an EC index,
+ // so non-EC-indexed tables don't pollute the lag histograms with
cold-start samples.
+ lagEmissionEnabled = true;
LOG.info(
"IndexCDCConsumer started for table {} region {}"
+ " [batchSize: {}, pollIntervalMs: {}, timestampBufferMs: {},
startupDelayMs: {},"
@@ -438,13 +470,14 @@ public class IndexCDCConsumer implements Runnable {
dataTableName, encodedRegionName);
return;
} else if (lastProcessedTimestamp > 0) {
+ progress.recordProcessed(lastProcessedTimestamp);
LOG.info(
"Found existing tracker entry for table {} region {} with
lastTimestamp {}. "
+ "Resuming from last position (region movement scenario).",
dataTableName, encodedRegionName, lastProcessedTimestamp);
} else {
if (hasParentPartitions) {
- sleepIfNotStopped(timestampBufferMs + 1);
+ sleepWithLagSampling(timestampBufferMs + 1);
replayAndCompleteParentRegions(encodedRegionName);
} else {
LOG.info("No parent partitions for table {} region {}, skipping
parent replay",
@@ -463,10 +496,10 @@ public class IndexCDCConsumer implements Runnable {
lastProcessedTimestamp, false);
}
if (lastProcessedTimestamp == previousTimestamp) {
- sleepIfNotStopped(ConnectionUtils.getPauseTime(pause,
++retryCount));
+ sleepWithLagSampling(ConnectionUtils.getPauseTime(pause,
++retryCount));
} else {
retryCount = 0;
- sleepIfNotStopped(pollIntervalMs);
+ sleepWithLagSampling(pollIntervalMs);
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
@@ -478,7 +511,7 @@ public class IndexCDCConsumer implements Runnable {
"Error processing CDC mutations for table {} region {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, encodedRegionName, retryCount, sleepTime, e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
} catch (InterruptedException e) {
@@ -518,7 +551,7 @@ public class IndexCDCConsumer implements Runnable {
"Error checking for eventually consistent indexes for table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, retryCount, sleepTime, e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
return false;
@@ -559,7 +592,7 @@ public class IndexCDCConsumer implements Runnable {
"Error getting CDC_STREAM row count for table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, retryCount, sleepTime, e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
return -1;
@@ -597,14 +630,14 @@ public class IndexCDCConsumer implements Runnable {
"CDC_STREAM entry not found for table {} partition {}. "
+ "Attempt #{}, sleeping {} ms before retrying...",
dataTableName, encodedRegionName, retryCount, sleepTime);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
} catch (SQLException e) {
long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
LOG.warn(
"Error checking CDC_STREAM for table {} partition {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, encodedRegionName, retryCount, sleepTime, e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
return false;
@@ -662,7 +695,7 @@ public class IndexCDCConsumer implements Runnable {
"Error checking IDX_CDC_TRACKER for table {} partition {} owner {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, partitionId, ownerPartitionId, retryCount, sleepTime,
e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
return 0;
@@ -695,7 +728,7 @@ public class IndexCDCConsumer implements Runnable {
"Error checking if partition {} is completed for table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
partitionId, dataTableName, retryCount, sleepTime, e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
return false;
@@ -739,7 +772,7 @@ public class IndexCDCConsumer implements Runnable {
"Error getting parent progress for partition {} table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
partitionId, dataTableName, retryCount, sleepTime, e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
throw new InterruptedException("IndexCDCConsumer stopped while getting
parent progress.");
@@ -778,7 +811,7 @@ public class IndexCDCConsumer implements Runnable {
"Error querying parent partitions from CDC_STREAM for table {}
partition {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, partitionId, retryCount, sleepTime, e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
return Collections.emptyList();
@@ -812,7 +845,7 @@ public class IndexCDCConsumer implements Runnable {
long previousOtherProgress;
do {
previousOtherProgress = otherProgress;
- sleepIfNotStopped(parentProgressPauseMs);
+ sleepWithLagSampling(parentProgressPauseMs);
if (isPartitionCompleted(partitionId)) {
return;
}
@@ -856,7 +889,7 @@ public class IndexCDCConsumer implements Runnable {
+ "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms",
partitionId, ownerPartitionId, dataTableName,
currentLastProcessedTimestamp, retryCount,
sleepTime, e);
- sleepIfNotStopped(sleepTime);
+ sleepWithLagSampling(sleepTime);
}
}
LOG.info("Processing partition {} (owner {}) stopped before completion for
table {}",
@@ -943,7 +976,11 @@ public class IndexCDCConsumer implements Runnable {
long newLastTimestamp = lastProcessedTimestamp;
boolean hasMoreRows = true;
int retryCount = 0;
+ // Captured immediately before each query so the empty-poll watermark
cannot over-advance
+ // past what the query's own (now - timestampBufferMs) upper bound
actually proved empty.
+ long lastQueryStartTime = newLastTimestamp;
while (hasMoreRows && batchMutations.isEmpty()) {
+ lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis();
try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
setStatementParams(scanInfo, partitionId, isParentReplay,
newLastTimestamp, ps);
Pair<Long, Boolean> result =
@@ -952,11 +989,15 @@ public class IndexCDCConsumer implements Runnable {
if (hasMoreRows) {
newLastTimestamp = result.getFirst();
if (batchMutations.isEmpty()) {
- sleepIfNotStopped(ConnectionUtils.getPauseTime(pause,
++retryCount));
+ sleepWithLagSampling(ConnectionUtils.getPauseTime(pause,
++retryCount));
}
}
}
}
+ // Empty own-partition poll proves we are caught up to (queryStart -
timestampBufferMs).
+ if (!hasMoreRows && !isParentReplay) {
+ progress.recordEmptyPoll(lastQueryStartTime);
+ }
// With predefined LIMIT, there might be more rows with the same
timestamp that were not
// included in this batch.
if (newLastTimestamp > lastProcessedTimestamp) {
@@ -985,8 +1026,9 @@ public class IndexCDCConsumer implements Runnable {
metricSource.updateCdcBatchProcessTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
metricSource.incrementCdcBatchCount(dataTableName);
- metricSource.updateCdcLag(dataTableName,
- EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp);
+ if (!isParentReplay) {
+ progress.recordProcessed(newLastTimestamp);
+ }
updateTrackerProgress(conn, partitionId, ownerPartitionId,
newLastTimestamp,
PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
}
@@ -1052,7 +1094,11 @@ public class IndexCDCConsumer implements Runnable {
long[] lastScannedTimestamp = { lastProcessedTimestamp };
boolean hasMoreRows = true;
int retryCount = 0;
+ // Captured immediately before each query so the empty-poll watermark
cannot over-advance
+ // past what the query's own (now - timestampBufferMs) upper bound
actually proved empty.
+ long lastQueryStartTime = newLastTimestamp;
while (hasMoreRows && batchStates.isEmpty()) {
+ lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis();
try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
setStatementParams(scanInfo, partitionId, isParentReplay,
newLastTimestamp, ps);
Pair<Long, Boolean> result =
@@ -1067,16 +1113,24 @@ public class IndexCDCConsumer implements Runnable {
+ " to {} after {} retries — data table mutations may have
failed",
dataTableName, partitionId, newLastTimestamp,
lastScannedTimestamp[0], retryCount);
newLastTimestamp = lastScannedTimestamp[0];
+ // NOTE: durable tracker advances below (newLastTimestamp >
lastProcessedTimestamp)
+ // but progress.recordProcessed is skipped
(batchStates.isEmpty()). The in-memory
+ // watermark lags durable state until the next empty poll heals
it — over-reports
+ // lag temporarily (safe direction, self-healing).
break;
} else {
// CDC index entries are written but the data is not yet visible.
// Don't advance newLastTimestamp so the same events are
re-fetched
// once the data becomes visible.
- sleepIfNotStopped(ConnectionUtils.getPauseTime(pause,
++retryCount));
+ sleepWithLagSampling(ConnectionUtils.getPauseTime(pause,
++retryCount));
}
}
}
}
+ // Empty own-partition poll proves we are caught up to (queryStart -
timestampBufferMs).
+ if (!hasMoreRows && !isParentReplay) {
+ progress.recordEmptyPoll(lastQueryStartTime);
+ }
if (newLastTimestamp > lastProcessedTimestamp) {
String sameTimestampQuery = String.format(
"SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(),
\"CDC JSON\" "
@@ -1106,8 +1160,9 @@ public class IndexCDCConsumer implements Runnable {
metricSource.updateCdcBatchProcessTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
metricSource.incrementCdcBatchCount(dataTableName);
- metricSource.updateCdcLag(dataTableName,
- EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp);
+ if (!isParentReplay) {
+ progress.recordProcessed(newLastTimestamp);
+ }
}
if (newLastTimestamp > lastProcessedTimestamp) {
updateTrackerProgress(conn, partitionId, ownerPartitionId,
newLastTimestamp,
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java
new file mode 100644
index 0000000000..9c6e452a9d
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+/**
+ * Observable progress of an {@link IndexCDCConsumer}.
+ * <p>
+ * Thread-safety: <b>single-writer.</b> The owning consumer thread is the only
writer of
+ * {@link #recordProcessed} / {@link #recordEmptyPoll}; the same thread also
reads via
+ * {@link #currentLagMs}. {@code volatile} fields provide safe publication for
an off-thread
+ * observer (e.g. JMX scraper) that may read {@link #getEffectiveWatermark} or
+ * {@link #getLastEmptyPollEndTime}, but no off-thread <i>writer</i> is
supported. Watermark is
+ * monotonic; lag is clamped to zero on clock regressions.
+ */
+final class IndexCDCConsumerProgress {
+
+ // Wall-clock time at consumer construction; floor for currentLagMs before
any signal.
+ private final long consumerStartTime;
+ // Read-back buffer the consumer subtracts from `now` on its own-partition
CDC filter.
+ private final long timestampBufferMs;
+
+ // Latest CDC event timestamp this consumer has acknowledged for its own
partition.
+ private volatile long lastProcessedTimestamp = 0L;
+ // Wall-clock time captured immediately before the most recent own-partition
CDC poll that
+ // returned zero rows. Equals the upper bound of the poll's "no events exist
below" proof:
+ // (lastEmptyPollEndTime - timestampBufferMs) is the latest CDC ts the
consumer is caught up to.
+ private volatile long lastEmptyPollEndTime = 0L;
+ // Highest own-partition CDC timestamp the consumer is confirmed caught up
to (monotonic).
+ private volatile long effectiveWatermark = 0L;
+
+ IndexCDCConsumerProgress(long consumerStartTime, long timestampBufferMs) {
+ this.consumerStartTime = consumerStartTime;
+ this.timestampBufferMs = timestampBufferMs;
+ }
+
+ /**
+ * Record progress from a successful own-partition batch. Single-writer
(consumer thread only).
+ */
+ void recordProcessed(long ts) {
+ if (ts > lastProcessedTimestamp) {
+ lastProcessedTimestamp = ts;
+ }
+ advanceWatermark();
+ }
+
+ /** Record an own-partition CDC poll that returned zero rows. Single-writer.
*/
+ void recordEmptyPoll(long queryStartWallClock) {
+ if (queryStartWallClock > lastEmptyPollEndTime) {
+ lastEmptyPollEndTime = queryStartWallClock;
+ }
+ advanceWatermark();
+ }
+
+ /** Lag in ms, clamped at zero. Equals {@code now - consumerStartTime} before any signal. */
+ long currentLagMs(long now) {
+ long base = effectiveWatermark > 0 ? effectiveWatermark :
consumerStartTime;
+ long lag = now - base;
+ return lag < 0 ? 0 : lag;
+ }
+
+ private void advanceWatermark() {
+ long emptyPollWatermark =
+ lastEmptyPollEndTime > 0 ? lastEmptyPollEndTime - timestampBufferMs : 0L;
+ long candidate = Math.max(lastProcessedTimestamp, emptyPollWatermark);
+ if (candidate > effectiveWatermark) {
+ effectiveWatermark = candidate;
+ }
+ }
+
+ long getEffectiveWatermark() {
+ return effectiveWatermark;
+ }
+
+ long getLastEmptyPollEndTime() {
+ return lastEmptyPollEndTime;
+ }
+}
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index a6e89bca81..15dad63a53 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -83,6 +83,11 @@
<artifactId>hbase-server</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-metrics-api</artifactId>
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java
new file mode 100644
index 0000000000..158c54d276
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_POLL_INTERVAL_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource.CDC_INDEX_UPDATE_LAG;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSourceImpl;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Verifies the {@code cdcIndexUpdateLag} histogram keeps receiving samples
while the consumer is
+ * idle.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexCDCConsumerLagIT extends ParallelStatsDisabledIT {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IndexCDCConsumerLagIT.class);
+
+ private static final int TIMESTAMP_BUFFER_MS = 2_000;
+ private static final int POLL_INTERVAL_MS = 500;
+ private static final int LAG_SAMPLE_INTERVAL_MS = 500;
+ // Small retry pause so empty-poll backoff doesn't dominate idle behavior.
+ private static final int RETRY_PAUSE_MS = 100;
+ // Budget for the consumer to start up and emit its first lag sample.
Generous because the
+ // consumer waits up to INDEX_CDC_CONSUMER_STARTUP_DELAY_MS (default 10s)
and then performs
+ // CDC_STREAM / IDX_CDC_TRACKER lookups before its first poll. Sized for
slow CI / cold JVM.
+ private static final long CONSUMER_STARTUP_BUDGET_MS = 120_000L;
+ // Idle window for the flow check. Only need to prove ≥ 1 sample fires; kept
short to keep
+ // total test runtime low.
+ private static final long IDLE_WAIT_MS = 5_000L;
+ private static final long MAX_LOOKBACK_AGE = 1_000_000L;
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(10);
+
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
+
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Long.toString(MAX_LOOKBACK_AGE));
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(2));
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(1));
+ props.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
Boolean.TRUE.toString());
+ props.put("hbase.coprocessor.master.classes",
PhoenixMasterObserver.class.getName());
+ props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS,
Integer.toString(TIMESTAMP_BUFFER_MS));
+ props.put(INDEX_CDC_CONSUMER_POLL_INTERVAL_MS,
Integer.toString(POLL_INTERVAL_MS));
+ props.put(INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS,
Integer.toString(LAG_SAMPLE_INTERVAL_MS));
+ props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS,
Integer.toString(RETRY_PAUSE_MS));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ private Connection getConnection() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ return DriverManager.getConnection(getUrl(), props);
+ }
+
+ private MetricHistogram lagHistogram() {
+ MetricsIndexCDCConsumerSourceImpl source =
+ (MetricsIndexCDCConsumerSourceImpl)
MetricsIndexerSourceFactory.getInstance()
+ .getIndexCDCConsumerSource();
+ return source.getMetricsRegistry().getHistogram(CDC_INDEX_UPDATE_LAG);
+ }
+
+ /**
+ * Polls until the lag histogram has at least {@code minCount} samples, or
fails after timeout.
+ */
+ private void awaitMinCount(long minCount, long timeoutMs) throws
InterruptedException {
+ long deadline = System.currentTimeMillis() + timeoutMs;
+ long observed = 0L;
+ while (System.currentTimeMillis() < deadline) {
+ observed = lagHistogram().getCount();
+ if (observed >= minCount) {
+ return;
+ }
+ Thread.sleep(500L);
+ }
+ fail("Lag histogram never reached count=" + minCount + " within " +
timeoutMs + "ms; observed="
+ + observed);
+ }
+
+ @Test
+ public void testLagMetricKeepsSamplingWhenIdle() throws Exception {
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+
+ try (Connection conn = getConnection()) {
+ conn.createStatement().execute("CREATE TABLE " + tableName
+ + " (PK VARCHAR NOT NULL PRIMARY KEY," + " V1 VARCHAR, V2 VARCHAR)
COLUMN_ENCODED_BYTES=0");
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName
+ + "(V1) INCLUDE (V2) CONSISTENCY=EVENTUAL");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " (PK, V1, V2) VALUES ('r1',
'v1', 'd1')");
+ conn.commit();
+ }
+
+ // Wait for the consumer thread to start and emit its first lag sample.
Replaces a fixed
+ // settle sleep so the test is robust to slow CI / cold JVM startup.
+ awaitMinCount(1L, CONSUMER_STARTUP_BUDGET_MS);
+
+ long countBeforeIdle = lagHistogram().getCount();
+ Thread.sleep(IDLE_WAIT_MS);
+ long countAfterIdle = lagHistogram().getCount();
+ long delta = countAfterIdle - countBeforeIdle;
+ LOG.info("Idle window {}ms: countBefore={}, countAfter={}, delta={}",
IDLE_WAIT_MS,
+ countBeforeIdle, countAfterIdle, delta);
+
+ assertTrue("Histogram count did not advance during idle; delta=" + delta,
delta >= 1);
+ }
+}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java
new file mode 100644
index 0000000000..3c87d8849b
--- /dev/null
+++
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class IndexCDCConsumerProgressTest {
+
+ private static final long BUFFER_MS = 5_000L;
+ private static final long START_TIME = 1_000_000L;
+
+ private IndexCDCConsumerProgress newProgress() {
+ return new IndexCDCConsumerProgress(START_TIME, BUFFER_MS);
+ }
+
+ @Test
+ public void coldStartReportsLagSinceConsumerStart() {
+ IndexCDCConsumerProgress p = newProgress();
+ assertEquals(100L, p.currentLagMs(START_TIME + 100L));
+ assertEquals(0L, p.getEffectiveWatermark());
+ }
+
+ @Test
+ public void processedAdvancesWatermark() {
+ IndexCDCConsumerProgress p = newProgress();
+ long processed = START_TIME + 10_000L;
+ p.recordProcessed(processed);
+ assertEquals(processed, p.getEffectiveWatermark());
+ assertEquals(2_000L, p.currentLagMs(processed + 2_000L));
+ }
+
+ @Test
+ public void processedIsMonotonic() {
+ IndexCDCConsumerProgress p = newProgress();
+ p.recordProcessed(START_TIME + 1_000L);
+ p.recordProcessed(START_TIME + 500L);
+ assertEquals(START_TIME + 1_000L, p.getEffectiveWatermark());
+ }
+
+ @Test
+ public void emptyPollAdvancesWatermarkBuffersBelowPollTime() {
+ IndexCDCConsumerProgress p = newProgress();
+ long pollEnd = START_TIME + 20_000L;
+ p.recordEmptyPoll(pollEnd);
+ assertEquals(pollEnd - BUFFER_MS, p.getEffectiveWatermark());
+ // Lag at the same instant collapses to the buffer baseline.
+ assertEquals(BUFFER_MS, p.currentLagMs(pollEnd));
+ }
+
+ @Test
+ public void emptyPollIsMonotonic() {
+ IndexCDCConsumerProgress p = newProgress();
+ long firstPoll = START_TIME + 20_000L;
+ long earlierPoll = START_TIME + 10_000L;
+ p.recordEmptyPoll(firstPoll);
+ p.recordEmptyPoll(earlierPoll);
+ assertEquals(firstPoll, p.getLastEmptyPollEndTime());
+ assertEquals(firstPoll - BUFFER_MS, p.getEffectiveWatermark());
+ }
+
+ @Test
+ public void watermarkIsMaxOfProcessedAndEmptyPollFloor() {
+ IndexCDCConsumerProgress p = newProgress();
+ long processed = START_TIME + 50_000L;
+ long pollEnd = START_TIME + 30_000L;
+ p.recordProcessed(processed);
+ p.recordEmptyPoll(pollEnd);
+ // processed dominates because (pollEnd - BUFFER_MS) < processed
+ assertEquals(processed, p.getEffectiveWatermark());
+
+ // A later empty poll above (processed + BUFFER_MS) advances the watermark
again.
+ long laterPoll = processed + BUFFER_MS + 1_000L;
+ p.recordEmptyPoll(laterPoll);
+ assertEquals(laterPoll - BUFFER_MS, p.getEffectiveWatermark());
+ }
+
+ @Test
+ public void idleAfterEmptyPollStaysBoundedByBufferPlusElapsed() {
+ IndexCDCConsumerProgress p = newProgress();
+ long pollEnd = START_TIME + 20_000L;
+ p.recordEmptyPoll(pollEnd);
+ long elapsed = 7_500L;
+ assertEquals(BUFFER_MS + elapsed, p.currentLagMs(pollEnd + elapsed));
+ }
+
+ @Test
+ public void negativeLagClampedToZero() {
+ IndexCDCConsumerProgress p = newProgress();
+ long processed = START_TIME + 50_000L;
+ p.recordProcessed(processed);
+ // clock went backwards relative to the watermark
+ assertEquals(0L, p.currentLagMs(processed - 100L));
+ }
+
+ @Test
+ public void emptyPollOlderThanBufferContributesNothing() {
+ IndexCDCConsumerProgress p = newProgress();
+ long pollEnd = BUFFER_MS - 1L;
+ p.recordEmptyPoll(pollEnd);
+ // pollEnd - BUFFER_MS would be negative; do not pollute the watermark.
+ assertEquals(pollEnd, p.getLastEmptyPollEndTime());
+ assertEquals(0L, p.getEffectiveWatermark());
+ }
+}