This is an automated email from the ASF dual-hosted git repository.

virajjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 6442f2fbed PHOENIX-7883 : More metrics for EC index consumer (#2543)
6442f2fbed is described below

commit 6442f2fbed9335b8e9759d8497850784fb028f0d
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Jun 25 13:56:52 2026 -0700

    PHOENIX-7883 : More metrics for EC index consumer (#2543)
---
 .../metrics/MetricsIndexCDCConsumerSource.java     |  81 ++++++++
 .../metrics/MetricsIndexCDCConsumerSourceImpl.java |  63 ++++++
 .../phoenix/hbase/index/IndexCDCConsumer.java      | 217 ++++++++++++---------
 3 files changed, 266 insertions(+), 95 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
index cb39b5b38a..ac647638e1 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
@@ -50,6 +50,43 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource {
   String CDC_BATCH_FAILURE_COUNT = "cdcBatchFailureCount";
   String CDC_BATCH_FAILURE_COUNT_DESC = "The number of CDC batch processing 
failures";
 
+  String CDC_EVENT_SKIPPED_COUNT = "cdcEventSkippedCount";
+  String CDC_EVENT_SKIPPED_COUNT_DESC =
+    "The number of CDC events the consumer permanently advanced past because 
their data table "
+      + "row state could not be read within 
phoenix.index.cdc.consumer.max.data.visibility.retries"
+      + " attempts. Each event counted will never be applied to the eventually 
consistent index "
+      + "\u2014 typically caused by failed or aborted data table mutations 
upstream. Non-zero "
+      + "values indicate silent data divergence between the data table and its 
EC indexes.";
+
+  String CDC_PARENT_REPLAY_ACTIVE_REGIONS = "cdcParentReplayActiveRegions";
+  String CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC =
+    "Gauge of regions currently in the parent-region replay phase (post-split 
/ post-merge "
+      + "catch-up before steady-state own-partition processing begins). 
Per-table value = number "
+      + "of regions of this table on this RegionServer currently replaying 
ancestor partitions. "
+      + "The lag histogram inflates during this phase by design (parent-replay 
timestamps do not "
+      + "advance the child's own-partition freshness watermark) \u2014 this 
gauge lets operators "
+      + "distinguish 'normal post-split catch-up' from 'consumer is broken'.";
+
+  String CDC_PARENT_REPLAY_DURATION = "cdcParentReplayDuration";
+  String CDC_PARENT_REPLAY_DURATION_DESC =
+    "Histogram (milliseconds) of how long this consumer spent replaying one 
ancestor partition "
+      + "during post-split / post-merge catch-up, measured from when this 
consumer joined the "
+      + "replay until it reached a terminal state. One sample is emitted per 
parent partition "
+      + "when this consumer either marks it COMPLETE or observes another 
consumer marking it "
+      + "COMPLETE \u2014 in the latter case the sample is shorter than the 
end-to-end partition "
+      + "replay time. Ancestors that were already COMPLETE when discovered 
emit no sample.";
+
+  String CDC_CONSUMER_ACTIVE_REGIONS = "cdcConsumerActiveRegions";
+  String CDC_CONSUMER_ACTIVE_REGIONS_DESC =
+    "Gauge of regions whose IndexCDCConsumer is currently in steady-state 
own-partition "
+      + "processing for this table on this RegionServer. Incremented 
immediately before entering "
+      + "the main poll loop (after startup wait, EC-index discovery, 
CDC_STREAM wait, tracker "
+      + "lookup, and any parent-region replay have all completed) and 
decremented when the loop "
+      + "exits. A gauge of 0 where >0 is expected indicates the consumer 
either never reached "
+      + "steady state (cold start, missing EC index, missing CDC_STREAM entry) 
or exited "
+      + "(stopped, crashed, or region moved away). Combine with 
cdcParentReplayActiveRegions to "
+      + "tell 'in catch-up' apart from 'in steady state' apart from 'not 
running at all'.";
+
   String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag";
   String CDC_INDEX_UPDATE_LAG_DESC =
     "Histogram of current time minus the consumer's effective freshness 
watermark, in "
@@ -98,6 +135,50 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource {
    */
   void incrementCdcBatchFailureCount(String dataTableName);
 
+  /**
+   * Increments the count of CDC events permanently skipped after exhausting 
data-visibility
+   * retries. See {@link #CDC_EVENT_SKIPPED_COUNT_DESC}.
+   * @param dataTableName physical data table name
+   * @param count         number of CDC events skipped in this give-up event
+   */
+  void incrementCdcEventSkippedCount(String dataTableName, long count);
+
+  /**
+   * Increments the parent-region replay active gauge by 1. Must be paired 
with a corresponding
+   * {@link #decrementCdcParentReplayActiveRegions(String)} in a {@code 
finally} block.
+   * @param dataTableName physical data table name
+   */
+  void incrementCdcParentReplayActiveRegions(String dataTableName);
+
+  /**
+   * Decrements the parent-region replay active gauge by 1. Must be invoked in 
a {@code finally}
+   * block paired with {@link #incrementCdcParentReplayActiveRegions(String)}.
+   * @param dataTableName physical data table name
+   */
+  void decrementCdcParentReplayActiveRegions(String dataTableName);
+
+  /**
+   * Adds a sample to the parent-region replay duration histogram. Called once 
per ancestor
+   * partition after its replay reaches a terminal state.
+   * @param dataTableName physical data table name
+   * @param durationMs    wall-clock time spent replaying this ancestor 
partition
+   */
+  void updateCdcParentReplayDuration(String dataTableName, long durationMs);
+
+  /**
+   * Increments the steady-state active-regions gauge by 1. Must be paired 
with a corresponding
+   * {@link #decrementCdcConsumerActiveRegions(String)} in a {@code finally} 
block.
+   * @param dataTableName physical data table name
+   */
+  void incrementCdcConsumerActiveRegions(String dataTableName);
+
+  /**
+   * Decrements the steady-state active-regions gauge by 1. Must be invoked in 
a {@code finally}
+   * block paired with {@link #incrementCdcConsumerActiveRegions(String)}.
+   * @param dataTableName physical data table name
+   */
+  void decrementCdcConsumerActiveRegions(String dataTableName);
+
   /**
    * Updates the CDC lag histogram.
    * @param dataTableName physical data table name
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
index 71c6382633..5158f4054b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index.metrics;
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
 import org.apache.hadoop.metrics2.MetricHistogram;
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 
 /**
  * Implementation for tracking IndexCDCConsumer metrics.
@@ -33,6 +34,10 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
   private final MutableFastCounter cdcBatchCounter;
   private final MutableFastCounter cdcMutationCounter;
   private final MutableFastCounter cdcBatchFailureCounter;
+  private final MutableFastCounter cdcEventSkippedCounter;
+  private final MutableGaugeLong cdcParentReplayActiveRegionsGauge;
+  private final MetricHistogram cdcParentReplayDurationHisto;
+  private final MutableGaugeLong cdcConsumerActiveRegionsGauge;
   private final MetricHistogram cdcIndexUpdateLagHisto;
 
   public MetricsIndexCDCConsumerSourceImpl() {
@@ -54,6 +59,14 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
       getMetricsRegistry().newCounter(CDC_MUTATION_COUNT, 
CDC_MUTATION_COUNT_DESC, 0L);
     cdcBatchFailureCounter =
       getMetricsRegistry().newCounter(CDC_BATCH_FAILURE_COUNT, 
CDC_BATCH_FAILURE_COUNT_DESC, 0L);
+    cdcEventSkippedCounter =
+      getMetricsRegistry().newCounter(CDC_EVENT_SKIPPED_COUNT, 
CDC_EVENT_SKIPPED_COUNT_DESC, 0L);
+    cdcParentReplayActiveRegionsGauge = getMetricsRegistry()
+      .newGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, 
CDC_PARENT_REPLAY_ACTIVE_REGIONS_DESC, 0L);
+    cdcParentReplayDurationHisto = 
getMetricsRegistry().newHistogram(CDC_PARENT_REPLAY_DURATION,
+      CDC_PARENT_REPLAY_DURATION_DESC);
+    cdcConsumerActiveRegionsGauge = 
getMetricsRegistry().newGauge(CDC_CONSUMER_ACTIVE_REGIONS,
+      CDC_CONSUMER_ACTIVE_REGIONS_DESC, 0L);
     cdcIndexUpdateLagHisto =
       getMetricsRegistry().newHistogram(CDC_INDEX_UPDATE_LAG, 
CDC_INDEX_UPDATE_LAG_DESC);
   }
@@ -96,6 +109,44 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
     cdcBatchFailureCounter.incr();
   }
 
+  @Override
+  public void incrementCdcEventSkippedCount(String dataTableName, long count) {
+    MutableFastCounter tableCounter =
+      getMetricsRegistry().getCounter(getMetricName(CDC_EVENT_SKIPPED_COUNT, 
dataTableName), 0);
+    tableCounter.incr(count);
+    cdcEventSkippedCounter.incr(count);
+  }
+
+  @Override
+  public void incrementCdcParentReplayActiveRegions(String dataTableName) {
+    incrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, 
dataTableName);
+    cdcParentReplayActiveRegionsGauge.incr();
+  }
+
+  @Override
+  public void decrementCdcParentReplayActiveRegions(String dataTableName) {
+    decrementTableSpecificGauge(CDC_PARENT_REPLAY_ACTIVE_REGIONS, 
dataTableName);
+    cdcParentReplayActiveRegionsGauge.decr();
+  }
+
+  @Override
+  public void updateCdcParentReplayDuration(String dataTableName, long 
durationMs) {
+    incrementTableSpecificHistogram(CDC_PARENT_REPLAY_DURATION, dataTableName, 
durationMs);
+    cdcParentReplayDurationHisto.add(durationMs);
+  }
+
+  @Override
+  public void incrementCdcConsumerActiveRegions(String dataTableName) {
+    incrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName);
+    cdcConsumerActiveRegionsGauge.incr();
+  }
+
+  @Override
+  public void decrementCdcConsumerActiveRegions(String dataTableName) {
+    decrementTableSpecificGauge(CDC_CONSUMER_ACTIVE_REGIONS, dataTableName);
+    cdcConsumerActiveRegionsGauge.decr();
+  }
+
   @Override
   public void updateCdcLag(String dataTableName, long lag) {
     incrementTableSpecificHistogram(CDC_INDEX_UPDATE_LAG, dataTableName, lag);
@@ -114,6 +165,18 @@ public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
     tableHistogram.add(t);
   }
 
+  private void incrementTableSpecificGauge(String baseName, String tableName) {
+    MutableGaugeLong tableGauge =
+      getMetricsRegistry().getGauge(getMetricName(baseName, tableName), 0);
+    tableGauge.incr();
+  }
+
+  private void decrementTableSpecificGauge(String baseName, String tableName) {
+    MutableGaugeLong tableGauge =
+      getMetricsRegistry().getGauge(getMetricName(baseName, tableName), 0);
+    tableGauge.decr();
+  }
+
   private String getMetricName(String baseName, String tableName) {
     return baseName + "." + tableName;
   }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index 412e380b66..25b0e18f26 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -146,7 +146,7 @@ public class IndexCDCConsumer implements Runnable {
    */
   public static final String INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS =
     "phoenix.index.cdc.consumer.lag.sample.interval.ms";
-  private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 1000L;
+  private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 5000L;
   private static final long MIN_LAG_SAMPLE_INTERVAL_MS = 50L;
 
   private final RegionCoprocessorEnvironment env;
@@ -477,42 +477,52 @@ public class IndexCDCConsumer implements Runnable {
           dataTableName, encodedRegionName, lastProcessedTimestamp);
       } else {
         if (hasParentPartitions) {
-          sleepWithLagSampling(timestampBufferMs + 1);
-          replayAndCompleteParentRegions(encodedRegionName);
+          metricSource.incrementCdcParentReplayActiveRegions(dataTableName);
+          try {
+            sleepWithLagSampling(timestampBufferMs + 1);
+            replayAndCompleteParentRegions(encodedRegionName);
+          } finally {
+            metricSource.decrementCdcParentReplayActiveRegions(dataTableName);
+          }
         } else {
           LOG.info("No parent partitions for table {} region {}, skipping 
parent replay",
             dataTableName, encodedRegionName);
         }
       }
       int retryCount = 0;
-      while (!stopped) {
-        try {
-          long previousTimestamp = lastProcessedTimestamp;
-          if (serializeCDCMutations) {
-            lastProcessedTimestamp =
-              processCDCBatch(encodedRegionName, encodedRegionName, 
lastProcessedTimestamp, false);
-          } else {
-            lastProcessedTimestamp = 
processCDCBatchGenerated(encodedRegionName, encodedRegionName,
-              lastProcessedTimestamp, false);
-          }
-          if (lastProcessedTimestamp == previousTimestamp) {
-            sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, 
++retryCount));
-          } else {
-            retryCount = 0;
-            sleepWithLagSampling(pollIntervalMs);
-          }
-        } catch (Exception e) {
-          if (e instanceof InterruptedException) {
-            throw (InterruptedException) e;
+      metricSource.incrementCdcConsumerActiveRegions(dataTableName);
+      try {
+        while (!stopped) {
+          try {
+            long previousTimestamp = lastProcessedTimestamp;
+            if (serializeCDCMutations) {
+              lastProcessedTimestamp = processCDCBatch(encodedRegionName, 
encodedRegionName,
+                lastProcessedTimestamp, false);
+            } else {
+              lastProcessedTimestamp = 
processCDCBatchGenerated(encodedRegionName,
+                encodedRegionName, lastProcessedTimestamp, false);
+            }
+            if (lastProcessedTimestamp == previousTimestamp) {
+              sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, 
++retryCount));
+            } else {
+              retryCount = 0;
+              sleepWithLagSampling(pollIntervalMs);
+            }
+          } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+              throw (InterruptedException) e;
+            }
+            metricSource.incrementCdcBatchFailureCount(dataTableName);
+            long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
+            LOG.error(
+              "Error processing CDC mutations for table {} region {}. "
+                + "Retry #{}, sleeping {} ms before retrying...",
+              dataTableName, encodedRegionName, retryCount, sleepTime, e);
+            sleepWithLagSampling(sleepTime);
           }
-          metricSource.incrementCdcBatchFailureCount(dataTableName);
-          long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
-          LOG.error(
-            "Error processing CDC mutations for table {} region {}. "
-              + "Retry #{}, sleeping {} ms before retrying...",
-            dataTableName, encodedRegionName, retryCount, sleepTime, e);
-          sleepWithLagSampling(sleepTime);
         }
+      } finally {
+        metricSource.decrementCdcConsumerActiveRegions(dataTableName);
       }
     } catch (InterruptedException e) {
       if (!stopped) {
@@ -834,66 +844,79 @@ public class IndexCDCConsumer implements Runnable {
     long currentLastProcessedTimestamp = lastProcessedTimestamp;
     int retryCount = 0;
     int batchCount = 0;
-    while (!stopped) {
-      try {
-        if (batchCount > 0) {
-          if (isPartitionCompleted(partitionId)) {
-            return;
+    long replayStartTime = EnvironmentEdgeManager.currentTimeMillis();
+    boolean reachedTerminalState = false;
+    try {
+      while (!stopped) {
+        try {
+          if (batchCount > 0) {
+            if (isPartitionCompleted(partitionId)) {
+              reachedTerminalState = true;
+              return;
+            }
+            long otherProgress = getParentProgress(partitionId);
+            if (otherProgress > currentLastProcessedTimestamp) {
+              long previousOtherProgress;
+              do {
+                previousOtherProgress = otherProgress;
+                sleepWithLagSampling(parentProgressPauseMs);
+                if (isPartitionCompleted(partitionId)) {
+                  reachedTerminalState = true;
+                  return;
+                }
+                otherProgress = getParentProgress(partitionId);
+              } while (!stopped && otherProgress > previousOtherProgress);
+              currentLastProcessedTimestamp = otherProgress;
+            }
           }
-          long otherProgress = getParentProgress(partitionId);
-          if (otherProgress > currentLastProcessedTimestamp) {
-            long previousOtherProgress;
-            do {
-              previousOtherProgress = otherProgress;
-              sleepWithLagSampling(parentProgressPauseMs);
-              if (isPartitionCompleted(partitionId)) {
-                return;
-              }
-              otherProgress = getParentProgress(partitionId);
-            } while (!stopped && otherProgress > previousOtherProgress);
-            currentLastProcessedTimestamp = otherProgress;
+          long newTimestamp;
+          if (serializeCDCMutations) {
+            newTimestamp =
+              processCDCBatch(partitionId, ownerPartitionId, 
currentLastProcessedTimestamp, true);
+          } else {
+            newTimestamp = processCDCBatchGenerated(partitionId, 
ownerPartitionId,
+              currentLastProcessedTimestamp, true);
           }
-        }
-        long newTimestamp;
-        if (serializeCDCMutations) {
-          newTimestamp =
-            processCDCBatch(partitionId, ownerPartitionId, 
currentLastProcessedTimestamp, true);
-        } else {
-          newTimestamp = processCDCBatchGenerated(partitionId, 
ownerPartitionId,
-            currentLastProcessedTimestamp, true);
-        }
-        batchCount++;
-        retryCount = 0;
-        if (newTimestamp == currentLastProcessedTimestamp) {
-          if (isPartitionCompleted(partitionId)) {
-            LOG.info(
-              "Partition {} for table {} was completed by another consumer 
before {} could mark it",
-              partitionId, dataTableName, ownerPartitionId);
+          batchCount++;
+          retryCount = 0;
+          if (newTimestamp == currentLastProcessedTimestamp) {
+            if (isPartitionCompleted(partitionId)) {
+              LOG.info(
+                "Partition {} for table {} was completed by another consumer 
before {} could mark it",
+                partitionId, dataTableName, ownerPartitionId);
+              reachedTerminalState = true;
+              return;
+            }
+            LOG.info("Partition {} owner {} for table {} fully processed, 
marking as COMPLETE",
+              partitionId, ownerPartitionId, dataTableName);
+            try (PhoenixConnection conn = 
QueryUtil.getConnectionOnServer(env.getConfiguration())
+              .unwrap(PhoenixConnection.class)) {
+              updateTrackerProgress(conn, partitionId, ownerPartitionId,
+                currentLastProcessedTimestamp, 
PhoenixDatabaseMetaData.TRACKER_STATUS_COMPLETE);
+            }
+            reachedTerminalState = true;
             return;
           }
-          LOG.info("Partition {} owner {} for table {} fully processed, 
marking as COMPLETE",
-            partitionId, ownerPartitionId, dataTableName);
-          try (PhoenixConnection conn = 
QueryUtil.getConnectionOnServer(env.getConfiguration())
-            .unwrap(PhoenixConnection.class)) {
-            updateTrackerProgress(conn, partitionId, ownerPartitionId,
-              currentLastProcessedTimestamp, 
PhoenixDatabaseMetaData.TRACKER_STATUS_COMPLETE);
-          }
-          return;
+          currentLastProcessedTimestamp = newTimestamp;
+        } catch (SQLException | IOException e) {
+          metricSource.incrementCdcBatchFailureCount(dataTableName);
+          long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
+          LOG.warn(
+            "Error processing CDC batch for partition {} owner {} table {} "
+              + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms",
+            partitionId, ownerPartitionId, dataTableName, 
currentLastProcessedTimestamp, retryCount,
+            sleepTime, e);
+          sleepWithLagSampling(sleepTime);
         }
-        currentLastProcessedTimestamp = newTimestamp;
-      } catch (SQLException | IOException e) {
-        metricSource.incrementCdcBatchFailureCount(dataTableName);
-        long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
-        LOG.warn(
-          "Error processing CDC batch for partition {} owner {} table {} "
-            + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms",
-          partitionId, ownerPartitionId, dataTableName, 
currentLastProcessedTimestamp, retryCount,
-          sleepTime, e);
-        sleepWithLagSampling(sleepTime);
+      }
+      LOG.info("Processing partition {} (owner {}) stopped before completion 
for table {}",
+        partitionId, ownerPartitionId, dataTableName);
+    } finally {
+      if (reachedTerminalState) {
+        metricSource.updateCdcParentReplayDuration(dataTableName,
+          EnvironmentEdgeManager.currentTimeMillis() - replayStartTime);
       }
     }
-    LOG.info("Processing partition {} (owner {}) stopped before completion for 
table {}",
-      partitionId, ownerPartitionId, dataTableName);
   }
 
   /**
@@ -989,6 +1012,9 @@ public class IndexCDCConsumer implements Runnable {
           if (hasMoreRows) {
             newLastTimestamp = result.getFirst();
             if (batchMutations.isEmpty()) {
+              if (!isParentReplay) {
+                progress.recordProcessed(newLastTimestamp);
+              }
               sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, 
++retryCount));
             }
           }
@@ -1092,6 +1118,7 @@ public class IndexCDCConsumer implements Runnable {
       List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates = new 
ArrayList<>();
       long newLastTimestamp = lastProcessedTimestamp;
       long[] lastScannedTimestamp = { lastProcessedTimestamp };
+      long[] scannedRowCount = { 0L };
       boolean hasMoreRows = true;
       int retryCount = 0;
       // Captured immediately before each query so the empty-poll watermark 
cannot over-advance
@@ -1101,22 +1128,20 @@ public class IndexCDCConsumer implements Runnable {
         lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis();
         try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
           setStatementParams(scanInfo, partitionId, isParentReplay, 
newLastTimestamp, ps);
-          Pair<Long, Boolean> result =
-            getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, 
lastScannedTimestamp);
+          Pair<Long, Boolean> result = getDataRowStatesAndTimestamp(ps, 
newLastTimestamp,
+            batchStates, lastScannedTimestamp, scannedRowCount);
           hasMoreRows = result.getSecond();
           if (hasMoreRows) {
             if (!batchStates.isEmpty()) {
               newLastTimestamp = result.getFirst();
             } else if (retryCount >= maxDataVisibilityRetries) {
               LOG.warn(
-                "Skipping CDC events for table {} partition {} from timestamp 
{}"
+                "Skipping {} CDC events for table {} partition {} from 
timestamp {}"
                   + " to {} after {} retries — data table mutations may have 
failed",
-                dataTableName, partitionId, newLastTimestamp, 
lastScannedTimestamp[0], retryCount);
+                scannedRowCount[0], dataTableName, partitionId, 
newLastTimestamp,
+                lastScannedTimestamp[0], retryCount);
+              metricSource.incrementCdcEventSkippedCount(dataTableName, 
scannedRowCount[0]);
               newLastTimestamp = lastScannedTimestamp[0];
-              // NOTE: durable tracker advances below (newLastTimestamp > 
lastProcessedTimestamp)
-              // but progress.recordProcessed is skipped 
(batchStates.isEmpty()). The in-memory
-              // watermark lags durable state until the next empty poll heals 
it — over-reports
-              // lag temporarily (safe direction, self-healing).
               break;
             } else {
               // CDC index entries are written but the data is not yet visible.
@@ -1143,8 +1168,8 @@ public class IndexCDCConsumer implements Runnable {
           int idx = scanInfo.bindParams(ps, 1);
           ps.setString(idx++, partitionId);
           ps.setDate(idx, new Date(newLastTimestamp));
-          Pair<Long, Boolean> result =
-            getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, 
lastScannedTimestamp);
+          Pair<Long, Boolean> result = getDataRowStatesAndTimestamp(ps, 
newLastTimestamp,
+            batchStates, lastScannedTimestamp, scannedRowCount);
           newLastTimestamp = result.getFirst();
           if (batchStates.isEmpty()) {
             newLastTimestamp = timestampToRefetch;
@@ -1160,11 +1185,11 @@ public class IndexCDCConsumer implements Runnable {
         metricSource.updateCdcBatchProcessTime(dataTableName,
           EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
         metricSource.incrementCdcBatchCount(dataTableName);
+      }
+      if (newLastTimestamp > lastProcessedTimestamp) {
         if (!isParentReplay) {
           progress.recordProcessed(newLastTimestamp);
         }
-      }
-      if (newLastTimestamp > lastProcessedTimestamp) {
         updateTrackerProgress(conn, partitionId, ownerPartitionId, 
newLastTimestamp,
           PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
       }
@@ -1188,13 +1213,15 @@ public class IndexCDCConsumer implements Runnable {
 
   private static Pair<Long, Boolean> 
getDataRowStatesAndTimestamp(PreparedStatement ps,
     long initialLastTimestamp, List<Pair<Long, 
IndexMutationsProtos.DataRowStates>> batchStates,
-    long[] lastScannedTimestamp) throws SQLException, IOException {
+    long[] lastScannedTimestamp, long[] scannedRowCount) throws SQLException, 
IOException {
     boolean hasRows = false;
     long lastTimestamp = initialLastTimestamp;
     lastScannedTimestamp[0] = initialLastTimestamp;
+    scannedRowCount[0] = 0L;
     try (ResultSet rs = ps.executeQuery()) {
       while (rs.next()) {
         hasRows = true;
+        scannedRowCount[0]++;
         long rowTimestamp = rs.getDate(1).getTime();
         lastScannedTimestamp[0] = rowTimestamp;
         String cdcValue = rs.getString(2);

Reply via email to