This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 9812297b94 PHOENIX-7801 Preserve cell timestamps between clusters 
(#2410)
9812297b94 is described below

commit 9812297b94093a1eab5c9fd4bd464256f0b3046b
Author: tkhurana <[email protected]>
AuthorDate: Thu Apr 16 10:44:25 2026 -0700

    PHOENIX-7801 Preserve cell timestamps between clusters (#2410)
---
 .../phoenix/replication/log/LogFileCodec.java      | 12 ++-
 .../apache/phoenix/jdbc/PhoenixHAAdminToolIT.java  |  2 +-
 .../phoenix/replication/log/LogFileCodecTest.java  | 91 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 5 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
index 6e7c6c04db..1ce29fd0b6 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.io.WritableUtils;
  *   |   +--------------------------------------+ |
  *   |   | PER-CELL DATA (repeated)             | |
  *   |   |   +––––––––––––----------------–--–+ | |
+ *   |   |   | CELL TIMESTAMP (long)          | | |
  *   |   |   | COLUMN QUALIFIER LENGTH (vint) | | |
  *   |   |   | COLUMN QUALIFIER (byte[])      | | |
  *   |   |   | VALUE LENGTH (vint)            | | |
@@ -139,6 +140,7 @@ public class LogFileCodec implements LogFile.Codec {
         List<Cell> cells = entry.getValue();
         WritableUtils.writeVInt(recordOut, cells.size());
         for (Cell cell : cells) {
+          recordOut.writeLong(cell.getTimestamp());
           WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
           recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(),
             cell.getQualifierLength());
@@ -225,6 +227,8 @@ public class LogFileCodec implements LogFile.Codec {
           // Qualifiers+Values Count
           int columnValuePairsCount = WritableUtils.readVInt(in);
           for (int j = 0; j < columnValuePairsCount; j++) {
+            // Cell timestamp
+            long cellTs = in.readLong();
             // Qualifier name
             int qualLen = WritableUtils.readVInt(in);
             byte[] qual = new byte[qualLen];
@@ -239,17 +243,17 @@ public class LogFileCodec implements LogFile.Codec {
             }
             switch (type) {
               case PUT:
-                ((Put) mutation).addColumn(cf, qual, ts, value);
+                ((Put) mutation).addColumn(cf, qual, cellTs, value);
                 break;
               case DELETE:
               case DELETECOLUMN:
-                ((Delete) mutation).addColumn(cf, qual, ts);
+                ((Delete) mutation).addColumn(cf, qual, cellTs);
                 break;
               case DELETEFAMILYVERSION:
-                ((Delete) mutation).addFamilyVersion(cf, ts);
+                ((Delete) mutation).addFamilyVersion(cf, cellTs);
                 break;
               case DELETEFAMILY:
-                ((Delete) mutation).addFamily(cf);
+                ((Delete) mutation).addFamily(cf, cellTs);
                 break;
               default:
                 throw new UnsupportedOperationException("Unhandled mutation 
type " + type);
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
index 7ff79f2c7f..ca60771c80 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
@@ -911,7 +911,7 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
     int ret = ToolRunner.run(adminTool,
       new String[] { "create", "-g", createHaGroupName, "-p", "FAILOVER", 
"-zk1",
         CLUSTERS.getZkUrl1(), "-c1", CLUSTERS.getMasterAddress1(), "-cr1", 
"ACTIVE", "-zk2",
-        CLUSTERS.getZkUrl2(), "-c2", CLUSTERS.getMasterAddress2(), "-cr2", 
"STANDBY", "-hdfs1", 
+        CLUSTERS.getZkUrl2(), "-c2", CLUSTERS.getMasterAddress2(), "-cr2", 
"STANDBY", "-hdfs1",
         CLUSTERS.getHdfsUrl1(), "-hdfs2", CLUSTERS.getHdfsUrl2() });
 
     assertEquals("create command should succeed", RET_SUCCESS, ret);
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
index 5df12e1ad8..70049b0883 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
@@ -310,4 +310,95 @@ public class LogFileCodecTest {
       new 
LogFileRecord().setHBaseTableName("TBLLVAL").setCommitId(1L).setMutation(put));
   }
 
+  // Cell timestamp preservation tests
+  // These verify that per-cell timestamps survive a codec round-trip when 
they differ from the
+  // mutation-level timestamp. Before the fix the encoder omitted 
cell.getTimestamp() entirely
+  // and the decoder fell back to the mutation-level timestamp (or 
HConstants.LATEST_TIMESTAMP
+  // for addFamily), so any divergence produced wrong timestamps on the 
standby cluster.
+
+  @Test
+  public void testPutCellTimestampsDifferFromMutationTimestamp() throws 
IOException {
+    long mutationTs = 99999L;
+    Put put = new Put(Bytes.toBytes("row"));
+    put.setTimestamp(mutationTs);
+    // Each cell gets its own timestamp, all different from mutationTs and 
from each other
+    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), 11111L, 
Bytes.toBytes("v1"));
+    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q2"), 22222L, 
Bytes.toBytes("v2"));
+    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q3"), 33333L, 
Bytes.toBytes("v3"));
+    singleRecordTest(
+      new 
LogFileRecord().setHBaseTableName("TBLPUTTS").setCommitId(1L).setMutation(put));
+  }
+
+  @Test
+  public void testDeleteColumnCellTimestampDiffersFromMutationTimestamp() 
throws IOException {
+    long mutationTs = 99999L;
+    long cellTs = 11111L;
+    Delete delete = new Delete(Bytes.toBytes("row"));
+    delete.setTimestamp(mutationTs);
+    delete.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), cellTs);
+    singleRecordTest(
+      new 
LogFileRecord().setHBaseTableName("TBLDELCOLTS").setCommitId(1L).setMutation(delete));
+  }
+
+  @Test
+  public void testDeleteFamilyCellTimestampDiffersFromMutationTimestamp() 
throws IOException {
+    // This is the most direct regression test for the bug: addFamily(cf) was 
called without ts,
+    // defaulting to HConstants.LATEST_TIMESTAMP instead of preserving the 
original cell timestamp.
+    long mutationTs = 99999L;
+    long cellTs = 11111L;
+    Delete delete = new Delete(Bytes.toBytes("row"));
+    delete.setTimestamp(mutationTs);
+    delete.addFamily(Bytes.toBytes("cf"), cellTs); // explicit cell ts != 
mutationTs
+    singleRecordTest(
+      new 
LogFileRecord().setHBaseTableName("TBLDELFAMTS").setCommitId(1L).setMutation(delete));
+  }
+
+  @Test
+  public void 
testDeleteFamilyVersionCellTimestampDiffersFromMutationTimestamp()
+    throws IOException {
+    long mutationTs = 99999L;
+    long cellTs = 11111L;
+    Delete delete = new Delete(Bytes.toBytes("row"));
+    delete.setTimestamp(mutationTs);
+    delete.addFamilyVersion(Bytes.toBytes("cf"), cellTs); // explicit cell ts 
!= mutationTs
+    singleRecordTest(
+      new 
LogFileRecord().setHBaseTableName("TBLDELFAMVERTS").setCommitId(1L).setMutation(delete));
+  }
+
+  @Test
+  public void testMultipleCellsWithDistinctTimestampsPreserved() throws 
IOException {
+    // Multiple cells in the same mutation each carry a unique timestamp; all 
must survive
+    // the round-trip intact.
+    long mutationTs = 50000L;
+    Put put = new Put(Bytes.toBytes("row"));
+    put.setTimestamp(mutationTs);
+    for (int i = 0; i < 10; i++) {
+      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q" + i), (long) (i * 
1000),
+        Bytes.toBytes("v" + i));
+    }
+    LogFileCodec codec = new LogFileCodec();
+    LogFile.Record original =
+      new 
LogFileRecord().setHBaseTableName("TBLDISTINCT").setCommitId(1L).setMutation(put);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    codec.getEncoder(new DataOutputStream(baos)).write(original);
+    LogFile.Codec.Decoder decoder =
+      codec.getDecoder(new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray())));
+
+    assertTrue(decoder.advance());
+    LogFile.Record decoded = decoder.current();
+    LogFileTestUtil.assertRecordEquals("All per-cell timestamps must be 
preserved", original,
+      decoded);
+    // Also verify each cell timestamp explicitly
+    java.util.List<org.apache.hadoop.hbase.Cell> originalCells =
+      put.getFamilyCellMap().get(Bytes.toBytes("cf"));
+    java.util.List<org.apache.hadoop.hbase.Cell> decodedCells =
+      decoded.getMutation().getFamilyCellMap().get(Bytes.toBytes("cf"));
+    assertEquals("Cell count must match", originalCells.size(), 
decodedCells.size());
+    for (int i = 0; i < originalCells.size(); i++) {
+      assertEquals("Cell " + i + " timestamp must be preserved",
+        originalCells.get(i).getTimestamp(), 
decodedCells.get(i).getTimestamp());
+    }
+  }
+
 }

Reply via email to