This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 9812297b94 PHOENIX-7801 Preserve cell timestamps between clusters (#2410)
9812297b94 is described below
commit 9812297b94093a1eab5c9fd4bd464256f0b3046b
Author: tkhurana <[email protected]>
AuthorDate: Thu Apr 16 10:44:25 2026 -0700
PHOENIX-7801 Preserve cell timestamps between clusters (#2410)
---
.../phoenix/replication/log/LogFileCodec.java | 12 ++-
.../apache/phoenix/jdbc/PhoenixHAAdminToolIT.java | 2 +-
.../phoenix/replication/log/LogFileCodecTest.java | 91 ++++++++++++++++++++++
3 files changed, 100 insertions(+), 5 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
index 6e7c6c04db..1ce29fd0b6 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.io.WritableUtils;
* | +--------------------------------------+ |
* | | PER-CELL DATA (repeated) | |
* | | +––––––––––––----------------–--–+ | |
+ * | | | CELL TIMESTAMP (long) | | |
* | | | COLUMN QUALIFIER LENGTH (vint) | | |
* | | | COLUMN QUALIFIER (byte[]) | | |
* | | | VALUE LENGTH (vint) | | |
@@ -139,6 +140,7 @@ public class LogFileCodec implements LogFile.Codec {
List<Cell> cells = entry.getValue();
WritableUtils.writeVInt(recordOut, cells.size());
for (Cell cell : cells) {
+ recordOut.writeLong(cell.getTimestamp());
WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
@@ -225,6 +227,8 @@ public class LogFileCodec implements LogFile.Codec {
// Qualifiers+Values Count
int columnValuePairsCount = WritableUtils.readVInt(in);
for (int j = 0; j < columnValuePairsCount; j++) {
+ // Cell timestamp
+ long cellTs = in.readLong();
// Qualifier name
int qualLen = WritableUtils.readVInt(in);
byte[] qual = new byte[qualLen];
@@ -239,17 +243,17 @@ public class LogFileCodec implements LogFile.Codec {
}
switch (type) {
case PUT:
- ((Put) mutation).addColumn(cf, qual, ts, value);
+ ((Put) mutation).addColumn(cf, qual, cellTs, value);
break;
case DELETE:
case DELETECOLUMN:
- ((Delete) mutation).addColumn(cf, qual, ts);
+ ((Delete) mutation).addColumn(cf, qual, cellTs);
break;
case DELETEFAMILYVERSION:
- ((Delete) mutation).addFamilyVersion(cf, ts);
+ ((Delete) mutation).addFamilyVersion(cf, cellTs);
break;
case DELETEFAMILY:
- ((Delete) mutation).addFamily(cf);
+ ((Delete) mutation).addFamily(cf, cellTs);
break;
default:
throw new UnsupportedOperationException("Unhandled mutation
type " + type);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
index 7ff79f2c7f..ca60771c80 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolIT.java
@@ -911,7 +911,7 @@ public class PhoenixHAAdminToolIT extends HABaseIT {
int ret = ToolRunner.run(adminTool,
new String[] { "create", "-g", createHaGroupName, "-p", "FAILOVER",
"-zk1",
CLUSTERS.getZkUrl1(), "-c1", CLUSTERS.getMasterAddress1(), "-cr1",
"ACTIVE", "-zk2",
- CLUSTERS.getZkUrl2(), "-c2", CLUSTERS.getMasterAddress2(), "-cr2",
"STANDBY", "-hdfs1",
+ CLUSTERS.getZkUrl2(), "-c2", CLUSTERS.getMasterAddress2(), "-cr2",
"STANDBY", "-hdfs1",
CLUSTERS.getHdfsUrl1(), "-hdfs2", CLUSTERS.getHdfsUrl2() });
assertEquals("create command should succeed", RET_SUCCESS, ret);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
index 5df12e1ad8..70049b0883 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
@@ -310,4 +310,95 @@ public class LogFileCodecTest {
new
LogFileRecord().setHBaseTableName("TBLLVAL").setCommitId(1L).setMutation(put));
}
+ // Cell timestamp preservation tests
+ // These verify that per-cell timestamps survive a codec round-trip when they differ from the
+ // mutation-level timestamp. Before the fix the encoder omitted cell.getTimestamp() entirely
+ // and the decoder fell back to the mutation-level timestamp (or HConstants.LATEST_TIMESTAMP
+ // for addFamily), so any divergence produced wrong timestamps on the standby cluster.
+
+ @Test
+ public void testPutCellTimestampsDifferFromMutationTimestamp() throws
IOException {
+ long mutationTs = 99999L;
+ Put put = new Put(Bytes.toBytes("row"));
+ put.setTimestamp(mutationTs);
+ // Each cell gets its own timestamp, all different from mutationTs and from each other
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), 11111L,
Bytes.toBytes("v1"));
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q2"), 22222L,
Bytes.toBytes("v2"));
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q3"), 33333L,
Bytes.toBytes("v3"));
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLPUTTS").setCommitId(1L).setMutation(put));
+ }
+
+ @Test
+ public void testDeleteColumnCellTimestampDiffersFromMutationTimestamp()
throws IOException {
+ long mutationTs = 99999L;
+ long cellTs = 11111L;
+ Delete delete = new Delete(Bytes.toBytes("row"));
+ delete.setTimestamp(mutationTs);
+ delete.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), cellTs);
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLDELCOLTS").setCommitId(1L).setMutation(delete));
+ }
+
+ @Test
+ public void testDeleteFamilyCellTimestampDiffersFromMutationTimestamp()
throws IOException {
+ // This is the most direct regression test for the bug: addFamily(cf) was called without ts,
+ // defaulting to HConstants.LATEST_TIMESTAMP instead of preserving the original cell timestamp.
+ long mutationTs = 99999L;
+ long cellTs = 11111L;
+ Delete delete = new Delete(Bytes.toBytes("row"));
+ delete.setTimestamp(mutationTs);
+ delete.addFamily(Bytes.toBytes("cf"), cellTs); // explicit cell ts !=
mutationTs
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLDELFAMTS").setCommitId(1L).setMutation(delete));
+ }
+
+ @Test
+ public void
testDeleteFamilyVersionCellTimestampDiffersFromMutationTimestamp()
+ throws IOException {
+ long mutationTs = 99999L;
+ long cellTs = 11111L;
+ Delete delete = new Delete(Bytes.toBytes("row"));
+ delete.setTimestamp(mutationTs);
+ delete.addFamilyVersion(Bytes.toBytes("cf"), cellTs); // explicit cell ts
!= mutationTs
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLDELFAMVERTS").setCommitId(1L).setMutation(delete));
+ }
+
+ @Test
+ public void testMultipleCellsWithDistinctTimestampsPreserved() throws
IOException {
+ // Multiple cells in the same mutation each carry a unique timestamp; all must survive
+ // the round-trip intact.
+ long mutationTs = 50000L;
+ Put put = new Put(Bytes.toBytes("row"));
+ put.setTimestamp(mutationTs);
+ for (int i = 0; i < 10; i++) {
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q" + i), (long) (i *
1000),
+ Bytes.toBytes("v" + i));
+ }
+ LogFileCodec codec = new LogFileCodec();
+ LogFile.Record original =
+ new
LogFileRecord().setHBaseTableName("TBLDISTINCT").setCommitId(1L).setMutation(put);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ codec.getEncoder(new DataOutputStream(baos)).write(original);
+ LogFile.Codec.Decoder decoder =
+ codec.getDecoder(new DataInputStream(new
ByteArrayInputStream(baos.toByteArray())));
+
+ assertTrue(decoder.advance());
+ LogFile.Record decoded = decoder.current();
+ LogFileTestUtil.assertRecordEquals("All per-cell timestamps must be
preserved", original,
+ decoded);
+ // Also verify each cell timestamp explicitly
+ java.util.List<org.apache.hadoop.hbase.Cell> originalCells =
+ put.getFamilyCellMap().get(Bytes.toBytes("cf"));
+ java.util.List<org.apache.hadoop.hbase.Cell> decodedCells =
+ decoded.getMutation().getFamilyCellMap().get(Bytes.toBytes("cf"));
+ assertEquals("Cell count must match", originalCells.size(),
decodedCells.size());
+ for (int i = 0; i < originalCells.size(); i++) {
+ assertEquals("Cell " + i + " timestamp must be preserved",
+ originalCells.get(i).getTimestamp(),
decodedCells.get(i).getTimestamp());
+ }
+ }
+
}