This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch tmp-ec
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/tmp-ec by this push:
new 37955ca27d add metrics
37955ca27d is described below
commit 37955ca27d68f0caa549e4b84a7694b99f3d1349
Author: Viraj Jasani <[email protected]>
AuthorDate: Mon Apr 6 14:01:56 2026 -0700
add metrics
---
.../metrics/MetricsIndexCDCConsumerSource.java | 92 +++++++++++++++++
.../metrics/MetricsIndexCDCConsumerSourceImpl.java | 111 +++++++++++++++++++++
.../index/metrics/MetricsIndexerSourceFactory.java | 8 ++
.../phoenix/hbase/index/IndexCDCConsumer.java | 36 +++++++
.../ConcurrentMutationsExtendedIndexIT.java | 55 +++++++++-
5 files changed, 301 insertions(+), 1 deletion(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
new file mode 100644
index 0000000000..a8481b639d
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+/**
+ * IndexCDCConsumer metrics for eventually consistent index updates.
+ */
+public interface MetricsIndexCDCConsumerSource extends BaseSource {
+
+ String METRICS_NAME = "IndexCDCConsumer";
+ String METRICS_CONTEXT = "phoenix";
+ String METRICS_DESCRIPTION = "Metrics about the Phoenix Index CDC Consumer";
+ String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+ String CDC_BATCH_PROCESS_TIME = "cdcBatchProcessTime";
+ String CDC_BATCH_PROCESS_TIME_DESC =
+ "Histogram for the end-to-end time in milliseconds for processing one CDC
batch";
+
+ String CDC_MUTATION_GENERATE_TIME = "cdcMutationGenerateTime";
+ String CDC_MUTATION_GENERATE_TIME_DESC =
+ "Histogram for the time in milliseconds to generate index mutations from
data row states";
+
+ String CDC_MUTATION_APPLY_TIME = "cdcMutationApplyTime";
+ String CDC_MUTATION_APPLY_TIME_DESC =
+ "Histogram for the time in milliseconds to apply index mutations to index
tables";
+
+ String CDC_BATCH_COUNT = "cdcBatchCount";
+ String CDC_BATCH_COUNT_DESC = "The number of CDC batches processed";
+
+ String CDC_MUTATION_COUNT = "cdcMutationCount";
+ String CDC_MUTATION_COUNT_DESC = "The number of individual index mutations
applied through CDC";
+
+ String CDC_BATCH_FAILURE_COUNT = "cdcBatchFailureCount";
+ String CDC_BATCH_FAILURE_COUNT_DESC = "The number of CDC batch processing
failures";
+
+ /**
+ * Updates the CDC batch processing time histogram.
+ * @param dataTableName physical data table name
+ * @param t time taken in milliseconds
+ */
+ void updateCdcBatchProcessTime(String dataTableName, long t);
+
+ /**
+ * Updates the CDC mutation generation time histogram.
+ * @param dataTableName physical data table name
+ * @param t time taken in milliseconds
+ */
+ void updateCdcMutationGenerateTime(String dataTableName, long t);
+
+ /**
+ * Updates the CDC mutation apply time histogram.
+ * @param dataTableName physical data table name
+ * @param t time taken in milliseconds
+ */
+ void updateCdcMutationApplyTime(String dataTableName, long t);
+
+ /**
+ * Increments the CDC batch count.
+ * @param dataTableName physical data table name
+ */
+ void incrementCdcBatchCount(String dataTableName);
+
+ /**
+ * Increments the CDC mutation count by the given amount.
+ * @param dataTableName physical data table name
+ * @param count number of mutations applied
+ */
+ void incrementCdcMutationCount(String dataTableName, long count);
+
+ /**
+ * Increments the CDC batch failure count.
+ * @param dataTableName physical data table name
+ */
+ void incrementCdcBatchFailureCount(String dataTableName);
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
new file mode 100644
index 0000000000..0b1400104e
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSourceImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+
+/**
+ * Implementation for tracking IndexCDCConsumer metrics.
+ */
+public class MetricsIndexCDCConsumerSourceImpl extends BaseSourceImpl
+ implements MetricsIndexCDCConsumerSource {
+
+ private final MetricHistogram cdcBatchProcessTimeHisto;
+ private final MetricHistogram cdcMutationGenerateTimeHisto;
+ private final MetricHistogram cdcMutationApplyTimeHisto;
+ private final MutableFastCounter cdcBatchCounter;
+ private final MutableFastCounter cdcMutationCounter;
+ private final MutableFastCounter cdcBatchFailureCounter;
+
+ public MetricsIndexCDCConsumerSourceImpl() {
+ this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT,
METRICS_JMX_CONTEXT);
+ }
+
+ public MetricsIndexCDCConsumerSourceImpl(String metricsName, String
metricsDescription,
+ String metricsContext, String metricsJmxContext) {
+ super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+
+ cdcBatchProcessTimeHisto =
+ getMetricsRegistry().newHistogram(CDC_BATCH_PROCESS_TIME,
CDC_BATCH_PROCESS_TIME_DESC);
+ cdcMutationGenerateTimeHisto =
getMetricsRegistry().newHistogram(CDC_MUTATION_GENERATE_TIME,
+ CDC_MUTATION_GENERATE_TIME_DESC);
+ cdcMutationApplyTimeHisto =
+ getMetricsRegistry().newHistogram(CDC_MUTATION_APPLY_TIME,
CDC_MUTATION_APPLY_TIME_DESC);
+ cdcBatchCounter = getMetricsRegistry().newCounter(CDC_BATCH_COUNT,
CDC_BATCH_COUNT_DESC, 0L);
+ cdcMutationCounter =
+ getMetricsRegistry().newCounter(CDC_MUTATION_COUNT,
CDC_MUTATION_COUNT_DESC, 0L);
+ cdcBatchFailureCounter =
+ getMetricsRegistry().newCounter(CDC_BATCH_FAILURE_COUNT,
CDC_BATCH_FAILURE_COUNT_DESC, 0L);
+ }
+
+ @Override
+ public void updateCdcBatchProcessTime(String dataTableName, long t) {
+ incrementTableSpecificHistogram(CDC_BATCH_PROCESS_TIME, dataTableName, t);
+ cdcBatchProcessTimeHisto.add(t);
+ }
+
+ @Override
+ public void updateCdcMutationGenerateTime(String dataTableName, long t) {
+ incrementTableSpecificHistogram(CDC_MUTATION_GENERATE_TIME, dataTableName,
t);
+ cdcMutationGenerateTimeHisto.add(t);
+ }
+
+ @Override
+ public void updateCdcMutationApplyTime(String dataTableName, long t) {
+ incrementTableSpecificHistogram(CDC_MUTATION_APPLY_TIME, dataTableName, t);
+ cdcMutationApplyTimeHisto.add(t);
+ }
+
+ @Override
+ public void incrementCdcBatchCount(String dataTableName) {
+ incrementTableSpecificCounter(CDC_BATCH_COUNT, dataTableName);
+ cdcBatchCounter.incr();
+ }
+
+ @Override
+ public void incrementCdcMutationCount(String dataTableName, long count) {
+ MutableFastCounter tableCounter =
+ getMetricsRegistry().getCounter(getMetricName(CDC_MUTATION_COUNT,
dataTableName), 0);
+ tableCounter.incr(count);
+ cdcMutationCounter.incr(count);
+ }
+
+ @Override
+ public void incrementCdcBatchFailureCount(String dataTableName) {
+ incrementTableSpecificCounter(CDC_BATCH_FAILURE_COUNT, dataTableName);
+ cdcBatchFailureCounter.incr();
+ }
+
+ private void incrementTableSpecificCounter(String baseName, String
tableName) {
+ MutableFastCounter tableCounter =
+ getMetricsRegistry().getCounter(getMetricName(baseName, tableName), 0);
+ tableCounter.incr();
+ }
+
+ private void incrementTableSpecificHistogram(String baseName, String
tableName, long t) {
+ MetricHistogram tableHistogram =
+ getMetricsRegistry().getHistogram(getMetricName(baseName, tableName));
+ tableHistogram.add(t);
+ }
+
+ private String getMetricName(String baseName, String tableName) {
+ return baseName + "." + tableName;
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
index e579b2f836..55b648c53a 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java
@@ -24,6 +24,7 @@ public class MetricsIndexerSourceFactory {
private static final MetricsIndexerSourceFactory INSTANCE = new
MetricsIndexerSourceFactory();
private volatile MetricsIndexerSource indexerSource;
private GlobalIndexCheckerSource globalIndexCheckerSource;
+ private MetricsIndexCDCConsumerSource indexCDCConsumerSource;
private MetricsIndexerSourceFactory() {
}
@@ -45,4 +46,11 @@ public class MetricsIndexerSourceFactory {
}
return INSTANCE.globalIndexCheckerSource;
}
+
+ public synchronized MetricsIndexCDCConsumerSource
getIndexCDCConsumerSource() {
+ if (INSTANCE.indexCDCConsumerSource == null) {
+ INSTANCE.indexCDCConsumerSource = new
MetricsIndexCDCConsumerSourceImpl();
+ }
+ return INSTANCE.indexCDCConsumerSource;
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index dc49a77ea3..1ed4e5f26d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
import org.apache.phoenix.coprocessor.generated.IndexMutationsProtos;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriter;
@@ -150,6 +152,7 @@ public class IndexCDCConsumer implements Runnable {
private final long parentProgressPauseMs;
private final Configuration config;
private final boolean serializeCDCMutations;
+ private final MetricsIndexCDCConsumerSource metricSource;
private volatile boolean stopped = false;
private Thread consumerThread;
private boolean hasParentPartitions = false;
@@ -224,6 +227,7 @@ public class IndexCDCConsumer implements Runnable {
DEFAULT_MAX_DATA_VISIBILITY_RETRIES);
this.parentProgressPauseMs =
config.getLong(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS,
DEFAULT_PARENT_PROGRESS_PAUSE_MS);
+ this.metricSource =
MetricsIndexerSourceFactory.getInstance().getIndexCDCConsumerSource();
DelegateRegionCoprocessorEnvironment indexWriterEnv =
new DelegateRegionCoprocessorEnvironment(env,
ConnectionType.INDEX_WRITER_CONNECTION);
this.indexWriter =
@@ -467,6 +471,7 @@ public class IndexCDCConsumer implements Runnable {
if (e instanceof InterruptedException) {
throw (InterruptedException) e;
}
+ metricSource.incrementCdcBatchFailureCount(dataTableName);
long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
LOG.error(
"Error processing CDC mutations for table {} region {}. "
@@ -838,6 +843,7 @@ public class IndexCDCConsumer implements Runnable {
}
currentLastProcessedTimestamp = newTimestamp;
} catch (SQLException | IOException e) {
+ metricSource.incrementCdcBatchFailureCount(dataTableName);
long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
LOG.warn(
"Error processing CDC batch for partition {} owner {} table {} "
@@ -904,6 +910,7 @@ public class IndexCDCConsumer implements Runnable {
private long processCDCBatch(String partitionId, String ownerPartitionId,
long lastProcessedTimestamp, boolean isParentReplay)
throws SQLException, IOException, InterruptedException {
+ long batchStartTime = EnvironmentEdgeManager.currentTimeMillis();
LOG.debug("Processing CDC batch for table {} partition {} owner {} from
timestamp {}",
dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
try (PhoenixConnection conn =
@@ -969,6 +976,9 @@ public class IndexCDCConsumer implements Runnable {
}
executeIndexMutations(partitionId, batchMutations, ownerPartitionId,
newLastTimestamp);
if (!batchMutations.isEmpty()) {
+ metricSource.updateCdcBatchProcessTime(dataTableName,
+ EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
+ metricSource.incrementCdcBatchCount(dataTableName);
updateTrackerProgress(conn, partitionId, ownerPartitionId,
newLastTimestamp,
PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
}
@@ -1011,6 +1021,7 @@ public class IndexCDCConsumer implements Runnable {
private long processCDCBatchGenerated(String partitionId, String
ownerPartitionId,
long lastProcessedTimestamp, boolean isParentReplay)
throws SQLException, IOException, InterruptedException {
+ long batchStartTime = EnvironmentEdgeManager.currentTimeMillis();
LOG.debug(
"Processing CDC batch (generated mode) for table {} partition {} owner
{} from timestamp {}",
dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
@@ -1090,6 +1101,11 @@ public class IndexCDCConsumer implements Runnable {
}
generateAndApplyIndexMutations(conn, batchStates, partitionId,
ownerPartitionId,
newLastTimestamp);
+ if (!batchStates.isEmpty()) {
+ metricSource.updateCdcBatchProcessTime(dataTableName,
+ EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
+ metricSource.incrementCdcBatchCount(dataTableName);
+ }
if (newLastTimestamp > lastProcessedTimestamp) {
updateTrackerProgress(conn, partitionId, ownerPartitionId,
newLastTimestamp,
PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
@@ -1165,6 +1181,7 @@ public class IndexCDCConsumer implements Runnable {
}
ListMultimap<HTableInterfaceReference, Mutation> indexUpdates =
ArrayListMultimap.create();
int totalMutations = 0;
+ long generateStartTime = EnvironmentEdgeManager.currentTimeMillis();
for (Pair<Long, IndexMutationsProtos.DataRowStates> entry : batchStates) {
long ts = entry.getFirst();
IndexMutationsProtos.DataRowStates dataRowStates = entry.getSecond();
@@ -1196,16 +1213,28 @@ public class IndexCDCConsumer implements Runnable {
nextDataRowState, ts, encodedRegionNameBytes,
QueryConstants.VERIFIED_BYTES, indexTables,
indexUpdates);
if (indexUpdates.size() >= batchSize) {
+ metricSource.updateCdcMutationGenerateTime(dataTableName,
+ EnvironmentEdgeManager.currentTimeMillis() - generateStartTime);
+ long applyStartTime = EnvironmentEdgeManager.currentTimeMillis();
indexWriter.write(indexUpdates, false,
MetaDataProtocol.PHOENIX_VERSION);
+ metricSource.updateCdcMutationApplyTime(dataTableName,
+ EnvironmentEdgeManager.currentTimeMillis() - applyStartTime);
totalMutations += indexUpdates.size();
indexUpdates.clear();
+ generateStartTime = EnvironmentEdgeManager.currentTimeMillis();
}
}
if (!indexUpdates.isEmpty()) {
+ metricSource.updateCdcMutationGenerateTime(dataTableName,
+ EnvironmentEdgeManager.currentTimeMillis() - generateStartTime);
+ long applyStartTime = EnvironmentEdgeManager.currentTimeMillis();
indexWriter.write(indexUpdates, false, MetaDataProtocol.PHOENIX_VERSION);
+ metricSource.updateCdcMutationApplyTime(dataTableName,
+ EnvironmentEdgeManager.currentTimeMillis() - applyStartTime);
totalMutations += indexUpdates.size();
}
if (totalMutations > 0) {
+ metricSource.incrementCdcMutationCount(dataTableName, totalMutations);
LOG.debug(
"Applied total {} index mutations for table {} partition {} owner {} "
+ ", last processed timestamp {}",
@@ -1239,16 +1268,23 @@ public class IndexCDCConsumer implements Runnable {
indexUpdates.put(tableRef, mutation);
}
if (indexUpdates.size() >= batchSize) {
+ long applyStartTime = EnvironmentEdgeManager.currentTimeMillis();
indexWriter.write(indexUpdates, false,
MetaDataProtocol.PHOENIX_VERSION);
+ metricSource.updateCdcMutationApplyTime(dataTableName,
+ EnvironmentEdgeManager.currentTimeMillis() - applyStartTime);
totalMutations += indexUpdates.size();
indexUpdates.clear();
}
}
if (!indexUpdates.isEmpty()) {
+ long applyStartTime = EnvironmentEdgeManager.currentTimeMillis();
indexWriter.write(indexUpdates, false,
MetaDataProtocol.PHOENIX_VERSION);
+ metricSource.updateCdcMutationApplyTime(dataTableName,
+ EnvironmentEdgeManager.currentTimeMillis() - applyStartTime);
totalMutations += indexUpdates.size();
}
if (totalMutations > 0) {
+ metricSource.incrementCdcMutationCount(dataTableName, totalMutations);
LOG.debug(
"Applied total {} index mutations for table {} partition {} owner {}
"
+ ", last processed timestamp {}",
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
index 1244b8d3e0..54ad81e0f3 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
@@ -18,6 +18,9 @@
package org.apache.phoenix.end2end;
import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable;
+import static
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource.CDC_BATCH_COUNT;
+import static
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource.CDC_BATCH_PROCESS_TIME;
+import static
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource.CDC_MUTATION_COUNT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -29,10 +32,14 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import
org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSourceImpl;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assume;
@@ -66,6 +73,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT
extends ParallelStatsDi
// This test is heavy and it might exhaust jenkins resources
@Test(timeout = 1800000)
public void testConcurrentUpsertsWithTableSplits() throws Exception {
+ Map<String, Long> metricsBefore = getCdcMetricValues();
int nThreads = 12;
final int batchSize = 100;
final int nRows = 777;
@@ -131,6 +139,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT
extends ParallelStatsDi
List<String> allIndexes =
new ArrayList<>(Arrays.asList(indexName1, indexName2, indexName3,
indexName4, indexName5));
verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows);
+ assertCdcMetrics(metricsBefore);
}
private void verifyRandomIndexes(List<String> allIndexes, String schemaName,
String tableName,
@@ -176,6 +185,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT
extends ParallelStatsDi
@Ignore("too aggressive for jenkins builds")
public void testConcurrentUpsertsWithTableSplitsMerges() throws Exception {
Assume.assumeFalse(uncovered);
+ Map<String, Long> metricsBefore = getCdcMetricValues();
int nThreads = 13;
final int batchSize = 100;
final int nRows = 1451;
@@ -250,11 +260,13 @@ public abstract class ConcurrentMutationsExtendedIndexIT
extends ParallelStatsDi
List<String> allIndexes = new ArrayList<>(
Arrays.asList(indexName1, indexName2, indexName3, indexName4,
indexName5, indexName6));
verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows);
+ assertCdcMetrics(metricsBefore);
}
@Test(timeout = 1800000)
public void testConcurrentMutationsWithNonIndexedColumnUpdates() throws
Exception {
Assume.assumeFalse(uncovered);
+ Map<String, Long> metricsBefore = getCdcMetricValues();
final int nThreads = 3;
final int batchSize = 100;
final int nRows = 500;
@@ -275,6 +287,9 @@ public abstract class ConcurrentMutationsExtendedIndexIT
extends ParallelStatsDi
conn.createStatement().execute("CREATE UNCOVERED INDEX " +
uncoveredIndexName + " ON "
+ tableName + "(v3)" + (eventual ? " CONSISTENCY = EVENTUAL" : ""));
+ runMutationPhase(tableName, nThreads, 15000, batchSize, nRows,
nIndexValues, false);
+ Thread.sleep(5000);
+
runMutationPhase(tableName, nThreads, 13334, batchSize, nRows,
nIndexValues, true);
Thread.sleep(5000);
@@ -282,7 +297,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT
extends ParallelStatsDi
Thread.sleep(5000);
runMutationPhase(tableName, nThreads, 15000, batchSize, nRows,
nIndexValues, true);
- Thread.sleep(15000);
+ Thread.sleep(20000);
if (eventual) {
boolean verified = false;
@@ -307,6 +322,7 @@ public abstract class ConcurrentMutationsExtendedIndexIT
extends ParallelStatsDi
long uncoveredCount = verifyIndexTable(tableName, uncoveredIndexName,
conn);
assertEquals(nRows, uncoveredCount);
}
+ assertCdcMetrics(metricsBefore);
}
private void runMutationPhase(String tableName, int nThreads, int
mutationsPerThread,
@@ -353,6 +369,43 @@ public abstract class ConcurrentMutationsExtendedIndexIT
extends ParallelStatsDi
return rand.nextBoolean() ? "null" : Integer.toString(rand.nextInt() %
bound);
}
+ private Map<String, Long> getCdcMetricValues() {
+ MetricsIndexCDCConsumerSourceImpl source =
+ (MetricsIndexCDCConsumerSourceImpl)
MetricsIndexerSourceFactory.getInstance()
+ .getIndexCDCConsumerSource();
+ Map<String, Long> values = new HashMap<>();
+ values.put(CDC_BATCH_COUNT,
source.getMetricsRegistry().getCounter(CDC_BATCH_COUNT, 0).value());
+ values.put(CDC_MUTATION_COUNT,
+ source.getMetricsRegistry().getCounter(CDC_MUTATION_COUNT, 0).value());
+ values.put(CDC_BATCH_PROCESS_TIME,
+
source.getMetricsRegistry().getHistogram(CDC_BATCH_PROCESS_TIME).getCount());
+ return values;
+ }
+
+ private void assertCdcMetrics(Map<String, Long> before) {
+ Map<String, Long> after = getCdcMetricValues();
+ long batchDelta = after.get(CDC_BATCH_COUNT) - before.get(CDC_BATCH_COUNT);
+ long mutationDelta = after.get(CDC_MUTATION_COUNT) -
before.get(CDC_MUTATION_COUNT);
+ long histogramDelta = after.get(CDC_BATCH_PROCESS_TIME) -
before.get(CDC_BATCH_PROCESS_TIME);
+ LOGGER.info(
+ "CDC metrics — before: {}, after: {}, delta — batchCount: {}, "
+ + "mutationCount: {}, batchProcessTime samples: {}",
+ before, after, batchDelta, mutationDelta, histogramDelta);
+ if (eventual) {
+ assertTrue("Expected cdcBatchCount to increase by at least 1, got " +
batchDelta,
+ batchDelta >= 1);
+ assertTrue("Expected cdcMutationCount to increase by at least 1000, got
" + mutationDelta,
+ mutationDelta >= 1000);
+ assertTrue("Expected cdcBatchProcessTime histogram samples to increase
by at least 1, got "
+ + histogramDelta, histogramDelta >= 1);
+ } else {
+ assertEquals("Expected no CDC batch processing for synchronous indexes",
0, batchDelta);
+ assertEquals("Expected no CDC mutations for synchronous indexes", 0,
mutationDelta);
+ assertEquals("Expected no CDC batch process time samples for synchronous
indexes", 0,
+ histogramDelta);
+ }
+ }
+
private void dumpTableRows(Connection conn, String tableName) throws
SQLException {
LOGGER.info("Dumping {} table:", tableName);
try (ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " +
tableName)) {