This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 71034cb2a9 PHOENIX-7860 Fix cell serialization and deserialization 
when the cells from mutations from CP are merged (#2483)
71034cb2a9 is described below

commit 71034cb2a9c098603a002340b8879c69be5f5ef6
Author: tkhurana <[email protected]>
AuthorDate: Wed May 20 15:46:54 2026 -0700

    PHOENIX-7860 Fix cell serialization and deserialization when the cells from 
mutations from CP are merged (#2483)
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 116 ++++----
 .../phoenix/replication/log/LogFileCodec.java      |  30 +-
 .../phoenix/replication/log/LogFileRecord.java     |  39 +--
 .../phoenix/replication/ReplicationLogGroupIT.java | 305 ++++++++++++++++++---
 .../phoenix/replication/log/LogFileCodecTest.java  |  73 ++++-
 .../phoenix/replication/log/LogFileFormatTest.java |   3 +-
 6 files changed, 422 insertions(+), 144 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index b968914731..01f7ee898b 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -160,6 +160,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import 
org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -756,51 +757,22 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
    */
   private void replicateEditOnWALRestore(ReplicationLogGroup logGroup, WALKey 
logKey,
     WALEdit logEdit) throws IOException {
-    ImmutableBytesPtr prevKey = null, currentKey = null;
-    Put put = null;
-    Delete del = null;
+    List<Cell> regularCells = new ArrayList<>();
     for (Cell kv : logEdit.getCells()) {
       if (kv instanceof IndexedKeyValue) {
         IndexedKeyValue ikv = (IndexedKeyValue) kv;
         logGroup.append(Bytes.toString(ikv.getIndexTable()), -1, 
ikv.getMutation());
       } else {
-        // While we can generate a separate mutation for every cell that is 
part of the
-        // WAL edit and replicate each such mutation. Doing that will not be 
very efficient
-        // since a mutation can have large number of cells. Instead, we first 
group the
-        // cells belonging to the same row into a mutation and then replicate 
that
-        // mutation.
-        currentKey = new ImmutableBytesPtr(kv.getRowArray(), 
kv.getRowOffset(), kv.getRowLength());
-        if (!currentKey.equals(prevKey)) {
-          if (put != null && !this.ignoreReplicationFilter.test(put)) {
-            logGroup.append(logKey.getTableName().getNameAsString(), -1, put);
-          }
-          if (del != null && !this.ignoreReplicationFilter.test(del)) {
-            logGroup.append(logKey.getTableName().getNameAsString(), -1, del);
-          }
-          // reset
-          put = null;
-          del = null;
-        }
-        if (kv.getType() == Cell.Type.Put) {
-          if (put == null) {
-            put = new Put(currentKey.get(), currentKey.getOffset(), 
currentKey.getLength());
-          }
-          put.add(kv);
-        } else {
-          if (del == null) {
-            del = new Delete(currentKey.get(), currentKey.getOffset(), 
currentKey.getLength());
-          }
-          del.add(kv);
-        }
-        prevKey = currentKey;
+        regularCells.add(kv);
       }
     }
-    // append the last one
-    if (put != null && !this.ignoreReplicationFilter.test(put)) {
-      logGroup.append(logKey.getTableName().getNameAsString(), -1, put);
-    }
-    if (del != null && !this.ignoreReplicationFilter.test(del)) {
-      logGroup.append(logKey.getTableName().getNameAsString(), -1, del);
+    if (!regularCells.isEmpty()) {
+      String tableName = logKey.getTableName().getNameAsString();
+      for (Mutation split : splitCellsIntoMutations(regularCells)) {
+        if (!this.ignoreReplicationFilter.test(split)) {
+          logGroup.append(tableName, -1, split);
+        }
+      }
     }
     logGroup.sync();
   }
@@ -2632,6 +2604,52 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     return status.getOperationStatusCode() == SUCCESS && status.getResult() != 
null;
   }
 
+  private static boolean isNewRowOrType(Cell previousCell, Cell cell) {
+    return previousCell == null || previousCell.getType() != cell.getType()
+      || !CellUtil.matchingRows(previousCell, cell);
+  }
+
+  /**
+   * Splits cells into Put/Delete mutations, starting a new mutation whenever the row key or
+   * cell type changes between consecutive cells, as HBase's ReplicationSink does. HBase's
+   * checkAndMergeCPMutations merges coprocessor cells into the data mutation, so a single Put may
+   * contain Delete cells with different row keys (e.g., local index); this recovers them.
+   */
+  static List<Mutation> splitCellsIntoMutations(Iterable<Cell> cells) throws 
IOException {
+    List<Mutation> result = new ArrayList<>();
+    Cell previousCell = null;
+    Mutation current = null;
+    for (Cell cell : cells) {
+      if (isNewRowOrType(previousCell, cell)) {
+        if (current != null) {
+          result.add(current);
+        }
+        if (CellUtil.isDelete(cell)) {
+          current = new Delete(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
+        } else {
+          current = new Put(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
+        }
+      }
+      if (CellUtil.isDelete(cell)) {
+        ((Delete) current).add(cell);
+      } else {
+        ((Put) current).add(cell);
+      }
+      previousCell = cell;
+    }
+    if (current != null) {
+      result.add(current);
+    }
+    return result;
+  }
+
+  static List<Mutation> splitCellsIntoMutations(Mutation merged) throws 
IOException {
+    if (merged.isEmpty()) {
+      return Collections.singletonList(merged);
+    }
+    return 
splitCellsIntoMutations(Iterables.concat(merged.getFamilyCellMap().values()));
+  }
+
   private void replicateMutations(RegionCoprocessorEnvironment env,
     MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext 
context)
     throws IOException {
@@ -2647,17 +2665,21 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     if (!logGroup.isPresent()) {
       return;
     }
+    ReplicationLogGroup group = logGroup.get();
 
-    for (Integer i = 0; i < miniBatchOp.size(); i++) {
+    for (int i = 0; i < miniBatchOp.size(); i++) {
       Mutation m = miniBatchOp.getOperation(i);
       if (this.ignoreReplicationFilter.test(m)) {
         continue;
       }
-      logGroup.get().append(this.dataTableName, -1, m);
-      Mutation[] mutationsAddedByCP = 
miniBatchOp.getOperationsFromCoprocessors(i);
-      if (mutationsAddedByCP != null) {
-        for (Mutation addedMutation : mutationsAddedByCP) {
-          logGroup.get().append(this.dataTableName, -1, addedMutation);
+      // When coprocessors add cells (local index, conditional TTL, ON 
DUPLICATE KEY UPDATE),
+      // HBase merges them into the data mutation which can mix row keys and 
cell types.
+      // Split those back into individual Put/Delete mutations for correct 
serialization.
+      if (miniBatchOp.getOperationsFromCoprocessors(i) == null) {
+        group.append(this.dataTableName, -1, m);
+      } else {
+        for (Mutation split : splitCellsIntoMutations(m)) {
+          group.append(this.dataTableName, -1, split);
         }
       }
     }
@@ -2667,7 +2689,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         if (this.ignoreReplicationFilter.test(entry.getValue())) {
           continue;
         }
-        logGroup.get().append(entry.getKey().getTableName(), -1, 
entry.getValue());
+        group.append(entry.getKey().getTableName(), -1, entry.getValue());
       }
     }
     if (context.postIndexUpdates != null) {
@@ -2676,9 +2698,9 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         if (this.ignoreReplicationFilter.test(entry.getValue())) {
           continue;
         }
-        logGroup.get().append(entry.getKey().getTableName(), -1, 
entry.getValue());
+        group.append(entry.getKey().getTableName(), -1, entry.getValue());
       }
     }
-    logGroup.get().sync();
+    group.sync();
   }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
index 1ce29fd0b6..a0975287d3 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
@@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -68,6 +69,7 @@ import org.apache.hadoop.io.WritableUtils;
  *   |   | PER-CELL DATA (repeated)             | |
  *   |   |   +––––––––––––----------------–--–+ | |
  *   |   |   | CELL TIMESTAMP (long)          | | |
+ *   |   |   | CELL TYPE (byte)               | | |
  *   |   |   | COLUMN QUALIFIER LENGTH (vint) | | |
  *   |   |   | COLUMN QUALIFIER (byte[])      | | |
  *   |   |   | VALUE LENGTH (vint)            | | |
@@ -141,6 +143,7 @@ public class LogFileCodec implements LogFile.Codec {
         WritableUtils.writeVInt(recordOut, cells.size());
         for (Cell cell : cells) {
           recordOut.writeLong(cell.getTimestamp());
+          recordOut.writeByte(cell.getTypeByte());
           WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
           recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(),
             cell.getQualifierLength());
@@ -205,9 +208,6 @@ public class LogFileCodec implements LogFile.Codec {
             mutation = new Put(rowKey);
             break;
           case DELETE:
-          case DELETEFAMILYVERSION:
-          case DELETECOLUMN:
-          case DELETEFAMILY:
             mutation = new Delete(rowKey);
             break;
           default:
@@ -229,6 +229,8 @@ public class LogFileCodec implements LogFile.Codec {
           for (int j = 0; j < columnValuePairsCount; j++) {
             // Cell timestamp
             long cellTs = in.readLong();
+            // Cell type byte
+            byte cellTypeByte = in.readByte();
             // Qualifier name
             int qualLen = WritableUtils.readVInt(in);
             byte[] qual = new byte[qualLen];
@@ -241,22 +243,12 @@ public class LogFileCodec implements LogFile.Codec {
             if (valueLen > 0) {
               in.readFully(value);
             }
-            switch (type) {
-              case PUT:
-                ((Put) mutation).addColumn(cf, qual, cellTs, value);
-                break;
-              case DELETE:
-              case DELETECOLUMN:
-                ((Delete) mutation).addColumn(cf, qual, cellTs);
-                break;
-              case DELETEFAMILYVERSION:
-                ((Delete) mutation).addFamilyVersion(cf, cellTs);
-                break;
-              case DELETEFAMILY:
-                ((Delete) mutation).addFamily(cf, cellTs);
-                break;
-              default:
-                throw new UnsupportedOperationException("Unhandled mutation 
type " + type);
+            Cell cell = new KeyValue(rowKey, 0, rowKey.length, cf, 0, 
cf.length, qual, 0,
+              qual.length, cellTs, KeyValue.Type.codeToType(cellTypeByte), 
value, 0, value.length);
+            if (mutation instanceof Put) {
+              ((Put) mutation).add(cell);
+            } else {
+              ((Delete) mutation).add(cell);
             }
           }
         }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
index 4b62ccfa7d..6b12a3e659 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
@@ -17,9 +17,6 @@
  */
 package org.apache.phoenix.replication.log;
 
-import java.io.IOException;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -114,16 +111,9 @@ public class LogFileRecord implements LogFile.Record {
 
   // Internals only below. Not for LogFile interface consumer use.
 
-  /**
-   * The Phoenix concept of HBase mutation type, which is currently a 1:1 
mapping with HBase's, with
-   * different code values (they don't need to match), but may potentially 
diverge in the future.
-   */
   protected enum MutationType {
     PUT(1),
-    DELETE(2),
-    DELETEFAMILYVERSION(3),
-    DELETECOLUMN(4),
-    DELETEFAMILY(5);
+    DELETE(2);
 
     private int code;
 
@@ -135,33 +125,12 @@ public class LogFileRecord implements LogFile.Record {
       return code;
     }
 
-    static MutationType get(Mutation mutation) throws IOException {
+    static MutationType get(Mutation mutation) {
       if (mutation instanceof Put) {
         return PUT;
       } else if (mutation instanceof Delete) {
-        CellScanner s = mutation.cellScanner();
-        if (!s.advance()) {
-          // No cell in delete. A simple delete of a row.
-          return DELETE;
-        }
-        // This assumes that either there is only one cell in the Delete, or 
all cells in
-        // the delete have the same cell type, which is correct as of today. 
We only need
-        // to look at the first.
-        Cell cell = s.current();
-        switch (cell.getType()) {
-          case Delete:
-            return DELETE;
-          case DeleteFamilyVersion:
-            return DELETEFAMILYVERSION;
-          case DeleteColumn:
-            return DELETECOLUMN;
-          case DeleteFamily:
-            return DELETEFAMILY;
-          default:
-            // Fall through to throw the UnsupportedOperationException
-            break;
-        }
-      } // Fall through to throw the UnsupportedOperationException
+        return DELETE;
+      }
       throw new UnsupportedOperationException("Unsupported mutation type: " + 
mutation);
     }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index 651cddde6c..c0172274c7 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -24,23 +24,35 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_
 import static org.apache.phoenix.query.BaseTest.generateUniqueName;
 import static 
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -55,6 +67,7 @@ import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.query.PhoenixTestBuilder;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.replication.reader.ReplicationLogProcessor;
 import org.apache.phoenix.replication.tool.LogFileAnalyzer;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
@@ -209,25 +222,60 @@ public class ReplicationLogGroupIT extends HABaseIT {
     return schemaBuilder;
   }
 
+  private void replayAndVerifyAcrossClusters(List<String> ddlStatements, 
String... tablesToVerify)
+    throws Exception {
+    Path standByLogDir = 
logGroup.getOrCreatePeerShardManager().getRootDirectoryPath();
+
+    // Create the same schema on cluster 2
+    try (Connection conn2 = CLUSTERS.getCluster2Connection(haGroup)) {
+      for (String ddl : ddlStatements) {
+        conn2.createStatement().execute(ddl);
+      }
+      conn2.commit();
+    }
+
+    // Replay replication log on cluster 2
+    FileSystem fs = standByLogDir.getFileSystem(conf2);
+    List<Path> logFiles = findLogFiles(standByLogDir, fs);
+    assertTrue("Should have at least one log file", !logFiles.isEmpty());
+    ReplicationLogProcessor processor = ReplicationLogProcessor.get(conf2, 
haGroupName);
+    try {
+      for (Path logFile : logFiles) {
+        LOG.info("Replaying log file: {}", logFile);
+        processor.processLogFile(fs, logFile);
+      }
+    } finally {
+      processor.close();
+    }
+
+    // Verify tables match across clusters at the HBase cell level
+    for (String table : tablesToVerify) {
+      assertTablesEqualAcrossClusters(table);
+    }
+  }
+
   @Test
   public void testAppendAndSync() throws Exception {
     final String tableName = "T_" + generateUniqueName();
     final String indexName1 = "I_" + generateUniqueName();
     final String indexName2 = "I_" + generateUniqueName();
     final String indexName3 = "L_" + generateUniqueName();
+    String createTableDdl = String.format("create table if not exists %s (id1 
integer not null, "
+      + "id2 integer not null, val1 varchar, val2 varchar "
+      + "constraint pk primary key (id1, id2))", tableName);
+    String createIndex1Ddl = String
+      .format("create index if not exists %s on %s (val1) include (val2)", 
indexName1, tableName);
+    String createIndex2Ddl = String
+      .format("create index if not exists %s on %s (val2) include (val1)", 
indexName2, tableName);
+    String createLocalIndexDdl = String.format(
+      "create local index if not exists %s on %s (id2,val1) include (val2)", 
indexName3, tableName);
+
     try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) 
DriverManager
       .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
-      String ddl = String.format("create table %s (id1 integer not null, "
-        + "id2 integer not null, val1 varchar, val2 varchar "
-        + "constraint pk primary key (id1, id2))", tableName);
-      conn.createStatement().execute(ddl);
-      ddl = String.format("create index %s on %s (val1) include (val2)", 
indexName1, tableName);
-      conn.createStatement().execute(ddl);
-      ddl = String.format("create index %s on %s (val2) include (val1)", 
indexName2, tableName);
-      conn.createStatement().execute(ddl);
-      ddl = String.format("create local index %s on %s (id2,val1) include 
(val2)", indexName3,
-        tableName);
-      conn.createStatement().execute(ddl);
+      conn.createStatement().execute(createTableDdl);
+      conn.createStatement().execute(createIndex1Ddl);
+      conn.createStatement().execute(createIndex2Ddl);
+      conn.createStatement().execute(createLocalIndexDdl);
       conn.commit();
       PreparedStatement stmt =
         conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?, 
?)");
@@ -255,38 +303,43 @@ public class ReplicationLogGroupIT extends HABaseIT {
           assertEquals(0, stmt.executeUpdate());
         }
       }
-      // verify the correctness of the index
-      // TODO Index tool test API doesn't work with Failover connection
-      // IndexToolIT.verifyIndexTable(conf1, tableName, indexName1, conn);
-      // verify replication
-      Map<String, Integer> expected = Maps.newHashMap();
+
+      // verify replication mutation counts
       // mutation count will be equal to row count since the atomic upsert 
mutations will be
       // ignored and therefore not replicated
+      Map<String, Integer> expected = Maps.newHashMap();
       expected.put(tableName, rowCount * 3); // Put + Delete + local index 
update
-      // for index1 unverified + verified + delete (Delete column)
-      expected.put(indexName1, rowCount * 3);
-      // for index2 unverified + verified since the null column is part of row 
key
-      expected.put(indexName2, rowCount * 2);
-      // we didn't create any tenant views so no change in the syscat entries
+      expected.put(indexName1, rowCount * 3); // unverified + verified + 
delete (DeleteColumn)
+      expected.put(indexName2, rowCount * 2); // unverified + verified
       expected.put(SYSTEM_CATALOG_NAME, 0);
       expected.put(SYSTEM_CHILD_LINK_NAME, 0);
       verifyReplication(expected);
+
+      // Replay on cluster 2 and verify cross-cluster cell-level equality
+      replayAndVerifyAcrossClusters(
+        Arrays.asList(createTableDdl, createIndex1Ddl, createIndex2Ddl, 
createLocalIndexDdl),
+        tableName, indexName1, indexName2);
     }
   }
 
   @Test
   public void testAppendAndSyncNoIndex() throws Exception {
     final String tableName = "T_" + generateUniqueName();
+    // Multiple column families: cf1 and cf2
+    String createTableDdl = String.format(
+      "create table if not exists %s (id1 integer not null, "
+        + "id2 integer not null, cf1.val1 varchar, cf1.val2 varchar, "
+        + "cf2.val3 varchar, cf2.val4 integer " + "constraint pk primary key 
(id1, id2))",
+      tableName);
+
     try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) 
DriverManager
       .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
-      String ddl = String.format("create table %s (id1 integer not null, "
-        + "id2 integer not null, val1 varchar, val2 varchar "
-        + "constraint pk primary key (id1, id2))", tableName);
-      conn.createStatement().execute(ddl);
+      conn.createStatement().execute(createTableDdl);
       conn.commit();
+
+      // upsert 50 rows across multiple column families
       PreparedStatement stmt =
-        conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?, 
?)");
-      // upsert 50 rows
+        conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?, 
?, ?, ?)");
       int rowCount = 50;
       for (int i = 0; i < 5; ++i) {
         for (int j = 0; j < 10; ++j) {
@@ -294,16 +347,144 @@ public class ReplicationLogGroupIT extends HABaseIT {
           stmt.setInt(2, j);
           stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
           stmt.setString(4, null);
+          stmt.setString(5, "val3_" + i + "_" + j);
+          stmt.setInt(6, i * 10 + j);
           stmt.executeUpdate();
         }
         conn.commit();
       }
-      // verify replication
+
+      // Delete some rows
+      PreparedStatement deleteStmt =
+        conn.prepareStatement("delete from " + tableName + " where id1 = ? and 
id2 = ?");
+      int deleteCount = 10;
+      for (int j = 0; j < 10; ++j) {
+        deleteStmt.setInt(1, 0);
+        deleteStmt.setInt(2, j);
+        deleteStmt.executeUpdate();
+      }
+      conn.commit();
+
+      // verify replication mutation counts
       Map<String, Integer> expected = Maps.newHashMap();
-      // mutation count will be equal to row count since the atomic upsert 
mutations will be
-      // ignored and therefore not replicated
-      expected.put(tableName, rowCount * 2); // Put + Delete
+      // Each upsert produces Put + Delete (for null columns), row deletes 
produce DeleteFamily
+      expected.put(tableName, rowCount * 2 + deleteCount);
       verifyReplication(expected);
+
+      // Replay on cluster 2 and verify cross-cluster cell-level equality
+      replayAndVerifyAcrossClusters(Collections.singletonList(createTableDdl), 
tableName);
+    }
+  }
+
+  /**
+   * Verifies cross-cluster cell-level equality after replay when ON DUPLICATE 
KEY UPDATE rewrites a
+   * row. The atomic upsert path produces a Put (and optionally a Delete with 
DeleteColumn cells)
+   * that flow through the coprocessor merge path the codec must round-trip 
correctly.
+   */
+  @Test
+  public void testAppendAndSyncOnDuplicateKeyUpdate() throws Exception {
+    final String tableName = "T_" + generateUniqueName();
+    String createTableDdl = String.format("create table if not exists %s "
+      + "(pk varchar primary key, counter1 bigint, counter2 varchar)", 
tableName);
+
+    try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) 
DriverManager
+      .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
+      conn.createStatement().execute(createTableDdl);
+      conn.commit();
+
+      // Initial inserts for 5 distinct rows
+      PreparedStatement insert =
+        conn.prepareStatement("upsert into " + tableName + " VALUES(?, 0, 
'init')");
+      for (int i = 0; i < 5; ++i) {
+        insert.setString(1, "row_" + i);
+        insert.executeUpdate();
+      }
+      conn.commit();
+
+      // ON DUPLICATE KEY UPDATE — increment counter1 and update counter2 a 
few times per row.
+      // Each invocation against an existing row triggers the atomic upsert 
path which generates
+      // Put (and possibly Delete) mutations on the server side and merges CP 
cells.
+      String dml = "UPSERT INTO " + tableName + " VALUES(?, 0, ?) "
+        + "ON DUPLICATE KEY UPDATE counter1 = counter1 + 1, counter2 = ?";
+      PreparedStatement update = conn.prepareStatement(dml);
+      conn.setAutoCommit(true);
+      for (int round = 0; round < 3; ++round) {
+        for (int i = 0; i < 5; ++i) {
+          update.setString(1, "row_" + i);
+          update.setString(2, "v" + round);
+          update.setString(3, "v" + round);
+          update.executeUpdate();
+        }
+      }
+
+      // Set some columns to null via ON DUPLICATE KEY UPDATE — generates 
DeleteColumn cells
+      String dmlNullify =
+        "UPSERT INTO " + tableName + " VALUES(?, 0, '') ON DUPLICATE KEY 
UPDATE counter2 = NULL";
+      PreparedStatement nullify = conn.prepareStatement(dmlNullify);
+      for (int i = 0; i < 5; ++i) {
+        nullify.setString(1, "row_" + i);
+        nullify.executeUpdate();
+      }
+
+      // Replay on cluster 2 and verify cross-cluster cell-level equality
+      replayAndVerifyAcrossClusters(Collections.singletonList(createTableDdl), 
tableName);
+    }
+  }
+
+  /**
+   * Verifies cross-cluster cell-level equality after replay for a table with 
a Conditional TTL
+   * expression. Conditional TTL adds coprocessor cells that get merged into 
the data mutation,
+   * exercising the split-merged-mutation path.
+   */
+  @Test
+  public void testAppendAndSyncConditionalTTL() throws Exception {
+    final String tableName = "T_" + generateUniqueName();
+    String createTableDdl = String.format("create table if not exists %s (id1 
integer not null, "
+      + "id2 integer not null, val1 varchar, val2 varchar, expired boolean "
+      + "constraint pk primary key (id1, id2)) TTL = 'expired = TRUE'", 
tableName);
+
+    try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) 
DriverManager
+      .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
+      conn.createStatement().execute(createTableDdl);
+      conn.commit();
+
+      PreparedStatement stmt =
+        conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?, 
?, ?)");
+      for (int i = 0; i < 5; ++i) {
+        for (int j = 0; j < 10; ++j) {
+          stmt.setInt(1, i);
+          stmt.setInt(2, j);
+          stmt.setString(3, "val1_" + i + "_" + j);
+          stmt.setString(4, j % 2 == 0 ? "val2_" + i + "_" + j : null);
+          stmt.setBoolean(5, false);
+          stmt.executeUpdate();
+        }
+        conn.commit();
+      }
+
+      // Mark some rows expired
+      PreparedStatement expireStmt = conn
+        .prepareStatement("upsert into " + tableName + " (id1, id2, expired) 
VALUES(?, ?, true)");
+      for (int j = 0; j < 5; ++j) {
+        expireStmt.setInt(1, 0);
+        expireStmt.setInt(2, j);
+        expireStmt.executeUpdate();
+      }
+      conn.commit();
+
+      // Update rows expired — conditional TTL triggers extra CP cells on 
update path
+      PreparedStatement updateStmt =
+        conn.prepareStatement("upsert into " + tableName + " (id1, id2, val1) 
VALUES(?, ?, ?)");
+      for (int j = 0; j < 5; ++j) {
+        updateStmt.setInt(1, 0);
+        updateStmt.setInt(2, j);
+        updateStmt.setString(3, "val11_" + 0 + "_" + j);
+        updateStmt.executeUpdate();
+      }
+      conn.commit();
+
+      // Replay on cluster 2 and verify cross-cluster cell-level equality
+      replayAndVerifyAcrossClusters(Collections.singletonList(createTableDdl), 
tableName);
     }
   }
 
@@ -397,4 +578,66 @@ public class ReplicationLogGroupIT extends HABaseIT {
     assertEquals(1, getCountForTable(systemTables, SYSTEM_CHILD_LINK_NAME));
     assertTrue(getCountForTable(systemTables, SYSTEM_CATALOG_NAME) > 0);
   }
+
+  /**
+   * Collects every replication log file (name ending in {@code .plog}) found anywhere beneath
+   * {@code dir}, descending into sub-directories.
+   * @param dir root directory to search; if it does not exist the result is empty
+   * @param fs  file system hosting {@code dir}
+   * @return discovered log file paths, in traversal order
+   */
+  private List<Path> findLogFiles(Path dir, FileSystem fs) throws IOException {
+    List<Path> logFiles = new ArrayList<>();
+    findLogFilesRecursive(dir, fs, logFiles);
+    return logFiles;
+  }
+
+  /**
+   * Depth-first walk of {@code dir} that appends every file whose name ends in {@code .plog} to
+   * {@code files}. Sub-directories are visited as they are encountered in the listing. A
+   * non-existent {@code dir} contributes nothing.
+   */
+  private void findLogFilesRecursive(Path dir, FileSystem fs, List<Path> files)
+    throws IOException {
+    if (fs.exists(dir)) {
+      for (FileStatus entry : fs.listStatus(dir)) {
+        Path entryPath = entry.getPath();
+        if (entry.isDirectory()) {
+          findLogFilesRecursive(entryPath, fs, files);
+          continue;
+        }
+        if (entryPath.getName().endsWith(".plog")) {
+          files.add(entryPath);
+        }
+      }
+    }
+  }
+
+  /**
+   * Asserts that the HBase table {@code hbaseTableName} has identical contents on both clusters.
+   *
+   * <p>Both copies are scanned with all cell versions and compared pairwise in scan order using
+   * {@link Result#compareResults(Result, Result, boolean)}. On the first mismatching row, both
+   * tables are passed to {@code TestUtil.dumpTable} for diagnostics. The test then fails with an
+   * {@link AssertionError} whose cause is the comparison exception.
+   * @param hbaseTableName name of the HBase table to compare; expected to exist on the clusters
+   *                       reachable through {@code conf1} and {@code conf2}
+   */
+  private void assertTablesEqualAcrossClusters(String hbaseTableName) throws Exception {
+    TableName tn = TableName.valueOf(hbaseTableName);
+    try (
+      org.apache.hadoop.hbase.client.Connection hconn1 = ConnectionFactory.createConnection(conf1);
+      org.apache.hadoop.hbase.client.Connection hconn2 = ConnectionFactory.createConnection(conf2);
+      Table table1 = hconn1.getTable(tn); Table table2 = hconn2.getTable(tn)) {
+
+      Scan scan = new Scan();
+      // Compare every stored version, not just the latest, so replayed history must match too.
+      scan.readAllVersions();
+
+      try (ResultScanner scanner1 = table1.getScanner(scan);
+        ResultScanner scanner2 = table2.getScanner(scan)) {
+        int rowCount = 0;
+        while (true) {
+          Result r1 = scanner1.next();
+          Result r2 = scanner2.next();
+          if (r1 == null && r2 == null) {
+            break;
+          }
+          // At most one side is exhausted here; report which cluster ran out of rows first.
+          assertNotNull(
+            String.format("Table %s: cluster 2 has fewer rows at row %d", hbaseTableName, rowCount),
+            r2);
+          assertNotNull(
+            String.format("Table %s: cluster 1 has fewer rows at row %d", hbaseTableName, rowCount),
+            r1);
+          try {
+            Result.compareResults(r1, r2, true);
+          } catch (Exception e) {
+            LOG.error("Table {} row {} mismatch. Dumping both tables:", hbaseTableName, rowCount);
+            LOG.error("--- Cluster 1 ---");
+            TestUtil.dumpTable(table1);
+            LOG.error("--- Cluster 2 ---");
+            TestUtil.dumpTable(table2);
+            // Attach the comparison exception as the cause: fail(String) would keep only its
+            // message text and lose the exception type and stack trace.
+            throw new AssertionError(String.format("Table %s row %d mismatch: %s", hbaseTableName,
+              rowCount, e.getMessage()), e);
+          }
+          rowCount++;
+        }
+        LOG.info("Table {} matches across clusters: {} rows verified", hbaseTableName, rowCount);
+      }
+    }
+  }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
index 70049b0883..a3955c5783 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
@@ -31,6 +31,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
@@ -232,16 +235,6 @@ public class LogFileCodecTest {
     singleRecordTest(LogFileTestUtil.newPutRecord("", 1L, "row", 12345L, 1));
   }
 
-  @Test
-  public void testCodecWithEmptyFamily() throws IOException {
-    long ts = 12345L;
-    Put put = new Put(Bytes.toBytes("row"));
-    put.setTimestamp(ts);
-    put.addColumn(HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("q"), ts, 
Bytes.toBytes("v"));
-    singleRecordTest(
-      new 
LogFileRecord().setHBaseTableName("TBLEMPTYFAM").setCommitId(1L).setMutation(put));
-  }
-
   @Test
   public void testCodecWithEmptyQualifier() throws IOException {
     long ts = 12345L;
@@ -401,4 +394,64 @@ public class LogFileCodecTest {
     }
   }
 
+  /**
+   * Passes {@code singleRecordTest} a single Delete that carries two differently typed markers on
+   * the same row: a {@code DeleteColumn} on qualifier {@code q1} and a {@code DeleteFamily} with an
+   * empty qualifier.
+   */
+  @Test
+  public void testDeleteWithMixedCellTypes() throws IOException {
+    final long ts = 12345L;
+    final byte[] rowKey = Bytes.toBytes("row");
+    final byte[] family = Bytes.toBytes("cf");
+
+    Cell deleteColumnCell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rowKey)
+      .setFamily(family).setQualifier(Bytes.toBytes("q1")).setTimestamp(ts)
+      .setType(Cell.Type.DeleteColumn).build();
+    Cell deleteFamilyCell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rowKey)
+      .setFamily(family).setQualifier(HConstants.EMPTY_BYTE_ARRAY).setTimestamp(ts)
+      .setType(Cell.Type.DeleteFamily).build();
+
+    Delete mixedDelete = new Delete(rowKey);
+    mixedDelete.setTimestamp(ts);
+    mixedDelete.add(deleteColumnCell);
+    mixedDelete.add(deleteFamilyCell);
+
+    LogFileRecord mixedRecord =
+      new LogFileRecord().setHBaseTableName("TBLMIXED").setCommitId(1L).setMutation(mixedDelete);
+    singleRecordTest(mixedRecord);
+  }
+
+  /**
+   * Runs {@code singleRecordTest} once for each HBase cell type: Put, Delete, DeleteColumn,
+   * DeleteFamily and DeleteFamilyVersion. Each type is wrapped in its own single-cell mutation and
+   * recorded under a distinct table name.
+   */
+  @Test
+  public void testCellTypeByteRoundTripForAllTypes() throws IOException {
+    final long ts = 12345L;
+    final byte[] rowKey = Bytes.toBytes("row");
+    final byte[] family = Bytes.toBytes("cf");
+    final byte[] qualifier = Bytes.toBytes("q");
+
+    // Cell.Type.Put
+    Put putMutation = new Put(rowKey);
+    putMutation.setTimestamp(ts);
+    putMutation.addColumn(family, qualifier, ts, Bytes.toBytes("v"));
+    LogFileRecord putRecord =
+      new LogFileRecord().setHBaseTableName("TBLCTP").setCommitId(1L).setMutation(putMutation);
+    singleRecordTest(putRecord);
+
+    // Cell.Type.Delete: a single version of one column
+    Delete versionDelete = new Delete(rowKey);
+    versionDelete.setTimestamp(ts);
+    versionDelete.addColumn(family, qualifier, ts);
+    LogFileRecord versionDeleteRecord =
+      new LogFileRecord().setHBaseTableName("TBLCTD").setCommitId(1L).setMutation(versionDelete);
+    singleRecordTest(versionDeleteRecord);
+
+    // Cell.Type.DeleteColumn: all versions of one column, supplied as an explicitly built cell
+    Cell deleteColumnCell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(rowKey)
+      .setFamily(family).setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.DeleteColumn)
+      .build();
+    Delete columnDelete = new Delete(rowKey);
+    columnDelete.setTimestamp(ts);
+    columnDelete.add(deleteColumnCell);
+    LogFileRecord columnDeleteRecord =
+      new LogFileRecord().setHBaseTableName("TBLCTDC").setCommitId(1L).setMutation(columnDelete);
+    singleRecordTest(columnDeleteRecord);
+
+    // Cell.Type.DeleteFamily
+    Delete familyDelete = new Delete(rowKey);
+    familyDelete.setTimestamp(ts);
+    familyDelete.addFamily(family, ts);
+    LogFileRecord familyDeleteRecord =
+      new LogFileRecord().setHBaseTableName("TBLCTDF").setCommitId(1L).setMutation(familyDelete);
+    singleRecordTest(familyDeleteRecord);
+
+    // Cell.Type.DeleteFamilyVersion
+    Delete familyVersionDelete = new Delete(rowKey);
+    familyVersionDelete.setTimestamp(ts);
+    familyVersionDelete.addFamilyVersion(family, ts);
+    LogFileRecord familyVersionDeleteRecord = new LogFileRecord().setHBaseTableName("TBLCTDFV")
+      .setCommitId(1L).setMutation(familyVersionDelete);
+    singleRecordTest(familyVersionDeleteRecord);
+  }
+
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
index 0044483008..c198a400ba 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
@@ -458,8 +458,7 @@ public class LogFileFormatTest {
       reader.init(readerContext, input);
       fail("Expected InvalidLogTrailerException when trailer is missing");
     } catch (InvalidLogTrailerException e) {
-      assertTrue("Exception message should contain 'Unsupported version'",
-        e.getMessage().contains("Unsupported version"));
+      // Expected — the bytes at the trailer position are block data, not a 
valid trailer
     }
   }
 


Reply via email to