This is an automated email from the ASF dual-hosted git repository.
virajjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 6442f2fbed PHOENIX-7883 : More metrics for EC index consumer (#2543)
6442f2fbed is described below
commit 6442f2fbed9335b8e9759d8497850784fb028f0d
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Jun 25 13:56:52 2026 -0700
PHOENIX-7883 : More metrics for EC index consumer (#2543)
---
.../metrics/MetricsIndexCDCConsumerSource.java | 81 ++++++++
.../metrics/MetricsIndexCDCConsumerSourceImpl.java | 63 ++++++
.../phoenix/hbase/index/IndexCDCConsumer.java | 217 ++++++++++++---------
3 files changed, 266 insertions(+), 95 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
index cb39b5b38a..ac647638e1 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
@@ -50,6 +50,43 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource {
String CDC_BATCH_FAILURE_COUNT = "cdcBatchFailureCount";
String CDC_BATCH_FAILURE_COUNT_DESC = "The number of CDC batch processing
failures";
+ String CDC_EVENT_SKIPPED_COUNT = "cdcEventSkippedCount";
+ String CDC_EVENT_SKIPPED_COUNT_DESC =
+ "The number of CDC events the consumer permanently advanced past because
their data table "
+ + "row state could not be read within
phoenix.index.cdc.consumer.max.data.visibility.retries"
+ + " attempts. Each event counted will never be applied to the eventually
consistent index "
+ + "\u2014 typically caused by failed or aborted data table mutations
upstream. Non-zero "
+ + "values indicate silent data divergence between the data table and its
EC indexes.";
+
+ String CDC_PARENT_REPLAY_ACTIVE_REGIONS = "cdcParentReplayActiveRegions";
+ String CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC =
+ "Gauge of regions currently in the parent-region replay phase (post-split
/ post-merge "
+ + "catch-up before steady-state own-partition processing begins).
Per-table value = number "
+ + "of regions of this table on this RegionServer currently replaying
ancestor partitions. "
+ + "The lag histogram inflates during this phase by design (parent-replay
timestamps do not "
+ + "advance the child's own-partition freshness watermark) \u2014 this
gauge lets operators "
+ + "distinguish 'normal post-split catch-up' from 'consumer is broken'.";
+
+ String CDC_PARENT_REPLAY_DURATION = "cdcParentReplayDuration";
+ String CDC_PARENT_REPLAY_DURATION_DESC =
+ "Histogram (milliseconds) of how long this consumer spent replaying one
ancestor partition "
+ + "during post-split / post-merge catch-up, measured from when this
consumer joined the "
+ + "replay until it reached a terminal state. One sample is emitted per
parent partition "
+ + "when this consumer either marks it COMPLETE or observes another
consumer marking it "
+ + "COMPLETE \u2014 in the latter case the sample is shorter than the
end-to-end partition "
+ + "replay time. Ancestors that were already COMPLETE when discovered
emit no sample.";
+
+ String CDC_CONSUMER_ACTIVE_REGIONS = "cdcConsumerActiveRegions";
+ String CDC_CONSUMER_ACTIVE_REGIONS_DESC =
+ "Gauge of regions whose IndexCDCConsumer is currently in steady-state
own-partition "
+ + "processing for this table on this RegionServer. Incremented
immediately before entering "
+ + "the main poll loop (after startup wait, EC-index discovery,
CDC_STREAM wait, tracker "
+ + "lookup, and any parent-region replay have all completed) and
decremented when the loop "
+ + "exits. A gauge of 0 where >0 is expected indicates the consumer
either never reached "
+ + "steady state (cold start, missing EC index, missing CDC_STREAM entry)
or exited "
+ + "(stopped, crashed, or region moved away). Combine with
cdcParentReplayActiveRegions to "
+ + "tell 'in catch-up' apart from 'in steady state' apart from 'not
running at all'.";
+
String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag";
String CDC_INDEX_UPDATE_LAG_DESC =
"Histogram of current time minus the consumer's effective freshness
watermark, in "
@@ -98,6 +135,50 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource {
*/
void incrementCdcBatchFailureCount(String dataTableName);
+ /**
+ * Increments the count of CDC events permanently skipped after exhausting
data-visibility
+ * retries. See {@link #CDC_EVENT_SKIPPED_COUNT_DESC}.
+ * @param dataTableName physical data table name
+ * @param count number of CDC events skipped in this give-up event
+ */
+ void incrementCdcEventSkippedCount(String dataTableName, long count);
+
+ /**
+ * Increments the parent-region replay active gauge by 1. Must be paired
with a corresponding
+ * {@link #decrementCdcParentReplayActiveRegions(String)} in a {@code
finally} block.
+ * @param dataTableName physical data table name
+ */
+ void incrementCdcParentReplayActiveRegions(String dataTableName);
+
+ /**
+ * Decrements the parent-region replay active gauge by 1. Must be invoked in
a {@code finally}
+ * block paired with {@link #incrementCdcParentReplayActiveRegions(String)}.
+ * @param dataTableName physical data table name
+ */
+ void decrementCdcParentReplayActiveRegions(String dataTableName);
+
+ /**
+ * Adds a sample to the parent-region replay duration histogram. Called once
per ancestor
+ * partition after its replay reaches a terminal state.
+ * @param dataTableName physical data table name
+ * @param durationMs wall-clock time spent replaying this ancestor
partition
+ */
+ void updateCdcParentReplayDuration(String dataTableName, long durationMs);
+
+ /**
+ * Increments the steady-state active-regions gauge by 1. Must be paired
with a corresponding
+ * {@link #decrementCdcConsumerActiveRegions(String)} in a {@code finally}
block.
+ * @param dataTableName physical data table name
+ */
+ void incrementCdcConsumerActiveRegions(String dataTableName);
+
+ /**
+ * Decrements the steady-state active-regions gauge by 1. Must be invoked in
a {@code finally}
+ * block paired with {@link #incrementCdcConsumerActiveRegions(String)}.
+ * @param dataTableName physical data table name
+ */
+ void decrementCdcConsumerActiveRegions(String dataTableName);
+
/**
* Updates the CDC lag histogram.
* @param dataTableName physical data table name
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
index 71c6382633..5158f4054b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index.metrics;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
/**
* Implementation for tracking IndexCDCConsumer metrics.
@@ -33,6 +34,10 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
private final MutableFastCounter cdcBatchCounter;
private final MutableFastCounter cdcMutationCounter;
private final MutableFastCounter cdcBatchFailureCounter;
+ private final MutableFastCounter cdcEventSkippedCounter;
+ private final MutableGaugeLong cdcParentReplayActiveRegionsGauge;
+ private final MetricHistogram cdcParentReplayDurationHisto;
+ private final MutableGaugeLong cdcConsumerActiveRegionsGauge;
private final MetricHistogram cdcIndexUpdateLagHisto;
public MetricsIndexCDCConsumerSourceImpl() {
@@ -54,6 +59,14 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
getMetricsRegistry().newCounter(CDC_MUTATION_COUNT,
CDC_MUTATION_COUNT_DESC, 0L);
cdcBatchFailureCounter =
getMetricsRegistry().newCounter(CDC_BATCH_FAILURE_COUNT,
CDC_BATCH_FAILURE_COUNT_DESC, 0L);
+ cdcEventSkippedCounter =
+ getMetricsRegistry().newCounter(CDC_EVENT_SKIPPED_COUNT,
CDC_EVENT_SKIPPED_COUNT_DESC, 0L);
+ cdcParentReplayActiveRegionsGauge = getMetricsRegistry()
+ .newGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS,
CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC, 0L);
+ cdcParentReplayDurationHisto =
getMetricsRegistry().newHistogram(CDC_PARENT_REPLAY_DURATION,
+ CDC_PARENT_REPLAY_DURATION_DESC);
+ cdcConsumerActiveRegionsGauge =
getMetricsRegistry().newGauge(CDC_CONSUMER_ACTIVE_REGIONS,
+ CDC_CONSUMER_ACTIVE_REGIONS_DESC, 0L);
cdcIndexUpdateLagHisto =
getMetricsRegistry().newHistogram(CDC_INDEX_UPDATE_LAG,
CDC_INDEX_UPDATE_LAG_DESC);
}
@@ -96,6 +109,44 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
cdcBatchFailureCounter.incr();
}
+ @Override
+ public void incrementCdcEventSkippedCount(String dataTableName, long count) {
+ MutableFastCounter tableCounter =
+ getMetricsRegistry().getCounter(getMetricName(CDC_EVENT_SKIPPED_COUNT,
dataTableName), 0);
+ tableCounter.incr(count);
+ cdcEventSkippedCounter.incr(count);
+ }
+
+ @Override
+ public void incrementCdcParentReplayActiveRegions(String dataTableName) {
+ incrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS,
dataTableName);
+ cdcParentReplayActiveRegionsGauge.incr();
+ }
+
+ @Override
+ public void decrementCdcParentReplayActiveRegions(String dataTableName) {
+ decrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS,
dataTableName);
+ cdcParentReplayActiveRegionsGauge.decr();
+ }
+
+ @Override
+ public void updateCdcParentReplayDuration(String dataTableName, long
durationMs) {
+ incrementTableSpecificHistogram(CDC_PARENT_REPLAY_DURATION, dataTableName,
durationMs);
+ cdcParentReplayDurationHisto.add(durationMs);
+ }
+
+ @Override
+ public void incrementCdcConsumerActiveRegions(String dataTableName) {
+ incrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName);
+ cdcConsumerActiveRegionsGauge.incr();
+ }
+
+ @Override
+ public void decrementCdcConsumerActiveRegions(String dataTableName) {
+ decrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName);
+ cdcConsumerActiveRegionsGauge.decr();
+ }
+
@Override
public void updateCdcLag(String dataTableName, long lag) {
incrementTableSpecificHistogram(CDC_INDEX_UPDATE_LAG, dataTableName, lag);
@@ -114,6 +165,18 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
tableHistogram.add(t);
}
+ private void incrementTableSpecificGauge(String baseName, String tableName) {
+ MutableGaugeLong tableGauge =
+ getMetricsRegistry().getGauge(getMetricName(baseName, tableName), 0);
+ tableGauge.incr();
+ }
+
+ private void decrementTableSpecificGauge(String baseName, String tableName) {
+ MutableGaugeLong tableGauge =
+ getMetricsRegistry().getGauge(getMetricName(baseName, tableName), 0);
+ tableGauge.decr();
+ }
+
private String getMetricName(String baseName, String tableName) {
return baseName + "." + tableName;
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index 412e380b66..25b0e18f26 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -146,7 +146,7 @@ public class IndexCDCConsumer implements Runnable {
*/
public static final String INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS =
"phoenix.index.cdc.consumer.lag.sample.interval.ms";
- private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 1000L;
+ private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 5000L;
private static final long MIN_LAG_SAMPLE_INTERVAL_MS = 50L;
private final RegionCoprocessorEnvironment env;
@@ -477,42 +477,52 @@ public class IndexCDCConsumer implements Runnable {
dataTableName, encodedRegionName, lastProcessedTimestamp);
} else {
if (hasParentPartitions) {
- sleepWithLagSampling(timestampBufferMs + 1);
- replayAndCompleteParentRegions(encodedRegionName);
+ metricSource.incrementCdcParentReplayActiveRegions(dataTableName);
+ try {
+ sleepWithLagSampling(timestampBufferMs + 1);
+ replayAndCompleteParentRegions(encodedRegionName);
+ } finally {
+ metricSource.decrementCdcParentReplayActiveRegions(dataTableName);
+ }
} else {
LOG.info("No parent partitions for table {} region {}, skipping
parent replay",
dataTableName, encodedRegionName);
}
}
int retryCount = 0;
- while (!stopped) {
- try {
- long previousTimestamp = lastProcessedTimestamp;
- if (serializeCDCMutations) {
- lastProcessedTimestamp =
- processCDCBatch(encodedRegionName, encodedRegionName,
lastProcessedTimestamp, false);
- } else {
- lastProcessedTimestamp =
processCDCBatchGenerated(encodedRegionName, encodedRegionName,
- lastProcessedTimestamp, false);
- }
- if (lastProcessedTimestamp == previousTimestamp) {
- sleepWithLagSampling(ConnectionUtils.getPauseTime(pause,
++retryCount));
- } else {
- retryCount = 0;
- sleepWithLagSampling(pollIntervalMs);
- }
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- throw (InterruptedException) e;
+ metricSource.incrementCdcConsumerActiveRegions(dataTableName);
+ try {
+ while (!stopped) {
+ try {
+ long previousTimestamp = lastProcessedTimestamp;
+ if (serializeCDCMutations) {
+ lastProcessedTimestamp = processCDCBatch(encodedRegionName,
encodedRegionName,
+ lastProcessedTimestamp, false);
+ } else {
+ lastProcessedTimestamp =
processCDCBatchGenerated(encodedRegionName,
+ encodedRegionName, lastProcessedTimestamp, false);
+ }
+ if (lastProcessedTimestamp == previousTimestamp) {
+ sleepWithLagSampling(ConnectionUtils.getPauseTime(pause,
++retryCount));
+ } else {
+ retryCount = 0;
+ sleepWithLagSampling(pollIntervalMs);
+ }
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ throw (InterruptedException) e;
+ }
+ metricSource.incrementCdcBatchFailureCount(dataTableName);
+ long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
+ LOG.error(
+ "Error processing CDC mutations for table {} region {}. "
+ + "Retry #{}, sleeping {} ms before retrying...",
+ dataTableName, encodedRegionName, retryCount, sleepTime, e);
+ sleepWithLagSampling(sleepTime);
}
- metricSource.incrementCdcBatchFailureCount(dataTableName);
- long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
- LOG.error(
- "Error processing CDC mutations for table {} region {}. "
- + "Retry #{}, sleeping {} ms before retrying...",
- dataTableName, encodedRegionName, retryCount, sleepTime, e);
- sleepWithLagSampling(sleepTime);
}
+ } finally {
+ metricSource.decrementCdcConsumerActiveRegions(dataTableName);
}
} catch (InterruptedException e) {
if (!stopped) {
@@ -834,66 +844,79 @@ public class IndexCDCConsumer implements Runnable {
long currentLastProcessedTimestamp = lastProcessedTimestamp;
int retryCount = 0;
int batchCount = 0;
- while (!stopped) {
- try {
- if (batchCount > 0) {
- if (isPartitionCompleted(partitionId)) {
- return;
+ long replayStartTime = EnvironmentEdgeManager.currentTimeMillis();
+ boolean reachedTerminalState = false;
+ try {
+ while (!stopped) {
+ try {
+ if (batchCount > 0) {
+ if (isPartitionCompleted(partitionId)) {
+ reachedTerminalState = true;
+ return;
+ }
+ long otherProgress = getParentProgress(partitionId);
+ if (otherProgress > currentLastProcessedTimestamp) {
+ long previousOtherProgress;
+ do {
+ previousOtherProgress = otherProgress;
+ sleepWithLagSampling(parentProgressPauseMs);
+ if (isPartitionCompleted(partitionId)) {
+ reachedTerminalState = true;
+ return;
+ }
+ otherProgress = getParentProgress(partitionId);
+ } while (!stopped && otherProgress > previousOtherProgress);
+ currentLastProcessedTimestamp = otherProgress;
+ }
}
- long otherProgress = getParentProgress(partitionId);
- if (otherProgress > currentLastProcessedTimestamp) {
- long previousOtherProgress;
- do {
- previousOtherProgress = otherProgress;
- sleepWithLagSampling(parentProgressPauseMs);
- if (isPartitionCompleted(partitionId)) {
- return;
- }
- otherProgress = getParentProgress(partitionId);
- } while (!stopped && otherProgress > previousOtherProgress);
- currentLastProcessedTimestamp = otherProgress;
+ long newTimestamp;
+ if (serializeCDCMutations) {
+ newTimestamp =
+ processCDCBatch(partitionId, ownerPartitionId,
currentLastProcessedTimestamp, true);
+ } else {
+ newTimestamp = processCDCBatchGenerated(partitionId,
ownerPartitionId,
+ currentLastProcessedTimestamp, true);
}
- }
- long newTimestamp;
- if (serializeCDCMutations) {
- newTimestamp =
- processCDCBatch(partitionId, ownerPartitionId,
currentLastProcessedTimestamp, true);
- } else {
- newTimestamp = processCDCBatchGenerated(partitionId,
ownerPartitionId,
- currentLastProcessedTimestamp, true);
- }
- batchCount++;
- retryCount = 0;
- if (newTimestamp == currentLastProcessedTimestamp) {
- if (isPartitionCompleted(partitionId)) {
- LOG.info(
- "Partition {} for table {} was completed by another consumer
before {} could mark it",
- partitionId, dataTableName, ownerPartitionId);
+ batchCount++;
+ retryCount = 0;
+ if (newTimestamp == currentLastProcessedTimestamp) {
+ if (isPartitionCompleted(partitionId)) {
+ LOG.info(
+ "Partition {} for table {} was completed by another consumer
before {} could mark it",
+ partitionId, dataTableName, ownerPartitionId);
+ reachedTerminalState = true;
+ return;
+ }
+ LOG.info("Partition {} owner {} for table {} fully processed,
marking as COMPLETE",
+ partitionId, ownerPartitionId, dataTableName);
+ try (PhoenixConnection conn =
QueryUtil.getConnectionOnServer(env.getConfiguration())
+ .unwrap(PhoenixConnection.class)) {
+ updateTrackerProgress(conn, partitionId, ownerPartitionId,
+ currentLastProcessedTimestamp,
PhoenixDatabaseMetaData.TRACKER_STATUS_COMPLETE);
+ }
+ reachedTerminalState = true;
return;
}
- LOG.info("Partition {} owner {} for table {} fully processed,
marking as COMPLETE",
- partitionId, ownerPartitionId, dataTableName);
- try (PhoenixConnection conn =
QueryUtil.getConnectionOnServer(env.getConfiguration())
- .unwrap(PhoenixConnection.class)) {
- updateTrackerProgress(conn, partitionId, ownerPartitionId,
- currentLastProcessedTimestamp,
PhoenixDatabaseMetaData.TRACKER_STATUS_COMPLETE);
- }
- return;
+ currentLastProcessedTimestamp = newTimestamp;
+ } catch (SQLException | IOException e) {
+ metricSource.incrementCdcBatchFailureCount(dataTableName);
+ long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
+ LOG.warn(
+ "Error processing CDC batch for partition {} owner {} table {} "
+ + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms",
+ partitionId, ownerPartitionId, dataTableName,
currentLastProcessedTimestamp, retryCount,
+ sleepTime, e);
+ sleepWithLagSampling(sleepTime);
}
- currentLastProcessedTimestamp = newTimestamp;
- } catch (SQLException | IOException e) {
- metricSource.incrementCdcBatchFailureCount(dataTableName);
- long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
- LOG.warn(
- "Error processing CDC batch for partition {} owner {} table {} "
- + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms",
- partitionId, ownerPartitionId, dataTableName,
currentLastProcessedTimestamp, retryCount,
- sleepTime, e);
- sleepWithLagSampling(sleepTime);
+ }
+ LOG.info("Processing partition {} (owner {}) stopped before completion
for table {}",
+ partitionId, ownerPartitionId, dataTableName);
+ } finally {
+ if (reachedTerminalState) {
+ metricSource.updateCdcParentReplayDuration(dataTableName,
+ EnvironmentEdgeManager.currentTimeMillis() - replayStartTime);
}
}
- LOG.info("Processing partition {} (owner {}) stopped before completion for
table {}",
- partitionId, ownerPartitionId, dataTableName);
}
/**
@@ -989,6 +1012,9 @@ public class IndexCDCConsumer implements Runnable {
if (hasMoreRows) {
newLastTimestamp = result.getFirst();
if (batchMutations.isEmpty()) {
+ if (!isParentReplay) {
+ progress.recordProcessed(newLastTimestamp);
+ }
sleepWithLagSampling(ConnectionUtils.getPauseTime(pause,
++retryCount));
}
}
@@ -1092,6 +1118,7 @@ public class IndexCDCConsumer implements Runnable {
List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates = new
ArrayList<>();
long newLastTimestamp = lastProcessedTimestamp;
long[] lastScannedTimestamp = { lastProcessedTimestamp };
+ long[] scannedRowCount = { 0L };
boolean hasMoreRows = true;
int retryCount = 0;
// Captured immediately before each query so the empty-poll watermark
cannot over-advance
@@ -1101,22 +1128,20 @@ public class IndexCDCConsumer implements Runnable {
lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis();
try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
setStatementParams(scanInfo, partitionId, isParentReplay,
newLastTimestamp, ps);
- Pair<Long, Boolean> result =
- getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates,
lastScannedTimestamp);
+ Pair<Long, Boolean> result = getDataRowStatesAndTimestamp(ps,
newLastTimestamp,
+ batchStates, lastScannedTimestamp, scannedRowCount);
hasMoreRows = result.getSecond();
if (hasMoreRows) {
if (!batchStates.isEmpty()) {
newLastTimestamp = result.getFirst();
} else if (retryCount >= maxDataVisibilityRetries) {
LOG.warn(
- "Skipping CDC events for table {} partition {} from timestamp
{}"
+ "Skipping {} CDC events for table {} partition {} from
timestamp {}"
+ " to {} after {} retries — data table mutations may have
failed",
- dataTableName, partitionId, newLastTimestamp,
lastScannedTimestamp[0], retryCount);
+ scannedRowCount[0], dataTableName, partitionId,
newLastTimestamp,
+ lastScannedTimestamp[0], retryCount);
+ metricSource.incrementCdcEventSkippedCount(dataTableName,
scannedRowCount[0]);
newLastTimestamp = lastScannedTimestamp[0];
- // NOTE: durable tracker advances below (newLastTimestamp >
lastProcessedTimestamp)
- // but progress.recordProcessed is skipped
(batchStates.isEmpty()). The in-memory
- // watermark lags durable state until the next empty poll heals
it — over-reports
- // lag temporarily (safe direction, self-healing).
break;
} else {
// CDC index entries are written but the data is not yet visible.
@@ -1143,8 +1168,8 @@ public class IndexCDCConsumer implements Runnable {
int idx = scanInfo.bindParams(ps, 1);
ps.setString(idx++, partitionId);
ps.setDate(idx, new Date(newLastTimestamp));
- Pair<Long, Boolean> result =
- getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates,
lastScannedTimestamp);
+ Pair<Long, Boolean> result = getDataRowStatesAndTimestamp(ps,
newLastTimestamp,
+ batchStates, lastScannedTimestamp, scannedRowCount);
newLastTimestamp = result.getFirst();
if (batchStates.isEmpty()) {
newLastTimestamp = timestampToRefetch;
@@ -1160,11 +1185,11 @@ public class IndexCDCConsumer implements Runnable {
metricSource.updateCdcBatchProcessTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
metricSource.incrementCdcBatchCount(dataTableName);
+ }
+ if (newLastTimestamp > lastProcessedTimestamp) {
if (!isParentReplay) {
progress.recordProcessed(newLastTimestamp);
}
- }
- if (newLastTimestamp > lastProcessedTimestamp) {
updateTrackerProgress(conn, partitionId, ownerPartitionId,
newLastTimestamp,
PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
}
@@ -1188,13 +1213,15 @@ public class IndexCDCConsumer implements Runnable {
private static Pair<Long, Boolean>
getDataRowStatesAndTimestamp(PreparedStatement ps,
long initialLastTimestamp, List<Pair<Long,
IndexMutationsProtos.DataRowStates>> batchStates,
- long[] lastScannedTimestamp) throws SQLException, IOException {
+ long[] lastScannedTimestamp, long[] scannedRowCount) throws SQLException,
IOException {
boolean hasRows = false;
long lastTimestamp = initialLastTimestamp;
lastScannedTimestamp[0] = initialLastTimestamp;
+ scannedRowCount[0] = 0L;
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
hasRows = true;
+ scannedRowCount[0]++;
long rowTimestamp = rs.getDate(1).getTime();
lastScannedTimestamp[0] = rowTimestamp;
String cdcValue = rs.getString(2);