This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 71034cb2a9 PHOENIX-7860 Fix cell serialization and deserialization
when the cells from mutations from CP are merged (#2483)
71034cb2a9 is described below
commit 71034cb2a9c098603a002340b8879c69be5f5ef6
Author: tkhurana <[email protected]>
AuthorDate: Wed May 20 15:46:54 2026 -0700
PHOENIX-7860 Fix cell serialization and deserialization when the cells from
mutations from CP are merged (#2483)
---
.../phoenix/hbase/index/IndexRegionObserver.java | 116 ++++----
.../phoenix/replication/log/LogFileCodec.java | 30 +-
.../phoenix/replication/log/LogFileRecord.java | 39 +--
.../phoenix/replication/ReplicationLogGroupIT.java | 305 ++++++++++++++++++---
.../phoenix/replication/log/LogFileCodecTest.java | 73 ++++-
.../phoenix/replication/log/LogFileFormatTest.java | 3 +-
6 files changed, 422 insertions(+), 144 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index b968914731..01f7ee898b 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -160,6 +160,7 @@ import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import
org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -756,51 +757,22 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
*/
private void replicateEditOnWALRestore(ReplicationLogGroup logGroup, WALKey
logKey,
WALEdit logEdit) throws IOException {
- ImmutableBytesPtr prevKey = null, currentKey = null;
- Put put = null;
- Delete del = null;
+ List<Cell> regularCells = new ArrayList<>();
for (Cell kv : logEdit.getCells()) {
if (kv instanceof IndexedKeyValue) {
IndexedKeyValue ikv = (IndexedKeyValue) kv;
logGroup.append(Bytes.toString(ikv.getIndexTable()), -1,
ikv.getMutation());
} else {
- // While we can generate a separate mutation for every cell that is
part of the
- // WAL edit and replicate each such mutation. Doing that will not be
very efficient
- // since a mutation can have large number of cells. Instead, we first
group the
- // cells belonging to the same row into a mutation and then replicate
that
- // mutation.
- currentKey = new ImmutableBytesPtr(kv.getRowArray(),
kv.getRowOffset(), kv.getRowLength());
- if (!currentKey.equals(prevKey)) {
- if (put != null && !this.ignoreReplicationFilter.test(put)) {
- logGroup.append(logKey.getTableName().getNameAsString(), -1, put);
- }
- if (del != null && !this.ignoreReplicationFilter.test(del)) {
- logGroup.append(logKey.getTableName().getNameAsString(), -1, del);
- }
- // reset
- put = null;
- del = null;
- }
- if (kv.getType() == Cell.Type.Put) {
- if (put == null) {
- put = new Put(currentKey.get(), currentKey.getOffset(),
currentKey.getLength());
- }
- put.add(kv);
- } else {
- if (del == null) {
- del = new Delete(currentKey.get(), currentKey.getOffset(),
currentKey.getLength());
- }
- del.add(kv);
- }
- prevKey = currentKey;
+ regularCells.add(kv);
}
}
- // append the last one
- if (put != null && !this.ignoreReplicationFilter.test(put)) {
- logGroup.append(logKey.getTableName().getNameAsString(), -1, put);
- }
- if (del != null && !this.ignoreReplicationFilter.test(del)) {
- logGroup.append(logKey.getTableName().getNameAsString(), -1, del);
+ if (!regularCells.isEmpty()) {
+ String tableName = logKey.getTableName().getNameAsString();
+ for (Mutation split : splitCellsIntoMutations(regularCells)) {
+ if (!this.ignoreReplicationFilter.test(split)) {
+ logGroup.append(tableName, -1, split);
+ }
+ }
}
logGroup.sync();
}
@@ -2632,6 +2604,52 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return status.getOperationStatusCode() == SUCCESS && status.getResult() !=
null;
}
+ /**
+ * Reports whether {@code cell} starts a new group: there is no previous
+ * cell, its type differs from the previous cell's type, or it is on a
+ * different row than the previous cell.
+ */
+ private static boolean isNewRowOrType(Cell previousCell, Cell cell) {
+ return previousCell == null || previousCell.getType() != cell.getType()
+ || !CellUtil.matchingRows(previousCell, cell);
+ }
+ /**
+ * Splits cells into individual Put/Delete mutations. Cells are consumed in
+ * iteration order; every run of consecutive cells that share the same row
+ * and the same cell type becomes one mutation: a Delete when
+ * {@code CellUtil.isDelete(cell)} is true, otherwise a Put. Cells of the
+ * same row that are not adjacent therefore end up in separate mutations.
+ * HBase's checkAndMergeCPMutations merges coprocessor cells into the data
+ * mutation, so a single Put may contain Delete cells with different row
+ * keys (e.g., local index). This method recovers distinct mutations using
+ * the same grouping algorithm as HBase's ReplicationSink.
+ * @param cells the cells to regroup, in the order they should be split
+ * @return the recovered mutations in encounter order; empty when
+ * {@code cells} yields no cell
+ */
+ static List<Mutation> splitCellsIntoMutations(Iterable<Cell> cells) throws
IOException {
+ List<Mutation> result = new ArrayList<>();
+ Cell previousCell = null;
+ Mutation current = null;
+ for (Cell cell : cells) {
+ if (isNewRowOrType(previousCell, cell)) { // group boundary: close the finished mutation, open a new one
+ if (current != null) {
+ result.add(current);
+ }
+ if (CellUtil.isDelete(cell)) {
+ current = new Delete(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
+ } else {
+ current = new Put(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
+ }
+ }
+ if (CellUtil.isDelete(cell)) {
+ ((Delete) current).add(cell);
+ } else {
+ ((Put) current).add(cell);
+ }
+ previousCell = cell;
+ }
+ if (current != null) { // flush the last open group
+ result.add(current);
+ }
+ return result;
+ }
+ /**
+ * Splits a mutation whose cells may span several rows or cell types into
+ * separate mutations. A mutation that reports {@code isEmpty()} is returned
+ * unchanged as a single-element list; otherwise the cells of all families
+ * are concatenated family by family and regrouped by the
+ * {@code Iterable<Cell>} overload.
+ * @param merged the mutation to split
+ * @return the resulting mutations
+ */
+ static List<Mutation> splitCellsIntoMutations(Mutation merged) throws
IOException {
+ if (merged.isEmpty()) {
+ return Collections.singletonList(merged);
+ }
+ return
splitCellsIntoMutations(Iterables.concat(merged.getFamilyCellMap().values()));
+ }
+
private void replicateMutations(RegionCoprocessorEnvironment env,
MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext
context)
throws IOException {
@@ -2647,17 +2665,21 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (!logGroup.isPresent()) {
return;
}
+ ReplicationLogGroup group = logGroup.get();
- for (Integer i = 0; i < miniBatchOp.size(); i++) {
+ for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
if (this.ignoreReplicationFilter.test(m)) {
continue;
}
- logGroup.get().append(this.dataTableName, -1, m);
- Mutation[] mutationsAddedByCP =
miniBatchOp.getOperationsFromCoprocessors(i);
- if (mutationsAddedByCP != null) {
- for (Mutation addedMutation : mutationsAddedByCP) {
- logGroup.get().append(this.dataTableName, -1, addedMutation);
+ // When coprocessors add cells (local index, conditional TTL, ON
DUPLICATE KEY UPDATE),
+ // HBase merges them into the data mutation which can mix row keys and
cell types.
+ // Split those back into individual Put/Delete mutations for correct
serialization.
+ if (miniBatchOp.getOperationsFromCoprocessors(i) == null) {
+ group.append(this.dataTableName, -1, m);
+ } else {
+ for (Mutation split : splitCellsIntoMutations(m)) {
+ group.append(this.dataTableName, -1, split);
}
}
}
@@ -2667,7 +2689,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (this.ignoreReplicationFilter.test(entry.getValue())) {
continue;
}
- logGroup.get().append(entry.getKey().getTableName(), -1,
entry.getValue());
+ group.append(entry.getKey().getTableName(), -1, entry.getValue());
}
}
if (context.postIndexUpdates != null) {
@@ -2676,9 +2698,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (this.ignoreReplicationFilter.test(entry.getValue())) {
continue;
}
- logGroup.get().append(entry.getKey().getTableName(), -1,
entry.getValue());
+ group.append(entry.getKey().getTableName(), -1, entry.getValue());
}
}
- logGroup.get().sync();
+ group.sync();
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
index 1ce29fd0b6..a0975287d3 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
@@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -68,6 +69,7 @@ import org.apache.hadoop.io.WritableUtils;
* | | PER-CELL DATA (repeated) | |
* | | +––––––––––––----------------–--–+ | |
* | | | CELL TIMESTAMP (long) | | |
+ * | | | CELL TYPE (byte) | | |
* | | | COLUMN QUALIFIER LENGTH (vint) | | |
* | | | COLUMN QUALIFIER (byte[]) | | |
* | | | VALUE LENGTH (vint) | | |
@@ -141,6 +143,7 @@ public class LogFileCodec implements LogFile.Codec {
WritableUtils.writeVInt(recordOut, cells.size());
for (Cell cell : cells) {
recordOut.writeLong(cell.getTimestamp());
+ recordOut.writeByte(cell.getTypeByte());
WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
@@ -205,9 +208,6 @@ public class LogFileCodec implements LogFile.Codec {
mutation = new Put(rowKey);
break;
case DELETE:
- case DELETEFAMILYVERSION:
- case DELETECOLUMN:
- case DELETEFAMILY:
mutation = new Delete(rowKey);
break;
default:
@@ -229,6 +229,8 @@ public class LogFileCodec implements LogFile.Codec {
for (int j = 0; j < columnValuePairsCount; j++) {
// Cell timestamp
long cellTs = in.readLong();
+ // Cell type byte
+ byte cellTypeByte = in.readByte();
// Qualifier name
int qualLen = WritableUtils.readVInt(in);
byte[] qual = new byte[qualLen];
@@ -241,22 +243,12 @@ public class LogFileCodec implements LogFile.Codec {
if (valueLen > 0) {
in.readFully(value);
}
- switch (type) {
- case PUT:
- ((Put) mutation).addColumn(cf, qual, cellTs, value);
- break;
- case DELETE:
- case DELETECOLUMN:
- ((Delete) mutation).addColumn(cf, qual, cellTs);
- break;
- case DELETEFAMILYVERSION:
- ((Delete) mutation).addFamilyVersion(cf, cellTs);
- break;
- case DELETEFAMILY:
- ((Delete) mutation).addFamily(cf, cellTs);
- break;
- default:
- throw new UnsupportedOperationException("Unhandled mutation
type " + type);
+ Cell cell = new KeyValue(rowKey, 0, rowKey.length, cf, 0,
cf.length, qual, 0,
+ qual.length, cellTs, KeyValue.Type.codeToType(cellTypeByte),
value, 0, value.length);
+ if (mutation instanceof Put) {
+ ((Put) mutation).add(cell);
+ } else {
+ ((Delete) mutation).add(cell);
}
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
index 4b62ccfa7d..6b12a3e659 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
@@ -17,9 +17,6 @@
*/
package org.apache.phoenix.replication.log;
-import java.io.IOException;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -114,16 +111,9 @@ public class LogFileRecord implements LogFile.Record {
// Internals only below. Not for LogFile interface consumer use.
- /**
- * The Phoenix concept of HBase mutation type, which is currently a 1:1
mapping with HBase's, with
- * different code values (they don't need to match), but may potentially
diverge in the future.
- */
protected enum MutationType {
PUT(1),
- DELETE(2),
- DELETEFAMILYVERSION(3),
- DELETECOLUMN(4),
- DELETEFAMILY(5);
+ DELETE(2);
private int code;
@@ -135,33 +125,12 @@ public class LogFileRecord implements LogFile.Record {
return code;
}
- static MutationType get(Mutation mutation) throws IOException {
+ static MutationType get(Mutation mutation) {
if (mutation instanceof Put) {
return PUT;
} else if (mutation instanceof Delete) {
- CellScanner s = mutation.cellScanner();
- if (!s.advance()) {
- // No cell in delete. A simple delete of a row.
- return DELETE;
- }
- // This assumes that either there is only one cell in the Delete, or
all cells in
- // the delete have the same cell type, which is correct as of today.
We only need
- // to look at the first.
- Cell cell = s.current();
- switch (cell.getType()) {
- case Delete:
- return DELETE;
- case DeleteFamilyVersion:
- return DELETEFAMILYVERSION;
- case DeleteColumn:
- return DELETECOLUMN;
- case DeleteFamily:
- return DELETEFAMILY;
- default:
- // Fall through to throw the UnsupportedOperationException
- break;
- }
- } // Fall through to throw the UnsupportedOperationException
+ return DELETE;
+ }
throw new UnsupportedOperationException("Unsupported mutation type: " +
mutation);
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index 651cddde6c..c0172274c7 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -24,23 +24,35 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_
import static org.apache.phoenix.query.BaseTest.generateUniqueName;
import static
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -55,6 +67,7 @@ import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.query.PhoenixTestBuilder;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.replication.reader.ReplicationLogProcessor;
import org.apache.phoenix.replication.tool.LogFileAnalyzer;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
@@ -209,25 +222,60 @@ public class ReplicationLogGroupIT extends HABaseIT {
return schemaBuilder;
}
+ private void replayAndVerifyAcrossClusters(List<String> ddlStatements,
String... tablesToVerify)
+ throws Exception { // Test helper: run the DDL on cluster 2, replay every found log file there, then compare tables
+ Path standByLogDir =
logGroup.getOrCreatePeerShardManager().getRootDirectoryPath();
+ // standByLogDir is the peer shard manager's root; log files are searched under it
+ // Create the same schema on cluster 2
+ try (Connection conn2 = CLUSTERS.getCluster2Connection(haGroup)) {
+ for (String ddl : ddlStatements) {
+ conn2.createStatement().execute(ddl); // NOTE(review): the Statement is never closed explicitly
+ }
+ conn2.commit();
+ }
+
+ // Replay every replication log file found under standByLogDir on cluster 2
+ FileSystem fs = standByLogDir.getFileSystem(conf2);
+ List<Path> logFiles = findLogFiles(standByLogDir, fs);
+ assertTrue("Should have at least one log file", !logFiles.isEmpty());
+ ReplicationLogProcessor processor = ReplicationLogProcessor.get(conf2,
haGroupName);
+ try {
+ for (Path logFile : logFiles) {
+ LOG.info("Replaying log file: {}", logFile);
+ processor.processLogFile(fs, logFile);
+ }
+ } finally {
+ processor.close();
+ }
+
+ // Verify tables match across clusters at the HBase cell level
+ for (String table : tablesToVerify) {
+ assertTablesEqualAcrossClusters(table);
+ }
+ }
+
@Test
public void testAppendAndSync() throws Exception {
final String tableName = "T_" + generateUniqueName();
final String indexName1 = "I_" + generateUniqueName();
final String indexName2 = "I_" + generateUniqueName();
final String indexName3 = "L_" + generateUniqueName();
+ String createTableDdl = String.format("create table if not exists %s (id1
integer not null, "
+ + "id2 integer not null, val1 varchar, val2 varchar "
+ + "constraint pk primary key (id1, id2))", tableName);
+ String createIndex1Ddl = String
+ .format("create index if not exists %s on %s (val1) include (val2)",
indexName1, tableName);
+ String createIndex2Ddl = String
+ .format("create index if not exists %s on %s (val2) include (val1)",
indexName2, tableName);
+ String createLocalIndexDdl = String.format(
+ "create local index if not exists %s on %s (id2,val1) include (val2)",
indexName3, tableName);
+
try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection)
DriverManager
.getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
- String ddl = String.format("create table %s (id1 integer not null, "
- + "id2 integer not null, val1 varchar, val2 varchar "
- + "constraint pk primary key (id1, id2))", tableName);
- conn.createStatement().execute(ddl);
- ddl = String.format("create index %s on %s (val1) include (val2)",
indexName1, tableName);
- conn.createStatement().execute(ddl);
- ddl = String.format("create index %s on %s (val2) include (val1)",
indexName2, tableName);
- conn.createStatement().execute(ddl);
- ddl = String.format("create local index %s on %s (id2,val1) include
(val2)", indexName3,
- tableName);
- conn.createStatement().execute(ddl);
+ conn.createStatement().execute(createTableDdl);
+ conn.createStatement().execute(createIndex1Ddl);
+ conn.createStatement().execute(createIndex2Ddl);
+ conn.createStatement().execute(createLocalIndexDdl);
conn.commit();
PreparedStatement stmt =
conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?,
?)");
@@ -255,38 +303,43 @@ public class ReplicationLogGroupIT extends HABaseIT {
assertEquals(0, stmt.executeUpdate());
}
}
- // verify the correctness of the index
- // TODO Index tool test API doesn't work with Failover connection
- // IndexToolIT.verifyIndexTable(conf1, tableName, indexName1, conn);
- // verify replication
- Map<String, Integer> expected = Maps.newHashMap();
+
+ // verify replication mutation counts
// mutation count will be equal to row count since the atomic upsert
mutations will be
// ignored and therefore not replicated
+ Map<String, Integer> expected = Maps.newHashMap();
expected.put(tableName, rowCount * 3); // Put + Delete + local index
update
- // for index1 unverified + verified + delete (Delete column)
- expected.put(indexName1, rowCount * 3);
- // for index2 unverified + verified since the null column is part of row
key
- expected.put(indexName2, rowCount * 2);
- // we didn't create any tenant views so no change in the syscat entries
+ expected.put(indexName1, rowCount * 3); // unverified + verified +
delete (DeleteColumn)
+ expected.put(indexName2, rowCount * 2); // unverified + verified
expected.put(SYSTEM_CATALOG_NAME, 0);
expected.put(SYSTEM_CHILD_LINK_NAME, 0);
verifyReplication(expected);
+
+ // Replay on cluster 2 and verify cross-cluster cell-level equality
+ replayAndVerifyAcrossClusters(
+ Arrays.asList(createTableDdl, createIndex1Ddl, createIndex2Ddl,
createLocalIndexDdl),
+ tableName, indexName1, indexName2);
}
}
@Test
public void testAppendAndSyncNoIndex() throws Exception {
final String tableName = "T_" + generateUniqueName();
+ // Multiple column families: cf1 and cf2
+ String createTableDdl = String.format(
+ "create table if not exists %s (id1 integer not null, "
+ + "id2 integer not null, cf1.val1 varchar, cf1.val2 varchar, "
+ + "cf2.val3 varchar, cf2.val4 integer " + "constraint pk primary key
(id1, id2))",
+ tableName);
+
try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection)
DriverManager
.getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
- String ddl = String.format("create table %s (id1 integer not null, "
- + "id2 integer not null, val1 varchar, val2 varchar "
- + "constraint pk primary key (id1, id2))", tableName);
- conn.createStatement().execute(ddl);
+ conn.createStatement().execute(createTableDdl);
conn.commit();
+
+ // upsert 50 rows across multiple column families
PreparedStatement stmt =
- conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?,
?)");
- // upsert 50 rows
+ conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?,
?, ?, ?)");
int rowCount = 50;
for (int i = 0; i < 5; ++i) {
for (int j = 0; j < 10; ++j) {
@@ -294,16 +347,144 @@ public class ReplicationLogGroupIT extends HABaseIT {
stmt.setInt(2, j);
stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
stmt.setString(4, null);
+ stmt.setString(5, "val3_" + i + "_" + j);
+ stmt.setInt(6, i * 10 + j);
stmt.executeUpdate();
}
conn.commit();
}
- // verify replication
+
+ // Delete some rows
+ PreparedStatement deleteStmt =
+ conn.prepareStatement("delete from " + tableName + " where id1 = ? and
id2 = ?");
+ int deleteCount = 10;
+ for (int j = 0; j < 10; ++j) {
+ deleteStmt.setInt(1, 0);
+ deleteStmt.setInt(2, j);
+ deleteStmt.executeUpdate();
+ }
+ conn.commit();
+
+ // verify replication mutation counts
Map<String, Integer> expected = Maps.newHashMap();
- // mutation count will be equal to row count since the atomic upsert
mutations will be
- // ignored and therefore not replicated
- expected.put(tableName, rowCount * 2); // Put + Delete
+ // Each upsert produces Put + Delete (for null columns); each row delete
produces DeleteFamily
+ expected.put(tableName, rowCount * 2 + deleteCount);
verifyReplication(expected);
+
+ // Replay on cluster 2 and verify cross-cluster cell-level equality
+ replayAndVerifyAcrossClusters(Collections.singletonList(createTableDdl),
tableName);
+ }
+ }
+
+ /**
+ * Verifies cross-cluster cell-level equality after replay when ON DUPLICATE
KEY UPDATE rewrites a
+ * row. The atomic upsert path produces a Put (and optionally a Delete with
DeleteColumn cells)
+ * that flow through the coprocessor merge path, which the codec must
round-trip correctly.
+ */
+ @Test
+ public void testAppendAndSyncOnDuplicateKeyUpdate() throws Exception {
+ final String tableName = "T_" + generateUniqueName();
+ String createTableDdl = String.format("create table if not exists %s "
+ + "(pk varchar primary key, counter1 bigint, counter2 varchar)",
tableName);
+
+ try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection)
DriverManager
+ .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
+ conn.createStatement().execute(createTableDdl);
+ conn.commit();
+
+ // Initial inserts for 5 distinct rows
+ PreparedStatement insert =
+ conn.prepareStatement("upsert into " + tableName + " VALUES(?, 0,
'init')");
+ for (int i = 0; i < 5; ++i) {
+ insert.setString(1, "row_" + i);
+ insert.executeUpdate();
+ }
+ conn.commit();
+
+ // ON DUPLICATE KEY UPDATE — increment counter1 and update counter2 a
few times per row.
+ // Each invocation against an existing row triggers the atomic upsert
path which generates
+ // Put (and possibly Delete) mutations on the server side and merges CP
cells.
+ String dml = "UPSERT INTO " + tableName + " VALUES(?, 0, ?) "
+ + "ON DUPLICATE KEY UPDATE counter1 = counter1 + 1, counter2 = ?";
+ PreparedStatement update = conn.prepareStatement(dml);
+ conn.setAutoCommit(true);
+ for (int round = 0; round < 3; ++round) {
+ for (int i = 0; i < 5; ++i) {
+ update.setString(1, "row_" + i);
+ update.setString(2, "v" + round);
+ update.setString(3, "v" + round);
+ update.executeUpdate();
+ }
+ }
+
+ // Set some columns to null via ON DUPLICATE KEY UPDATE — generates
DeleteColumn cells
+ String dmlNullify =
+ "UPSERT INTO " + tableName + " VALUES(?, 0, '') ON DUPLICATE KEY
UPDATE counter2 = NULL";
+ PreparedStatement nullify = conn.prepareStatement(dmlNullify);
+ for (int i = 0; i < 5; ++i) {
+ nullify.setString(1, "row_" + i);
+ nullify.executeUpdate();
+ }
+
+ // Replay on cluster 2 and verify cross-cluster cell-level equality
+ replayAndVerifyAcrossClusters(Collections.singletonList(createTableDdl),
tableName);
+ }
+ }
+
+ /**
+ * Verifies cross-cluster cell-level equality after replay for a table with
a Conditional TTL
+ * expression. Conditional TTL adds coprocessor cells that get merged into
the data mutation,
+ * exercising the split-merged-mutation path.
+ */
+ @Test
+ public void testAppendAndSyncConditionalTTL() throws Exception {
+ final String tableName = "T_" + generateUniqueName();
+ String createTableDdl = String.format("create table if not exists %s (id1
integer not null, "
+ + "id2 integer not null, val1 varchar, val2 varchar, expired boolean "
+ + "constraint pk primary key (id1, id2)) TTL = 'expired = TRUE'",
tableName);
+
+ try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection)
DriverManager
+ .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
+ conn.createStatement().execute(createTableDdl);
+ conn.commit();
+
+ PreparedStatement stmt =
+ conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?,
?, ?)");
+ for (int i = 0; i < 5; ++i) {
+ for (int j = 0; j < 10; ++j) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, j);
+ stmt.setString(3, "val1_" + i + "_" + j);
+ stmt.setString(4, j % 2 == 0 ? "val2_" + i + "_" + j : null);
+ stmt.setBoolean(5, false);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+
+ // Mark some rows expired
+ PreparedStatement expireStmt = conn
+ .prepareStatement("upsert into " + tableName + " (id1, id2, expired)
VALUES(?, ?, true)");
+ for (int j = 0; j < 5; ++j) {
+ expireStmt.setInt(1, 0);
+ expireStmt.setInt(2, j);
+ expireStmt.executeUpdate();
+ }
+ conn.commit();
+
+ // Update the rows just marked expired — conditional TTL triggers extra CP
cells on the update path
+ PreparedStatement updateStmt =
+ conn.prepareStatement("upsert into " + tableName + " (id1, id2, val1)
VALUES(?, ?, ?)");
+ for (int j = 0; j < 5; ++j) {
+ updateStmt.setInt(1, 0);
+ updateStmt.setInt(2, j);
+ updateStmt.setString(3, "val11_" + 0 + "_" + j);
+ updateStmt.executeUpdate();
+ }
+ conn.commit();
+
+ // Replay on cluster 2 and verify cross-cluster cell-level equality
+ replayAndVerifyAcrossClusters(Collections.singletonList(createTableDdl),
tableName);
}
}
@@ -397,4 +578,66 @@ public class ReplicationLogGroupIT extends HABaseIT {
assertEquals(1, getCountForTable(systemTables, SYSTEM_CHILD_LINK_NAME));
assertTrue(getCountForTable(systemTables, SYSTEM_CATALOG_NAME) > 0);
}
+ /**
+ * Collects the files under {@code dir}, searched recursively, whose name
+ * ends with ".plog".
+ * @param dir directory to search; a missing directory yields an empty list
+ * @param fs file system used to list {@code dir}
+ * @return the matching paths in listing order
+ */
+ private List<Path> findLogFiles(Path dir, FileSystem fs) throws IOException {
+ List<Path> files = new ArrayList<>();
+ findLogFilesRecursive(dir, fs, files);
+ return files;
+ }
+ /**
+ * Depth-first walk that appends every file under {@code dir} whose name
+ * ends with ".plog" to {@code files}. Returns without adding anything when
+ * {@code dir} does not exist.
+ */
+ private void findLogFilesRecursive(Path dir, FileSystem fs, List<Path>
files) throws IOException {
+ if (!fs.exists(dir)) {
+ return;
+ }
+ for (FileStatus status : fs.listStatus(dir)) {
+ if (status.isDirectory()) {
+ findLogFilesRecursive(status.getPath(), fs, files);
+ } else if (status.getPath().getName().endsWith(".plog")) {
+ files.add(status.getPath());
+ }
+ }
+ }
+ /**
+ * Scans the named HBase table on both clusters (connections built from
+ * conf1 and conf2) and fails the test on the first row where the two
+ * results differ or where one cluster runs out of rows before the other.
+ * On a mismatch both tables are dumped through TestUtil.dumpTable first.
+ * @param hbaseTableName name of the HBase table to compare
+ */
+ private void assertTablesEqualAcrossClusters(String hbaseTableName) throws
Exception {
+ TableName tn = TableName.valueOf(hbaseTableName);
+ try (
+ org.apache.hadoop.hbase.client.Connection hconn1 =
ConnectionFactory.createConnection(conf1);
+ org.apache.hadoop.hbase.client.Connection hconn2 =
ConnectionFactory.createConnection(conf2);
+ Table table1 = hconn1.getTable(tn); Table table2 = hconn2.getTable(tn)) {
+ // Ask for all versions so older cell versions are compared too
+ Scan scan = new Scan();
+ scan.readAllVersions();
+ // Walk both scanners in lockstep; a null on one side only means a row-count gap
+ try (ResultScanner scanner1 = table1.getScanner(scan);
+ ResultScanner scanner2 = table2.getScanner(scan)) {
+ int rowCount = 0;
+ while (true) {
+ Result r1 = scanner1.next();
+ Result r2 = scanner2.next();
+ if (r1 == null && r2 == null) {
+ break;
+ }
+ assertNotNull(
+ String.format("Table %s: cluster 2 has fewer rows at row %d",
hbaseTableName, rowCount),
+ r2);
+ assertNotNull(
+ String.format("Table %s: cluster 1 has fewer rows at row %d",
hbaseTableName, rowCount),
+ r1);
+ try {
+ Result.compareResults(r1, r2, true);
+ } catch (Exception e) { // NOTE(review): only e.getMessage() reaches fail(); the cause and its stack trace are dropped
+ LOG.error("Table {} row {} mismatch. Dumping both tables:",
hbaseTableName, rowCount);
+ LOG.error("--- Cluster 1 ---");
+ TestUtil.dumpTable(table1);
+ LOG.error("--- Cluster 2 ---");
+ TestUtil.dumpTable(table2);
+ fail(String.format("Table %s row %d mismatch: %s", hbaseTableName,
rowCount,
+ e.getMessage()));
+ }
+ rowCount++;
+ }
+ LOG.info("Table {} matches across clusters: {} rows verified",
hbaseTableName, rowCount);
+ }
+ }
+ }
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
index 70049b0883..a3955c5783 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
@@ -31,6 +31,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
@@ -232,16 +235,6 @@ public class LogFileCodecTest {
singleRecordTest(LogFileTestUtil.newPutRecord("", 1L, "row", 12345L, 1));
}
- @Test
- public void testCodecWithEmptyFamily() throws IOException {
- long ts = 12345L;
- Put put = new Put(Bytes.toBytes("row"));
- put.setTimestamp(ts);
- put.addColumn(HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("q"), ts,
Bytes.toBytes("v"));
- singleRecordTest(
- new
LogFileRecord().setHBaseTableName("TBLEMPTYFAM").setCommitId(1L).setMutation(put));
- }
-
@Test
public void testCodecWithEmptyQualifier() throws IOException {
long ts = 12345L;
@@ -401,4 +394,64 @@ public class LogFileCodecTest {
}
}
+ @Test
+ public void testDeleteWithMixedCellTypes() throws IOException { // one Delete carrying a DeleteColumn cell and a DeleteFamily cell
+ long ts = 12345L;
+ byte[] row = Bytes.toBytes("row");
+ byte[] cf = Bytes.toBytes("cf");
+ Delete delete = new Delete(row);
+ delete.setTimestamp(ts);
+
delete.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row).setFamily(cf)
+
.setQualifier(Bytes.toBytes("q1")).setTimestamp(ts).setType(Cell.Type.DeleteColumn).build());
+
delete.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row).setFamily(cf)
+
.setQualifier(HConstants.EMPTY_BYTE_ARRAY).setTimestamp(ts).setType(Cell.Type.DeleteFamily)
+ .build());
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLMIXED").setCommitId(1L).setMutation(delete));
+ }
+ /**
+ * Hands one mutation per cell type (Put, Delete, DeleteColumn, DeleteFamily,
+ * DeleteFamilyVersion) to singleRecordTest, each under its own table name.
+ * NOTE(review): singleRecordTest is defined outside this hunk; presumably it
+ * encodes and decodes the record and compares the result - confirm.
+ */
+ @Test
+ public void testCellTypeByteRoundTripForAllTypes() throws IOException {
+ long ts = 12345L;
+ byte[] row = Bytes.toBytes("row");
+ byte[] cf = Bytes.toBytes("cf");
+ byte[] qual = Bytes.toBytes("q");
+
+ // Put cell type
+ Put put = new Put(row);
+ put.setTimestamp(ts);
+ put.addColumn(cf, qual, ts, Bytes.toBytes("v"));
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLCTP").setCommitId(1L).setMutation(put));
+
+ // Delete (single version) cell type
+ Delete del1 = new Delete(row);
+ del1.setTimestamp(ts);
+ del1.addColumn(cf, qual, ts);
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLCTD").setCommitId(1L).setMutation(del1));
+
+ // DeleteColumn (all versions) cell type; built by hand to set the type explicitly
+ Delete del2 = new Delete(row);
+ del2.setTimestamp(ts);
+
del2.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row).setFamily(cf)
+
.setQualifier(qual).setTimestamp(ts).setType(Cell.Type.DeleteColumn).build());
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLCTDC").setCommitId(1L).setMutation(del2));
+
+ // DeleteFamily cell type
+ Delete del3 = new Delete(row);
+ del3.setTimestamp(ts);
+ del3.addFamily(cf, ts);
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLCTDF").setCommitId(1L).setMutation(del3));
+
+ // DeleteFamilyVersion cell type
+ Delete del4 = new Delete(row);
+ del4.setTimestamp(ts);
+ del4.addFamilyVersion(cf, ts);
+ singleRecordTest(
+ new
LogFileRecord().setHBaseTableName("TBLCTDFV").setCommitId(1L).setMutation(del4));
+ }
+
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
index 0044483008..c198a400ba 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
@@ -458,8 +458,7 @@ public class LogFileFormatTest {
reader.init(readerContext, input);
fail("Expected InvalidLogTrailerException when trailer is missing");
} catch (InvalidLogTrailerException e) {
- assertTrue("Exception message should contain 'Unsupported version'",
- e.getMessage().contains("Unsupported version"));
+ // Expected — the bytes at the trailer position are block data, not a
valid trailer
}
}