This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch tmp-ec in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit c02cac38901a3d0667b4e295e656ff3993918d19 Author: Viraj Jasani <[email protected]> AuthorDate: Sat Apr 4 15:15:30 2026 -0700 keep serialize false as default --- .../phoenix/hbase/index/IndexCDCConsumer.java | 22 ++++- .../phoenix/hbase/index/IndexRegionObserver.java | 32 ++++--- .../java/org/apache/phoenix/end2end/Bson5IT.java | 2 + .../end2end/ConcurrentMutationsExtendedIT.java | 2 + .../ConcurrentMutationsExtendedIndexIT.java | 101 +++++++++++++++++++++ .../ConcurrentMutationsLazyPostBatchWriteIT.java | 3 + .../IndexToolForNonTxGlobalIndexEventualIT.java | 2 + .../end2end/MultiTenantEventualIndexIT.java | 2 + .../index/GlobalIndexCheckerEventualIT.java | 2 + 9 files changed, 148 insertions(+), 20 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java index 191eef7598..dc49a77ea3 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java @@ -133,6 +133,10 @@ public class IndexCDCConsumer implements Runnable { "phoenix.index.cdc.consumer.retry.pause.ms"; private static final long DEFAULT_RETRY_PAUSE_MS = 2000; + public static final String INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS = + "phoenix.index.cdc.consumer.parent.progress.pause.ms"; + private static final long DEFAULT_PARENT_PROGRESS_PAUSE_MS = 15000; + private final RegionCoprocessorEnvironment env; private final String dataTableName; private final String encodedRegionName; @@ -143,6 +147,7 @@ public class IndexCDCConsumer implements Runnable { private final long pollIntervalMs; private final long timestampBufferMs; private final int maxDataVisibilityRetries; + private final long parentProgressPauseMs; private final Configuration config; private final boolean serializeCDCMutations; private volatile boolean stopped = false; @@ -217,6 +222,8 @@ public class IndexCDCConsumer implements Runnable { config.getLong(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, DEFAULT_TIMESTAMP_BUFFER_MS); this.maxDataVisibilityRetries = config.getInt(INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES, DEFAULT_MAX_DATA_VISIBILITY_RETRIES); + this.parentProgressPauseMs = + config.getLong(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS, DEFAULT_PARENT_PROGRESS_PAUSE_MS); DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION); this.indexWriter = @@ -244,7 +251,6 @@ public class IndexCDCConsumer implements Runnable { if (indexWriter != null) { indexWriter.stop("IndexCDCConsumer stopped for " + dataTableName); } - LOG.info("Stopped IndexCDCConsumer for table {} region {}", dataTableName, encodedRegionName); } /** @@ -404,7 +410,14 @@ public class IndexCDCConsumer implements Runnable { dataTableName); return; } - LOG.info("IndexCDCConsumer started for table {} region {}", dataTableName, encodedRegionName); + LOG.info( + "IndexCDCConsumer started for table {} region {}" + + " [batchSize: {}, pollIntervalMs: {}, timestampBufferMs: {}, startupDelayMs: {}," + + " pause: {}, maxDataVisibilityRetries: {}, parentProgressPauseMs: {}," + + " serializeCDCMutations: {}]", + dataTableName, encodedRegionName, batchSize, pollIntervalMs, timestampBufferMs, + startupDelayMs, pause, maxDataVisibilityRetries, parentProgressPauseMs, + serializeCDCMutations); if (!waitForCDCStreamEntry()) { LOG.error( "IndexCDCConsumer stopped while waiting for CDC_STREAM entry for table {} region {}", @@ -784,14 +797,13 @@ public class IndexCDCConsumer implements Runnable { int batchCount = 0; while (!stopped) { try { - if (batchCount > 0 && batchCount % 2 == 0) { + if (batchCount > 0) { if (isPartitionCompleted(partitionId)) { return; } long otherProgress = getParentProgress(partitionId); if (otherProgress > currentLastProcessedTimestamp) { - // other owner has already made some progress, pause for a while before resuming - sleepIfNotStopped(10000); + sleepIfNotStopped(parentProgressPauseMs); if (isPartitionCompleted(partitionId)) { return; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 17362e159d..9763388eff 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -182,7 +182,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { * Controls which approach is used for implementing eventually consistent global secondary indexes * via the {@link IndexCDCConsumer}. * <p> - * <b>Approach 1: Serialized mutations (default, value = true)</b> + * <b>Approach 1: Serialized mutations (value = true)</b> * </p> * <p> * During {@code preBatchMutate}, {@link IndexRegionObserver} generates index mutations for each @@ -192,10 +192,10 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { * index, deserializes them, and applies them directly to the index table(s). In this approach, * the consumer does not need to understand index structure or re-derive mutations — it simply * replays what was already computed on the write path. The trade-off is increased CDC index row - * size due to the serialized mutation payload. + * size due to the serialized mutation payload, and additional write IO on the CDC index table. * </p> * <p> - * <b>Approach 2: Generated mutations from data row states (value = false)</b> + * <b>Approach 2: Generated mutations from data row states (default, value = false)</b> * </p> * <p> * During {@code preBatchMutate}, {@link IndexRegionObserver} writes only a lightweight CDC index @@ -207,27 +207,29 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { * change timestamp. These raw row states are returned as a Protobuf {@code DataRowStates} * message. The consumer then feeds these states into {@code generateIndexMutationsForRow()} — the * same core utility used by {@link IndexRegionObserver#prepareIndexMutations} on the write path — - * to derive index mutations at consume time. This approach keeps CDC index rows small and - * generates mutations based on the current index definition, but requires an additional data - * table read per CDC event and is sensitive to data visibility timing. Make sure max lookback age - * is long enough to retain before and after images of the row. + * to derive index mutations at consume time. This approach keeps CDC index rows small, avoids + * additional write IO, and generates mutations based on the current index definition, but + * requires an additional data table read per CDC event and is sensitive to data visibility + * timing. Make sure max lookback age is long enough to retain before and after images of the row. * </p> * <p> * <b>When to use which approach:</b> * </p> * <ul> - * <li>Use <b>Approach 1</b> (serialize = true) when scanning each data table row at consume time - * could be an IO bottleneck, and slightly higher write-path latency due to index mutation - * serialization is acceptable.</li> - * <li>Use <b>Approach 2</b> (serialize = false) when uniform and predictable write latency is a - * strict requirement regardless of the number and type (covered or uncovered) of the eventually - * consistent global secondary indexes, and the additional data table point-lookup with raw scan - * per CDC event at consume time is not a big IO concern.</li> + * <li>Use <b>Approach 2</b> (serialize = false, default) to minimize write IO: no serialized + * mutations are written to the CDC index, keeping CDC index rows small and write latency uniform. + * The trade-off is higher read IO at consume time — the consumer performs an additional data + * table point-lookup with a raw scan per CDC event to reconstruct row states.</li> + * <li>Use <b>Approach 1</b> (serialize = true) to minimize read IO: the consumer reads + * pre-computed mutations from the CDC index and applies them directly, with no data table scan + * required at consume time. The trade-off is higher write IO — serialized index mutations are + * written alongside each CDC index entry, increasing CDC index row size and write-path latency. + * Although CDC index is expected to have TTL same as the data table max lookback age.</li> * </ul> */ public static final String PHOENIX_INDEX_CDC_MUTATION_SERIALIZE = "phoenix.index.cdc.mutation.serialize"; - public static final boolean DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE = true; + public static final boolean DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE = false; /** * Class to represent pending data table rows diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java index 3b68adb884..3e6a4f1989 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java @@ -19,6 +19,7 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -82,6 +83,7 @@ public class Bson5IT extends ParallelStatsDisabledIT { props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false)); props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000)); props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5)); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java index 671d130501..bda94a3c74 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -98,6 +99,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500)); props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(1000)); props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java index a04e48e1d5..1244b8d3e0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java @@ -252,6 +252,107 @@ public abstract class ConcurrentMutationsExtendedIndexIT extends ParallelStatsDi verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows); } + @Test(timeout = 1800000) + public void testConcurrentMutationsWithNonIndexedColumnUpdates() throws Exception { + Assume.assumeFalse(uncovered); + final int nThreads = 3; + final int batchSize = 100; + final int nRows = 500; + final int nIndexValues = 23; + final String tableName = generateUniqueName(); + final String coveredIndexName = generateUniqueName(); + final String uncoveredIndexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + + conn.createStatement() + .execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, " + + "v1 INTEGER, v2 INTEGER, v3 INTEGER, v4 INTEGER, v5 INTEGER, v6 INTEGER, v7 INTEGER, " + + "CONSTRAINT pk PRIMARY KEY (k1,k2))"); + + conn.createStatement().execute("CREATE INDEX " + coveredIndexName + " ON " + tableName + + "(v1) INCLUDE(v2)" + (eventual ? " CONSISTENCY = EVENTUAL" : "")); + + conn.createStatement().execute("CREATE UNCOVERED INDEX " + uncoveredIndexName + " ON " + + tableName + "(v3)" + (eventual ? " CONSISTENCY = EVENTUAL" : "")); + + runMutationPhase(tableName, nThreads, 13334, batchSize, nRows, nIndexValues, true); + Thread.sleep(5000); + + runMutationPhase(tableName, nThreads, 15000, batchSize, nRows, nIndexValues, false); + Thread.sleep(5000); + + runMutationPhase(tableName, nThreads, 15000, batchSize, nRows, nIndexValues, true); + Thread.sleep(15000); + + if (eventual) { + boolean verified = false; + for (int attempt = 0; attempt < 40 && !verified; attempt++) { + Thread.sleep(10000); + try { + long coveredCount = verifyIndexTable(tableName, coveredIndexName, conn, false); + assertEquals(nRows, coveredCount); + long uncoveredCount = verifyIndexTable(tableName, uncoveredIndexName, conn, false); + assertEquals(nRows, uncoveredCount); + verified = true; + } catch (AssertionError e) { + LOGGER.info("Verification attempt {} failed: {}", attempt, e.getMessage()); + dumpTableRows(conn, "SYSTEM.CDC_STREAM"); + dumpTableRows(conn, "SYSTEM.IDX_CDC_TRACKER"); + } + } + assertTrue("Index verification failed after retries", verified); + } else { + long coveredCount = verifyIndexTable(tableName, coveredIndexName, conn); + assertEquals(nRows, coveredCount); + long uncoveredCount = verifyIndexTable(tableName, uncoveredIndexName, conn); + assertEquals(nRows, uncoveredCount); + } + } + + private void runMutationPhase(String tableName, int nThreads, int mutationsPerThread, + int batchSize, int nRows, int nIndexValues, boolean includeIndexedColumns) throws Exception { + CountDownLatch doneSignal = new CountDownLatch(nThreads); + for (int t = 0; t < nThreads; t++) { + new Thread(() -> { + try { + Connection c = DriverManager.getConnection(getUrl()); + ThreadLocalRandom rand = ThreadLocalRandom.current(); + for (int j = 0; j < mutationsPerThread; j++) { + if (includeIndexedColumns) { + c.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES (" + (j % nRows) + ", 0, " + + randVal(rand, nIndexValues) + ", " + randVal(rand) + ", " + + randVal(rand, nIndexValues) + ", " + randVal(rand) + ", " + randVal(rand) + ", " + + randVal(rand) + ", " + randVal(rand) + ")"); + } else { + c.createStatement() + .execute("UPSERT INTO " + tableName + " (k1, k2, v4, v5, v6, v7) VALUES (" + + (j % nRows) + ", 0, " + randVal(rand) + ", " + randVal(rand) + ", " + + randVal(rand) + ", " + randVal(rand) + ")"); + } + if ((j % batchSize) == 0) { + c.commit(); + } + } + c.commit(); + } catch (SQLException e) { + LOGGER.warn("Mutation phase exception: {}", e.getMessage()); + } finally { + doneSignal.countDown(); + } + }).start(); + } + assertTrue("Mutation phase timed out", doneSignal.await(350, TimeUnit.SECONDS)); + } + + private static String randVal(ThreadLocalRandom rand) { + return rand.nextBoolean() ? "null" : Integer.toString(rand.nextInt()); + } + + private static String randVal(ThreadLocalRandom rand, int bound) { + return rand.nextBoolean() ? "null" : Integer.toString(rand.nextInt() % bound); + } + private void dumpTableRows(Connection conn, String tableName) throws SQLException { LOGGER.info("Dumping {} table:", tableName); try (ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName)) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java index a8ab061542..3b49514b99 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java @@ -19,6 +19,7 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; import java.util.Map; import org.apache.phoenix.coprocessor.PhoenixMasterObserver; @@ -54,9 +55,11 @@ public class ConcurrentMutationsLazyPostBatchWriteIT extends ConcurrentMutations props.put("phoenix.index.concurrent.wait.duration.ms", "10"); props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500)); props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2500)); props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java index 287f95c57a..737be134cd 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java @@ -19,6 +19,7 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; import java.util.Arrays; import java.util.Collection; @@ -70,6 +71,7 @@ public class IndexToolForNonTxGlobalIndexEventualIT extends IndexToolForNonTxGlo serverProps.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000)); serverProps.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5)); serverProps.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); + serverProps.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString()); Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5); clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java index 33b7bd7465..e0a4ab2a4f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -77,6 +78,7 @@ public class MultiTenantEventualIndexIT extends ParallelStatsDisabledIT { props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); props.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, Boolean.TRUE.toString()); props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java index aee8564ab5..d7b5a35e1c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java @@ -19,6 +19,7 @@ package org.apache.phoenix.end2end.index; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; import java.util.Collection; import java.util.List; @@ -51,6 +52,7 @@ public class GlobalIndexCheckerEventualIT extends GlobalIndexCheckerIT { props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); }
