This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch tmp-ec
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/tmp-ec by this push:
     new 37955ca27d add metrics
37955ca27d is described below

commit 37955ca27d68f0caa549e4b84a7694b99f3d1349
Author: Viraj Jasani <[email protected]>
AuthorDate: Mon Apr 6 14:01:56 2026 -0700

    add metrics
---
 .../metrics/MetricsIndexCDCConsumerSource.java     |  92 +++++++++++++++++
 .../metrics/MetricsIndexCDCConsumerSourceImpl.java | 111 +++++++++++++++++++++
 .../index/metrics/MetricsIndexerSourceFactory.java |   8 ++
 .../phoenix/hbase/index/IndexCDCConsumer.java      |  36 +++++++
 .../ConcurrentMutationsExtendedIndexIT.java        |  55 +++++++++-
 5 files changed, 301 insertions(+), 1 deletion(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
new file mode 100644
index 0000000000..a8481b639d
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+/**
+ * Metrics source for IndexCDCConsumer, covering eventually consistent index updates.
+ * <p>
+ * Every update method takes the physical data table name so that an implementation can attribute
+ * the measurement to that table.
+ */
+public interface MetricsIndexCDCConsumerSource extends BaseSource {
+
+  String METRICS_NAME = "IndexCDCConsumer";
+  String METRICS_CONTEXT = "phoenix";
+  String METRICS_DESCRIPTION = "Metrics about the Phoenix Index CDC Consumer";
+  String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+  // Histogram metric names and their descriptions.
+  String CDC_BATCH_PROCESS_TIME = "cdcBatchProcessTime";
+  String CDC_BATCH_PROCESS_TIME_DESC =
+    "Histogram for the end-to-end time in milliseconds for processing one CDC batch";
+
+  String CDC_MUTATION_GENERATE_TIME = "cdcMutationGenerateTime";
+  String CDC_MUTATION_GENERATE_TIME_DESC =
+    "Histogram for the time in milliseconds to generate index mutations from data row states";
+
+  String CDC_MUTATION_APPLY_TIME = "cdcMutationApplyTime";
+  String CDC_MUTATION_APPLY_TIME_DESC =
+    "Histogram for the time in milliseconds to apply index mutations to index tables";
+
+  // Counter metric names and their descriptions.
+  String CDC_BATCH_COUNT = "cdcBatchCount";
+  String CDC_BATCH_COUNT_DESC = "The number of CDC batches processed";
+
+  String CDC_MUTATION_COUNT = "cdcMutationCount";
+  String CDC_MUTATION_COUNT_DESC = "The number of individual index mutations applied through CDC";
+
+  String CDC_BATCH_FAILURE_COUNT = "cdcBatchFailureCount";
+  String CDC_BATCH_FAILURE_COUNT_DESC = "The number of CDC batch processing failures";
+
+  /**
+   * Records one sample in the CDC batch processing time histogram.
+   * @param dataTableName physical data table name
+   * @param t             time taken in milliseconds
+   */
+  void updateCdcBatchProcessTime(String dataTableName, long t);
+
+  /**
+   * Records one sample in the CDC mutation generation time histogram.
+   * @param dataTableName physical data table name
+   * @param t             time taken in milliseconds
+   */
+  void updateCdcMutationGenerateTime(String dataTableName, long t);
+
+  /**
+   * Records one sample in the CDC mutation apply time histogram.
+   * @param dataTableName physical data table name
+   * @param t             time taken in milliseconds
+   */
+  void updateCdcMutationApplyTime(String dataTableName, long t);
+
+  /**
+   * Increments the CDC batch count by one.
+   * @param dataTableName physical data table name
+   */
+  void incrementCdcBatchCount(String dataTableName);
+
+  /**
+   * Increments the CDC mutation count by the given amount.
+   * @param dataTableName physical data table name
+   * @param count         number of mutations applied
+   */
+  void incrementCdcMutationCount(String dataTableName, long count);
+
+  /**
+   * Increments the CDC batch failure count by one.
+   * @param dataTableName physical data table name
+   */
+  void incrementCdcBatchFailureCount(String dataTableName);
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
new file mode 100644
index 0000000000..0b1400104e
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+
+/**
+ * Implementation for tracking IndexCDCConsumer metrics.
+ * <p>
+ * Each update is recorded twice: on a per-table metric named {@code <metricName>.<tableName>},
+ * looked up in the metrics registry on every call, and on the table-agnostic metric created in
+ * the constructor.
+ */
+public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
+  implements MetricsIndexCDCConsumerSource {
+
+  // Table-agnostic metrics; the per-table ones are fetched from the registry by name.
+  private final MetricHistogram cdcBatchProcessTimeHisto;
+  private final MetricHistogram cdcMutationGenerateTimeHisto;
+  private final MetricHistogram cdcMutationApplyTimeHisto;
+  private final MutableFastCounter cdcBatchCounter;
+  private final MutableFastCounter cdcMutationCounter;
+  private final MutableFastCounter cdcBatchFailureCounter;
+
+  /** Creates the source with the default name, description and contexts from the interface. */
+  public MetricsIndexCDCConsumerSourceImpl() {
+    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+  }
+
+  /**
+   * Creates the source and registers the table-agnostic histograms and counters.
+   * @param metricsName        name passed to the base source
+   * @param metricsDescription description passed to the base source
+   * @param metricsContext     metrics context passed to the base source
+   * @param metricsJmxContext  JMX context passed to the base source
+   */
+  public MetricsIndexCDCConsumerSourceImpl(String metricsName, String metricsDescription,
+    String metricsContext, String metricsJmxContext) {
+    super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+
+    cdcBatchProcessTimeHisto =
+      getMetricsRegistry().newHistogram(CDC_BATCH_PROCESS_TIME, CDC_BATCH_PROCESS_TIME_DESC);
+    cdcMutationGenerateTimeHisto = getMetricsRegistry().newHistogram(CDC_MUTATION_GENERATE_TIME,
+      CDC_MUTATION_GENERATE_TIME_DESC);
+    cdcMutationApplyTimeHisto =
+      getMetricsRegistry().newHistogram(CDC_MUTATION_APPLY_TIME, CDC_MUTATION_APPLY_TIME_DESC);
+    cdcBatchCounter = getMetricsRegistry().newCounter(CDC_BATCH_COUNT, CDC_BATCH_COUNT_DESC, 0L);
+    cdcMutationCounter =
+      getMetricsRegistry().newCounter(CDC_MUTATION_COUNT, CDC_MUTATION_COUNT_DESC, 0L);
+    cdcBatchFailureCounter =
+      getMetricsRegistry().newCounter(CDC_BATCH_FAILURE_COUNT, CDC_BATCH_FAILURE_COUNT_DESC, 0L);
+  }
+
+  @Override
+  public void updateCdcBatchProcessTime(String dataTableName, long t) {
+    incrementTableSpecificHistogram(CDC_BATCH_PROCESS_TIME, dataTableName, t);
+    cdcBatchProcessTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateCdcMutationGenerateTime(String dataTableName, long t) {
+    incrementTableSpecificHistogram(CDC_MUTATION_GENERATE_TIME, dataTableName, t);
+    cdcMutationGenerateTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateCdcMutationApplyTime(String dataTableName, long t) {
+    incrementTableSpecificHistogram(CDC_MUTATION_APPLY_TIME, dataTableName, t);
+    cdcMutationApplyTimeHisto.add(t);
+  }
+
+  @Override
+  public void incrementCdcBatchCount(String dataTableName) {
+    incrementTableSpecificCounter(CDC_BATCH_COUNT, dataTableName, 1L);
+    cdcBatchCounter.incr();
+  }
+
+  @Override
+  public void incrementCdcMutationCount(String dataTableName, long count) {
+    incrementTableSpecificCounter(CDC_MUTATION_COUNT, dataTableName, count);
+    cdcMutationCounter.incr(count);
+  }
+
+  @Override
+  public void incrementCdcBatchFailureCount(String dataTableName) {
+    incrementTableSpecificCounter(CDC_BATCH_FAILURE_COUNT, dataTableName, 1L);
+    cdcBatchFailureCounter.incr();
+  }
+
+  /**
+   * Adds {@code delta} to the per-table counter for the given base metric. The counter is fetched
+   * from the registry with a starting value of 0.
+   */
+  private void incrementTableSpecificCounter(String baseName, String tableName, long delta) {
+    getMetricsRegistry().getCounter(getMetricName(baseName, tableName), 0).incr(delta);
+  }
+
+  /** Adds the sample {@code t} to the per-table histogram for the given base metric. */
+  private void incrementTableSpecificHistogram(String baseName, String tableName, long t) {
+    getMetricsRegistry().getHistogram(getMetricName(baseName, tableName)).add(t);
+  }
+
+  /** Builds the per-table metric name as {@code baseName + "." + tableName}. */
+  private String getMetricName(String baseName, String tableName) {
+    return baseName + "." + tableName;
+  }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
index e579b2f836..55b648c53a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
@@ -24,6 +24,7 @@ public class MetricsIndexerSourceFactory {
   private static final MetricsIndexerSourceFactory INSTANCE = new 
MetricsIndexerSourceFactory();
   private volatile MetricsIndexerSource indexerSource;
   private GlobalIndexCheckerSource globalIndexCheckerSource;
+  private MetricsIndexCDCConsumerSource indexCDCConsumerSource;
 
   private MetricsIndexerSourceFactory() {
   }
@@ -45,4 +46,11 @@ public class MetricsIndexerSourceFactory {
     }
     return INSTANCE.globalIndexCheckerSource;
   }
+
+  /**
+   * Returns the shared {@link MetricsIndexCDCConsumerSource}, creating it on the first call.
+   * The method is {@code synchronized}, so the null check and the assignment run under this
+   * factory object's monitor.
+   * @return the IndexCDCConsumer metrics source held by the singleton factory
+   */
+  public synchronized MetricsIndexCDCConsumerSource getIndexCDCConsumerSource() {
+    if (INSTANCE.indexCDCConsumerSource == null) {
+      INSTANCE.indexCDCConsumerSource = new MetricsIndexCDCConsumerSourceImpl();
+    }
+    return INSTANCE.indexCDCConsumerSource;
+  }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index dc49a77ea3..1ed4e5f26d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.generated.IndexMutationsProtos;
 import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
@@ -150,6 +152,7 @@ public class IndexCDCConsumer implements Runnable {
   private final long parentProgressPauseMs;
   private final Configuration config;
   private final boolean serializeCDCMutations;
+  private final MetricsIndexCDCConsumerSource metricSource;
   private volatile boolean stopped = false;
   private Thread consumerThread;
   private boolean hasParentPartitions = false;
@@ -224,6 +227,7 @@ public class IndexCDCConsumer implements Runnable {
       DEFAULT_MAX_DATA_VISIBILITY_RETRIES);
     this.parentProgressPauseMs =
       config.getLong(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS, 
DEFAULT_PARENT_PROGRESS_PAUSE_MS);
+    this.metricSource = 
MetricsIndexerSourceFactory.getInstance().getIndexCDCConsumerSource();
     DelegateRegionCoprocessorEnvironment indexWriterEnv =
       new DelegateRegionCoprocessorEnvironment(env, 
ConnectionType.INDEX_WRITER_CONNECTION);
     this.indexWriter =
@@ -467,6 +471,7 @@ public class IndexCDCConsumer implements Runnable {
           if (e instanceof InterruptedException) {
             throw (InterruptedException) e;
           }
+          metricSource.incrementCdcBatchFailureCount(dataTableName);
           long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
           LOG.error(
             "Error processing CDC mutations for table {} region {}. "
@@ -838,6 +843,7 @@ public class IndexCDCConsumer implements Runnable {
         }
         currentLastProcessedTimestamp = newTimestamp;
       } catch (SQLException | IOException e) {
+        metricSource.incrementCdcBatchFailureCount(dataTableName);
         long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
         LOG.warn(
           "Error processing CDC batch for partition {} owner {} table {} "
@@ -904,6 +910,7 @@ public class IndexCDCConsumer implements Runnable {
   private long processCDCBatch(String partitionId, String ownerPartitionId,
     long lastProcessedTimestamp, boolean isParentReplay)
     throws SQLException, IOException, InterruptedException {
+    long batchStartTime = EnvironmentEdgeManager.currentTimeMillis();
     LOG.debug("Processing CDC batch for table {} partition {} owner {} from 
timestamp {}",
       dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
     try (PhoenixConnection conn =
@@ -969,6 +976,9 @@ public class IndexCDCConsumer implements Runnable {
       }
       executeIndexMutations(partitionId, batchMutations, ownerPartitionId, 
newLastTimestamp);
       if (!batchMutations.isEmpty()) {
+        metricSource.updateCdcBatchProcessTime(dataTableName,
+          EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
+        metricSource.incrementCdcBatchCount(dataTableName);
         updateTrackerProgress(conn, partitionId, ownerPartitionId, 
newLastTimestamp,
           PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
       }
@@ -1011,6 +1021,7 @@ public class IndexCDCConsumer implements Runnable {
   private long processCDCBatchGenerated(String partitionId, String 
ownerPartitionId,
     long lastProcessedTimestamp, boolean isParentReplay)
     throws SQLException, IOException, InterruptedException {
+    long batchStartTime = EnvironmentEdgeManager.currentTimeMillis();
     LOG.debug(
       "Processing CDC batch (generated mode) for table {} partition {} owner 
{} from timestamp {}",
       dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
@@ -1090,6 +1101,11 @@ public class IndexCDCConsumer implements Runnable {
       }
       generateAndApplyIndexMutations(conn, batchStates, partitionId, 
ownerPartitionId,
         newLastTimestamp);
+      if (!batchStates.isEmpty()) {
+        metricSource.updateCdcBatchProcessTime(dataTableName,
+          EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
+        metricSource.incrementCdcBatchCount(dataTableName);
+      }
       if (newLastTimestamp > lastProcessedTimestamp) {
         updateTrackerProgress(conn, partitionId, ownerPartitionId, 
newLastTimestamp,
           PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
@@ -1165,6 +1181,7 @@ public class IndexCDCConsumer implements Runnable {
     }
     ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = 
ArrayListMultimap.create();
     int totalMutations = 0;
+    long generateStartTime = EnvironmentEdgeManager.currentTimeMillis();
     for (Pair<Long, IndexMutationsProtos.DataRowStates> entry : batchStates) {
       long ts = entry.getFirst();
       IndexMutationsProtos.DataRowStates dataRowStates = entry.getSecond();
@@ -1196,16 +1213,28 @@ public class IndexCDCConsumer implements Runnable {
         nextDataRowState, ts, encodedRegionNameBytes, 
QueryConstants.VERIFIED_BYTES, indexTables,
         indexUpdates);
       if (indexUpdates.size() >= batchSize) {
+        metricSource.updateCdcMutationGenerateTime(dataTableName,
+          EnvironmentEdgeManager.currentTimeMillis() - generateStartTime);
+        long applyStartTime = EnvironmentEdgeManager.currentTimeMillis();
         indexWriter.write(indexUpdates, false, 
MetaDataProtocol.PHOENIX_VERSION);
+        metricSource.updateCdcMutationApplyTime(dataTableName,
+          EnvironmentEdgeManager.currentTimeMillis() - applyStartTime);
         totalMutations += indexUpdates.size();
         indexUpdates.clear();
+        generateStartTime = EnvironmentEdgeManager.currentTimeMillis();
       }
     }
     if (!indexUpdates.isEmpty()) {
+      metricSource.updateCdcMutationGenerateTime(dataTableName,
+        EnvironmentEdgeManager.currentTimeMillis() - generateStartTime);
+      long applyStartTime = EnvironmentEdgeManager.currentTimeMillis();
       indexWriter.write(indexUpdates, false, MetaDataProtocol.PHOENIX_VERSION);
+      metricSource.updateCdcMutationApplyTime(dataTableName,
+        EnvironmentEdgeManager.currentTimeMillis() - applyStartTime);
       totalMutations += indexUpdates.size();
     }
     if (totalMutations > 0) {
+      metricSource.incrementCdcMutationCount(dataTableName, totalMutations);
       LOG.debug(
         "Applied total {} index mutations for table {} partition {} owner {} "
           + ", last processed timestamp {}",
@@ -1239,16 +1268,23 @@ public class IndexCDCConsumer implements Runnable {
           indexUpdates.put(tableRef, mutation);
         }
         if (indexUpdates.size() >= batchSize) {
+          long applyStartTime = EnvironmentEdgeManager.currentTimeMillis();
           indexWriter.write(indexUpdates, false, 
MetaDataProtocol.PHOENIX_VERSION);
+          metricSource.updateCdcMutationApplyTime(dataTableName,
+            EnvironmentEdgeManager.currentTimeMillis() - applyStartTime);
           totalMutations += indexUpdates.size();
           indexUpdates.clear();
         }
       }
       if (!indexUpdates.isEmpty()) {
+        long applyStartTime = EnvironmentEdgeManager.currentTimeMillis();
         indexWriter.write(indexUpdates, false, 
MetaDataProtocol.PHOENIX_VERSION);
+        metricSource.updateCdcMutationApplyTime(dataTableName,
+          EnvironmentEdgeManager.currentTimeMillis() - applyStartTime);
         totalMutations += indexUpdates.size();
       }
       if (totalMutations > 0) {
+        metricSource.incrementCdcMutationCount(dataTableName, totalMutations);
         LOG.debug(
           "Applied total {} index mutations for table {} partition {} owner {} 
"
             + ", last processed timestamp {}",
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
index 1244b8d3e0..54ad81e0f3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
@@ -18,6 +18,9 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable;
+import static 
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource.CDC_BATCH_COUNT;
+import static 
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource.CDC_BATCH_PROCESS_TIME;
+import static 
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource.CDC_MUTATION_COUNT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -29,10 +32,14 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import 
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSourceImpl;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Assume;
@@ -66,6 +73,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
   // This test is heavy and it might exhaust jenkins resources
   @Test(timeout = 1800000)
   public void testConcurrentUpsertsWithTableSplits() throws Exception {
+    Map<String, Long> metricsBefore = getCdcMetricValues();
     int nThreads = 12;
     final int batchSize = 100;
     final int nRows = 777;
@@ -131,6 +139,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
     List<String> allIndexes =
       new ArrayList<>(Arrays.asList(indexName1, indexName2, indexName3, 
indexName4, indexName5));
     verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows);
+    assertCdcMetrics(metricsBefore);
   }
 
   private void verifyRandomIndexes(List<String> allIndexes, String schemaName, 
String tableName,
@@ -176,6 +185,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
   @Ignore("too aggressive for jenkins builds")
   public void testConcurrentUpsertsWithTableSplitsMerges() throws Exception {
     Assume.assumeFalse(uncovered);
+    Map<String, Long> metricsBefore = getCdcMetricValues();
     int nThreads = 13;
     final int batchSize = 100;
     final int nRows = 1451;
@@ -250,11 +260,13 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
     List<String> allIndexes = new ArrayList<>(
       Arrays.asList(indexName1, indexName2, indexName3, indexName4, 
indexName5, indexName6));
     verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows);
+    assertCdcMetrics(metricsBefore);
   }
 
   @Test(timeout = 1800000)
   public void testConcurrentMutationsWithNonIndexedColumnUpdates() throws 
Exception {
     Assume.assumeFalse(uncovered);
+    Map<String, Long> metricsBefore = getCdcMetricValues();
     final int nThreads = 3;
     final int batchSize = 100;
     final int nRows = 500;
@@ -275,6 +287,9 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
     conn.createStatement().execute("CREATE UNCOVERED INDEX " + 
uncoveredIndexName + " ON "
       + tableName + "(v3)" + (eventual ? " CONSISTENCY = EVENTUAL" : ""));
 
+    runMutationPhase(tableName, nThreads, 15000, batchSize, nRows, 
nIndexValues, false);
+    Thread.sleep(5000);
+
     runMutationPhase(tableName, nThreads, 13334, batchSize, nRows, 
nIndexValues, true);
     Thread.sleep(5000);
 
@@ -282,7 +297,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
     Thread.sleep(5000);
 
     runMutationPhase(tableName, nThreads, 15000, batchSize, nRows, 
nIndexValues, true);
-    Thread.sleep(15000);
+    Thread.sleep(20000);
 
     if (eventual) {
       boolean verified = false;
@@ -307,6 +322,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
       long uncoveredCount = verifyIndexTable(tableName, uncoveredIndexName, 
conn);
       assertEquals(nRows, uncoveredCount);
     }
+    assertCdcMetrics(metricsBefore);
   }
 
   private void runMutationPhase(String tableName, int nThreads, int 
mutationsPerThread,
@@ -353,6 +369,43 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
     return rand.nextBoolean() ? "null" : Integer.toString(rand.nextInt() % 
bound);
   }
 
+  /**
+   * Snapshots the table-agnostic CDC consumer metrics held by the shared metrics source.
+   * @return map keyed by metric name with the current batch counter value, mutation counter
+   *         value, and the count reported by the batch-process-time histogram
+   */
+  private Map<String, Long> getCdcMetricValues() {
+    MetricsIndexCDCConsumerSourceImpl source =
+      (MetricsIndexCDCConsumerSourceImpl) MetricsIndexerSourceFactory.getInstance()
+        .getIndexCDCConsumerSource();
+    Map<String, Long> values = new HashMap<>();
+    values.put(CDC_BATCH_COUNT, source.getMetricsRegistry().getCounter(CDC_BATCH_COUNT, 0).value());
+    values.put(CDC_MUTATION_COUNT,
+      source.getMetricsRegistry().getCounter(CDC_MUTATION_COUNT, 0).value());
+    values.put(CDC_BATCH_PROCESS_TIME,
+      source.getMetricsRegistry().getHistogram(CDC_BATCH_PROCESS_TIME).getCount());
+    return values;
+  }
+
+  /**
+   * Compares the current CDC consumer metrics with an earlier snapshot. When {@code eventual} is
+   * set, the batch count and the batch-process-time histogram count must each have grown by at
+   * least 1 and the mutation count by at least 1000; otherwise all three deltas must be 0.
+   * @param before snapshot returned by {@link #getCdcMetricValues()} before the workload ran
+   */
+  private void assertCdcMetrics(Map<String, Long> before) {
+    Map<String, Long> after = getCdcMetricValues();
+    long batchDelta = after.get(CDC_BATCH_COUNT) - before.get(CDC_BATCH_COUNT);
+    long mutationDelta = after.get(CDC_MUTATION_COUNT) - before.get(CDC_MUTATION_COUNT);
+    long histogramDelta = after.get(CDC_BATCH_PROCESS_TIME) - before.get(CDC_BATCH_PROCESS_TIME);
+    LOGGER.info(
+      "CDC metrics — before: {}, after: {}, delta — batchCount: {}, "
+        + "mutationCount: {}, batchProcessTime samples: {}",
+      before, after, batchDelta, mutationDelta, histogramDelta);
+    if (eventual) {
+      // Eventually consistent indexes are maintained through CDC, so the metrics must have moved.
+      assertTrue("Expected cdcBatchCount to increase by at least 1, got " + batchDelta,
+        batchDelta >= 1);
+      assertTrue("Expected cdcMutationCount to increase by at least 1000, got " + mutationDelta,
+        mutationDelta >= 1000);
+      assertTrue("Expected cdcBatchProcessTime histogram samples to increase by at least 1, got "
+        + histogramDelta, histogramDelta >= 1);
+    } else {
+      assertEquals("Expected no CDC batch processing for synchronous indexes", 0, batchDelta);
+      assertEquals("Expected no CDC mutations for synchronous indexes", 0, mutationDelta);
+      assertEquals("Expected no CDC batch process time samples for synchronous indexes", 0,
+        histogramDelta);
+    }
+  }
+
   private void dumpTableRows(Connection conn, String tableName) throws 
SQLException {
     LOGGER.info("Dumping {} table:", tableName);
     try (ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + 
tableName)) {

Reply via email to