This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch tmp-ec
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 13d432529afc758e8803707383a811e981f29300
Author: Viraj Jasani <[email protected]>
AuthorDate: Sat Apr 4 15:15:30 2026 -0700

    keep serialize false as default
---
 .../phoenix/hbase/index/IndexCDCConsumer.java      |  22 ++++-
 .../phoenix/hbase/index/IndexRegionObserver.java   |  32 ++++---
 .../java/org/apache/phoenix/end2end/Bson5IT.java   |   2 +
 .../end2end/ConcurrentMutationsExtendedIT.java     |   2 +
 .../ConcurrentMutationsExtendedIndexIT.java        | 101 +++++++++++++++++++++
 .../ConcurrentMutationsLazyPostBatchWriteIT.java   |   2 +
 .../IndexToolForNonTxGlobalIndexEventualIT.java    |   2 +
 .../end2end/MultiTenantEventualIndexIT.java        |   2 +
 .../index/GlobalIndexCheckerEventualIT.java        |   2 +
 9 files changed, 147 insertions(+), 20 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index 191eef7598..dc49a77ea3 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -133,6 +133,10 @@ public class IndexCDCConsumer implements Runnable {
     "phoenix.index.cdc.consumer.retry.pause.ms";
   private static final long DEFAULT_RETRY_PAUSE_MS = 2000;
 
+  public static final String INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS =
+    "phoenix.index.cdc.consumer.parent.progress.pause.ms";
+  private static final long DEFAULT_PARENT_PROGRESS_PAUSE_MS = 15000;
+
   private final RegionCoprocessorEnvironment env;
   private final String dataTableName;
   private final String encodedRegionName;
@@ -143,6 +147,7 @@ public class IndexCDCConsumer implements Runnable {
   private final long pollIntervalMs;
   private final long timestampBufferMs;
   private final int maxDataVisibilityRetries;
+  private final long parentProgressPauseMs;
   private final Configuration config;
   private final boolean serializeCDCMutations;
   private volatile boolean stopped = false;
@@ -217,6 +222,8 @@ public class IndexCDCConsumer implements Runnable {
       config.getLong(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, 
DEFAULT_TIMESTAMP_BUFFER_MS);
     this.maxDataVisibilityRetries = 
config.getInt(INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES,
       DEFAULT_MAX_DATA_VISIBILITY_RETRIES);
+    this.parentProgressPauseMs =
+      config.getLong(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS, 
DEFAULT_PARENT_PROGRESS_PAUSE_MS);
     DelegateRegionCoprocessorEnvironment indexWriterEnv =
       new DelegateRegionCoprocessorEnvironment(env, 
ConnectionType.INDEX_WRITER_CONNECTION);
     this.indexWriter =
@@ -244,7 +251,6 @@ public class IndexCDCConsumer implements Runnable {
     if (indexWriter != null) {
       indexWriter.stop("IndexCDCConsumer stopped for " + dataTableName);
     }
-    LOG.info("Stopped IndexCDCConsumer for table {} region {}", dataTableName, 
encodedRegionName);
   }
 
   /**
@@ -404,7 +410,14 @@ public class IndexCDCConsumer implements Runnable {
           dataTableName);
         return;
       }
-      LOG.info("IndexCDCConsumer started for table {} region {}", 
dataTableName, encodedRegionName);
+      LOG.info(
+        "IndexCDCConsumer started for table {} region {}"
+          + " [batchSize: {}, pollIntervalMs: {}, timestampBufferMs: {}, 
startupDelayMs: {},"
+          + " pause: {}, maxDataVisibilityRetries: {}, parentProgressPauseMs: 
{},"
+          + " serializeCDCMutations: {}]",
+        dataTableName, encodedRegionName, batchSize, pollIntervalMs, 
timestampBufferMs,
+        startupDelayMs, pause, maxDataVisibilityRetries, parentProgressPauseMs,
+        serializeCDCMutations);
       if (!waitForCDCStreamEntry()) {
         LOG.error(
           "IndexCDCConsumer stopped while waiting for CDC_STREAM entry for 
table {} region {}",
@@ -784,14 +797,13 @@ public class IndexCDCConsumer implements Runnable {
     int batchCount = 0;
     while (!stopped) {
       try {
-        if (batchCount > 0 && batchCount % 2 == 0) {
+        if (batchCount > 0) {
           if (isPartitionCompleted(partitionId)) {
             return;
           }
           long otherProgress = getParentProgress(partitionId);
           if (otherProgress > currentLastProcessedTimestamp) {
-            // other owner has already made some progress, pause for a while 
before resuming
-            sleepIfNotStopped(10000);
+            sleepIfNotStopped(parentProgressPauseMs);
             if (isPartitionCompleted(partitionId)) {
               return;
             }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 17362e159d..9763388eff 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -182,7 +182,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
    * Controls which approach is used for implementing eventually consistent 
global secondary indexes
    * via the {@link IndexCDCConsumer}.
    * <p>
-   * <b>Approach 1: Serialized mutations (default, value = true)</b>
+   * <b>Approach 1: Serialized mutations (value = true)</b>
    * </p>
    * <p>
    * During {@code preBatchMutate}, {@link IndexRegionObserver} generates 
index mutations for each
@@ -192,10 +192,10 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
    * index, deserializes them, and applies them directly to the index 
table(s). In this approach,
    * the consumer does not need to understand index structure or re-derive 
mutations — it simply
    * replays what was already computed on the write path. The trade-off is 
increased CDC index row
-   * size due to the serialized mutation payload.
+   * size due to the serialized mutation payload, and additional write IO on 
the CDC index table.
    * </p>
    * <p>
-   * <b>Approach 2: Generated mutations from data row states (value = 
false)</b>
+   * <b>Approach 2: Generated mutations from data row states (default, value = 
false)</b>
    * </p>
    * <p>
    * During {@code preBatchMutate}, {@link IndexRegionObserver} writes only a 
lightweight CDC index
@@ -207,27 +207,29 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
    * change timestamp. These raw row states are returned as a Protobuf {@code 
DataRowStates}
    * message. The consumer then feeds these states into {@code 
generateIndexMutationsForRow()} — the
    * same core utility used by {@link 
IndexRegionObserver#prepareIndexMutations} on the write path —
-   * to derive index mutations at consume time. This approach keeps CDC index 
rows small and
-   * generates mutations based on the current index definition, but requires 
an additional data
-   * table read per CDC event and is sensitive to data visibility timing. Make 
sure max lookback age
-   * is long enough to retain before and after images of the row.
+   * to derive index mutations at consume time. This approach keeps CDC index 
rows small, avoids
+   * additional write IO, and generates mutations based on the current index 
definition, but
+   * requires an additional data table read per CDC event and is sensitive to 
data visibility
+   * timing. Make sure max lookback age is long enough to retain before and 
after images of the row.
    * </p>
    * <p>
    * <b>When to use which approach:</b>
    * </p>
    * <ul>
-   * <li>Use <b>Approach 1</b> (serialize = true) when scanning each data 
table row at consume time
-   * could be an IO bottleneck, and slightly higher write-path latency due to 
index mutation
-   * serialization is acceptable.</li>
-   * <li>Use <b>Approach 2</b> (serialize = false) when uniform and 
predictable write latency is a
-   * strict requirement regardless of the number and type (covered or 
uncovered) of the eventually
-   * consistent global secondary indexes, and the additional data table 
point-lookup with raw scan
-   * per CDC event at consume time is not a big IO concern.</li>
+   * <li>Use <b>Approach 2</b> (serialize = false, default) to minimize write 
IO: no serialized
+   * mutations are written to the CDC index, keeping CDC index rows small and 
write latency uniform.
+   * The trade-off is higher read IO at consume time — the consumer performs 
an additional data
+   * table point-lookup with a raw scan per CDC event to reconstruct row 
states.</li>
+   * <li>Use <b>Approach 1</b> (serialize = true) to minimize read IO: the 
consumer reads
+   * pre-computed mutations from the CDC index and applies them directly, with 
no data table scan
+   * required at consume time. The trade-off is higher write IO — serialized 
index mutations are
+   * written alongside each CDC index entry, increasing CDC index row size and 
write-path latency.
+   * Although CDC index is expected to have TTL same as the data table max 
lookback age.</li>
    * </ul>
    */
   public static final String PHOENIX_INDEX_CDC_MUTATION_SERIALIZE =
     "phoenix.index.cdc.mutation.serialize";
-  public static final boolean DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE = 
true;
+  public static final boolean DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE = 
false;
 
   /**
    * Class to represent pending data table rows
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
index 3b68adb884..3e6a4f1989 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -82,6 +83,7 @@ public class Bson5IT extends ParallelStatsDisabledIT {
     props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(false));
     props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000));
     props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 671d130501..bda94a3c74 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
 import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable;
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -98,6 +99,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
     props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
     props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(1000));
     props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
index a04e48e1d5..1244b8d3e0 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java
@@ -252,6 +252,107 @@ public abstract class ConcurrentMutationsExtendedIndexIT 
extends ParallelStatsDi
     verifyRandomIndexes(allIndexes, schemaName, tableName, conn, nRows);
   }
 
+  @Test(timeout = 1800000)
+  public void testConcurrentMutationsWithNonIndexedColumnUpdates() throws 
Exception {
+    Assume.assumeFalse(uncovered);
+    final int nThreads = 3;
+    final int batchSize = 100;
+    final int nRows = 500;
+    final int nIndexValues = 23;
+    final String tableName = generateUniqueName();
+    final String coveredIndexName = generateUniqueName();
+    final String uncoveredIndexName = generateUniqueName();
+    Connection conn = DriverManager.getConnection(getUrl());
+
+    conn.createStatement()
+      .execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER 
NOT NULL, "
+        + "v1 INTEGER, v2 INTEGER, v3 INTEGER, v4 INTEGER, v5 INTEGER, v6 
INTEGER, v7 INTEGER, "
+        + "CONSTRAINT pk PRIMARY KEY (k1,k2))");
+
+    conn.createStatement().execute("CREATE INDEX " + coveredIndexName + " ON " 
+ tableName
+      + "(v1) INCLUDE(v2)" + (eventual ? " CONSISTENCY = EVENTUAL" : ""));
+
+    conn.createStatement().execute("CREATE UNCOVERED INDEX " + 
uncoveredIndexName + " ON "
+      + tableName + "(v3)" + (eventual ? " CONSISTENCY = EVENTUAL" : ""));
+
+    runMutationPhase(tableName, nThreads, 13334, batchSize, nRows, 
nIndexValues, true);
+    Thread.sleep(5000);
+
+    runMutationPhase(tableName, nThreads, 15000, batchSize, nRows, 
nIndexValues, false);
+    Thread.sleep(5000);
+
+    runMutationPhase(tableName, nThreads, 15000, batchSize, nRows, 
nIndexValues, true);
+    Thread.sleep(15000);
+
+    if (eventual) {
+      boolean verified = false;
+      for (int attempt = 0; attempt < 40 && !verified; attempt++) {
+        Thread.sleep(10000);
+        try {
+          long coveredCount = verifyIndexTable(tableName, coveredIndexName, 
conn, false);
+          assertEquals(nRows, coveredCount);
+          long uncoveredCount = verifyIndexTable(tableName, 
uncoveredIndexName, conn, false);
+          assertEquals(nRows, uncoveredCount);
+          verified = true;
+        } catch (AssertionError e) {
+          LOGGER.info("Verification attempt {} failed: {}", attempt, 
e.getMessage());
+          dumpTableRows(conn, "SYSTEM.CDC_STREAM");
+          dumpTableRows(conn, "SYSTEM.IDX_CDC_TRACKER");
+        }
+      }
+      assertTrue("Index verification failed after retries", verified);
+    } else {
+      long coveredCount = verifyIndexTable(tableName, coveredIndexName, conn);
+      assertEquals(nRows, coveredCount);
+      long uncoveredCount = verifyIndexTable(tableName, uncoveredIndexName, 
conn);
+      assertEquals(nRows, uncoveredCount);
+    }
+  }
+
+  private void runMutationPhase(String tableName, int nThreads, int 
mutationsPerThread,
+    int batchSize, int nRows, int nIndexValues, boolean includeIndexedColumns) 
throws Exception {
+    CountDownLatch doneSignal = new CountDownLatch(nThreads);
+    for (int t = 0; t < nThreads; t++) {
+      new Thread(() -> {
+        try {
+          Connection c = DriverManager.getConnection(getUrl());
+          ThreadLocalRandom rand = ThreadLocalRandom.current();
+          for (int j = 0; j < mutationsPerThread; j++) {
+            if (includeIndexedColumns) {
+              c.createStatement()
+                .execute("UPSERT INTO " + tableName + " VALUES (" + (j % 
nRows) + ", 0, "
+                  + randVal(rand, nIndexValues) + ", " + randVal(rand) + ", "
+                  + randVal(rand, nIndexValues) + ", " + randVal(rand) + ", " 
+ randVal(rand) + ", "
+                  + randVal(rand) + ", " + randVal(rand) + ")");
+            } else {
+              c.createStatement()
+                .execute("UPSERT INTO " + tableName + " (k1, k2, v4, v5, v6, 
v7) VALUES ("
+                  + (j % nRows) + ", 0, " + randVal(rand) + ", " + 
randVal(rand) + ", "
+                  + randVal(rand) + ", " + randVal(rand) + ")");
+            }
+            if ((j % batchSize) == 0) {
+              c.commit();
+            }
+          }
+          c.commit();
+        } catch (SQLException e) {
+          LOGGER.warn("Mutation phase exception: {}", e.getMessage());
+        } finally {
+          doneSignal.countDown();
+        }
+      }).start();
+    }
+    assertTrue("Mutation phase timed out", doneSignal.await(350, 
TimeUnit.SECONDS));
+  }
+
+  private static String randVal(ThreadLocalRandom rand) {
+    return rand.nextBoolean() ? "null" : Integer.toString(rand.nextInt());
+  }
+
+  private static String randVal(ThreadLocalRandom rand, int bound) {
+    return rand.nextBoolean() ? "null" : Integer.toString(rand.nextInt() % 
bound);
+  }
+
   private void dumpTableRows(Connection conn, String tableName) throws 
SQLException {
     LOGGER.info("Dumping {} table:", tableName);
     try (ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + 
tableName)) {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
index a8ab061542..3b6510b31f 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
 
 import java.util.Map;
 import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
@@ -57,6 +58,7 @@ public class ConcurrentMutationsLazyPostBatchWriteIT extends 
ConcurrentMutations
     props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500));
     props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2500));
     props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
index 287f95c57a..737be134cd 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -70,6 +71,7 @@ public class IndexToolForNonTxGlobalIndexEventualIT extends 
IndexToolForNonTxGlo
     serverProps.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, 
Integer.toString(2000));
     serverProps.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5));
     serverProps.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 
Integer.toString(-1));
+    serverProps.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, 
Boolean.TRUE.toString());
     Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5);
     clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(true));
     clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, 
Long.toString(5));
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java
index 33b7bd7465..e0a4ab2a4f 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
 import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable;
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -77,6 +78,7 @@ public class MultiTenantEventualIndexIT extends 
ParallelStatsDisabledIT {
     props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
     props.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, 
Boolean.TRUE.toString());
     props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
index aee8564ab5..d7b5a35e1c 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end.index;
 
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
 import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
 
 import java.util.Collection;
 import java.util.List;
@@ -51,6 +52,7 @@ public class GlobalIndexCheckerEventualIT extends 
GlobalIndexCheckerIT {
     props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
     props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
     props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.TRUE.toString());
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 

Reply via email to