This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 32f4b655f0 PHOENIX-7931 Coalesce per-batch mutations into a single
record (#2540)
32f4b655f0 is described below
commit 32f4b655f0adec920db14f23e6b8732d541008d0
Author: tkhurana <[email protected]>
AuthorDate: Wed Jul 1 17:21:51 2026 -0700
PHOENIX-7931 Coalesce per-batch mutations into a single record (#2540)
---
.../phoenix/hbase/index/IndexRegionObserver.java | 109 +++------
.../phoenix/replication/MutationCellGrouper.java | 80 ++++++
.../apache/phoenix/replication/ReplicationLog.java | 9 +-
.../phoenix/replication/ReplicationLogGroup.java | 48 +++-
.../apache/phoenix/replication/log/LogFile.java | 65 ++++-
.../phoenix/replication/log/LogFileCodec.java | 218 ++++++++---------
.../phoenix/replication/log/LogFileRecord.java | 125 +++++-----
.../phoenix/replication/log/LogFileWriter.java | 7 +-
.../reader/ReplicationLogProcessor.java | 31 +--
.../phoenix/replication/tool/LogFileAnalyzer.java | 43 +++-
.../phoenix/replication/ReplicationLogGroupIT.java | 139 ++++++++---
.../reader/ReplicationLogProcessorTestIT.java | 26 +-
.../replication/MutationCellGrouperTest.java | 210 ++++++++++++++++
.../replication/ReplicationLogGroupTest.java | 271 ++++++++++++++++-----
.../phoenix/replication/log/LogFileCodecTest.java | 194 ++++++++++++++-
.../replication/log/LogFileCompressionTest.java | 4 +-
.../phoenix/replication/log/LogFileTestUtil.java | 69 ++++--
.../replication/log/LogFileWriterSyncTest.java | 18 +-
.../phoenix/replication/log/LogFileWriterTest.java | 6 +-
19 files changed, 1229 insertions(+), 443 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index c7825ce96c..533c1b4da3 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -130,6 +130,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.replication.MutationCellGrouper;
import org.apache.phoenix.replication.ReplicationLogGroup;
import org.apache.phoenix.replication.SystemCatalogWALEntryFilter;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
@@ -163,7 +164,6 @@ import org.xerial.snappy.Snappy;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import
org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -861,7 +861,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
if (!regularCells.isEmpty()) {
String tableName = logKey.getTableName().getNameAsString();
- for (Mutation split : splitCellsIntoMutations(regularCells)) {
+ for (Mutation split :
MutationCellGrouper.splitCellsIntoMutations(regularCells)) {
if (!this.ignoreReplicationFilter.test(split)) {
logGroup.append(tableName, -1, split);
}
@@ -2873,52 +2873,6 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return status.getOperationStatusCode() == SUCCESS && status.getResult() !=
null;
}
- /**
- * Splits cells into individual Put/Delete mutations grouped by (row key,
put-vs-delete). HBase's
- * checkAndMergeCPMutations merges coprocessor cells into the data mutation,
so a single Put may
- * contain Delete cells with different row keys (e.g., local index). This
method recovers distinct
- * mutations using the same grouping algorithm as HBase's ReplicationSink.
- */
- private static boolean isNewRowOrType(Cell previousCell, Cell cell) {
- return previousCell == null || previousCell.getType() != cell.getType()
- || !CellUtil.matchingRows(previousCell, cell);
- }
-
- static List<Mutation> splitCellsIntoMutations(Iterable<Cell> cells) throws
IOException {
- List<Mutation> result = new ArrayList<>();
- Cell previousCell = null;
- Mutation current = null;
- for (Cell cell : cells) {
- if (isNewRowOrType(previousCell, cell)) {
- if (current != null) {
- result.add(current);
- }
- if (CellUtil.isDelete(cell)) {
- current = new Delete(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
- } else {
- current = new Put(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
- }
- }
- if (CellUtil.isDelete(cell)) {
- ((Delete) current).add(cell);
- } else {
- ((Put) current).add(cell);
- }
- previousCell = cell;
- }
- if (current != null) {
- result.add(current);
- }
- return result;
- }
-
- static List<Mutation> splitCellsIntoMutations(Mutation merged) throws
IOException {
- if (merged.isEmpty()) {
- return Collections.singletonList(merged);
- }
- return
splitCellsIntoMutations(Iterables.concat(merged.getFamilyCellMap().values()));
- }
-
private void replicateMutations(RegionCoprocessorEnvironment env,
MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext
context)
throws IOException {
@@ -2939,6 +2893,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// Record ReplicationSyncTime only when we are actually doing work (not on
early-return paths).
long start = EnvironmentEdgeManager.currentTimeMillis();
try {
+ List<Cell> dataTableCells = new ArrayList<>();
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
if (this.ignoreReplicationFilter.test(m)) {
@@ -2946,37 +2901,45 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
// When coprocessors add cells (local index, conditional TTL, ON
DUPLICATE KEY UPDATE),
// HBase merges them into the data mutation which can mix row keys and
cell types.
- // Split those back into individual Put/Delete mutations for correct
serialization.
- if (miniBatchOp.getOperationsFromCoprocessors(i) == null) {
- group.append(this.dataTableName, -1, m);
- } else {
- for (Mutation split : splitCellsIntoMutations(m)) {
- group.append(this.dataTableName, -1, split);
- }
- }
+ // We stream the cells through as-is — the consumer reconstructs
Put/Delete mutations
+ // on the row+type boundary.
+ appendCells(dataTableCells, m);
}
- if (context.preIndexUpdates != null) {
- for (Map.Entry<HTableInterfaceReference, Mutation> entry :
context.preIndexUpdates
- .entries()) {
- if (this.ignoreReplicationFilter.test(entry.getValue())) {
- continue;
- }
- group.append(entry.getKey().getTableName(), -1, entry.getValue());
- }
- }
- if (context.postIndexUpdates != null) {
- for (Map.Entry<HTableInterfaceReference, Mutation> entry :
context.postIndexUpdates
- .entries()) {
- if (this.ignoreReplicationFilter.test(entry.getValue())) {
- continue;
- }
- group.append(entry.getKey().getTableName(), -1, entry.getValue());
- }
+ if (!dataTableCells.isEmpty()) {
+ group.append(this.dataTableName, -1, dataTableCells);
}
+ appendIndexUpdates(group, context.preIndexUpdates);
+ appendIndexUpdates(group, context.postIndexUpdates);
group.sync();
} finally {
long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
metricSource.updateReplicationSyncTime(this.dataTableName, duration);
}
}
+
+ private void appendIndexUpdates(ReplicationLogGroup group,
+ ListMultimap<HTableInterfaceReference, Mutation> updates) throws
IOException {
+ if (updates == null) {
+ return;
+ }
+ for (Map.Entry<HTableInterfaceReference, Collection<Mutation>> entry :
updates.asMap()
+ .entrySet()) {
+ List<Cell> cells = new ArrayList<>();
+ for (Mutation m : entry.getValue()) {
+ if (this.ignoreReplicationFilter.test(m)) {
+ continue;
+ }
+ appendCells(cells, m);
+ }
+ if (!cells.isEmpty()) {
+ group.append(entry.getKey().getTableName(), -1, cells);
+ }
+ }
+ }
+
+ private static void appendCells(List<Cell> bucket, Mutation m) {
+ for (List<Cell> familyCells : m.getFamilyCellMap().values()) {
+ bucket.addAll(familyCells);
+ }
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/MutationCellGrouper.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/MutationCellGrouper.java
new file mode 100644
index 0000000000..d058bd192a
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/MutationCellGrouper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Groups a flat cell stream into Put/Delete mutations, mirroring the
algorithm HBase's
+ * ReplicationSink uses to reconstruct mutations from a WALEdit. A new
mutation is started whenever
+ * the row key or the cell type differs from the immediately preceding cell;
consecutive cells
+ * sharing both are collected into one mutation. Because the full cell type
participates in the
+ * boundary, distinct delete subtypes (e.g. DeleteColumn vs. DeleteFamily)
also split into separate
+ * mutations. There is no precondition on the input ordering: any cell stream
produces valid
+ * mutations. Ordering only affects how the cells are partitioned into
Mutation objects (a row that
+ * recurs non-consecutively yields a separate mutation per run), not
correctness -- cell order is
+ * preserved, so replaying the resulting mutations in order reproduces the
effect of applying the
+ * input cells in order.
+ */
+public final class MutationCellGrouper {
+
+ private MutationCellGrouper() {
+ }
+
+ private static boolean isNewRowOrType(Cell previousCell, Cell cell) {
+ return previousCell == null || previousCell.getType() != cell.getType()
+ || !CellUtil.matchingRows(previousCell, cell);
+ }
+
+ /** Group a cell stream into Put/Delete mutations using the row+type
boundary algorithm. */
+ public static List<Mutation> splitCellsIntoMutations(Iterable<Cell> cells)
throws IOException {
+ List<Mutation> result = new ArrayList<>();
+ Cell previousCell = null;
+ Mutation current = null;
+ for (Cell cell : cells) {
+ if (isNewRowOrType(previousCell, cell)) {
+ if (current != null) {
+ result.add(current);
+ }
+ if (CellUtil.isDelete(cell)) {
+ current = new Delete(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
+ } else {
+ current = new Put(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
+ }
+ }
+ if (CellUtil.isDelete(cell)) {
+ ((Delete) current).add(cell);
+ } else {
+ ((Put) current).add(cell);
+ }
+ previousCell = cell;
+ }
+ if (current != null) {
+ result.add(current);
+ }
+ return result;
+ }
+
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 40337c190f..dc681e8d90 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.phoenix.replication.ReplicationLogGroup.Record;
import org.apache.phoenix.replication.log.LogFileWriter;
@@ -312,7 +311,7 @@ public class ReplicationLog {
LOG.info("Replaying {} unsynced records into new writer {}",
currentBatch.size(),
currentWriter);
for (Record r : currentBatch) {
- currentWriter.append(r.tableName, r.commitId, r.mutation);
+ currentWriter.append(r.tableName, r.commitId, r.cells);
}
}
@@ -352,7 +351,7 @@ public class ReplicationLog {
protected void append(Record r) throws IOException {
final boolean[] blockSynced = { false };
apply(writer -> {
- blockSynced[0] = writer.append(r.tableName, r.commitId, r.mutation);
+ blockSynced[0] = writer.append(r.tableName, r.commitId, r.cells);
});
// Add to current batch only after we succeed at appending
currentBatch.add(r);
@@ -367,10 +366,6 @@ public class ReplicationLog {
}
}
- protected void append(String tableName, long commitId, Mutation mutation)
throws IOException {
- apply(writer -> writer.append(tableName, commitId, mutation));
- }
-
protected void sync() throws IOException {
apply(LogFileWriter::sync);
currentBatch.clear();
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index 0a862d640a..0e38bd5528 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.jdbc.HAGroupStoreManager;
@@ -329,15 +330,19 @@ public class ReplicationLogGroup {
}
}
+ /**
+ * Append payload carried through the ring buffer. Always carries a flat
{@link List} of
+ * {@link Cell}s; the per-mutation public API extracts the cells before
publishing.
+ */
protected static class Record {
- public String tableName;
- public long commitId;
- public Mutation mutation;
+ public final String tableName;
+ public final long commitId;
+ public final List<Cell> cells;
- public Record(String tableName, long commitId, Mutation mutation) {
+ public Record(String tableName, long commitId, List<Cell> cells) {
this.tableName = tableName;
this.commitId = commitId;
- this.mutation = mutation;
+ this.cells = cells;
}
}
@@ -551,6 +556,37 @@ public class ReplicationLogGroup {
if (LOG.isTraceEnabled()) {
LOG.trace("Append: table={}, commitId={}, mutation={}", tableName,
commitId, mutation);
}
+ List<Cell> cells = new ArrayList<>();
+ for (List<Cell> familyCells : mutation.getFamilyCellMap().values()) {
+ cells.addAll(familyCells);
+ }
+ if (cells.isEmpty()) {
+ throw new IllegalArgumentException("Cannot append a mutation with no
cells");
+ }
+ publishDataEvent(new Record(tableName, commitId, cells));
+ }
+
+ /**
+ * Append a coalesced batch of cells as a single record on the log. The
cells must already be
+ * grouped by row+type so consumers can reconstruct Put/Delete mutations on
the row+type boundary
+ * (see {@link MutationCellGrouper}). Behaves identically to
+ * {@link #append(String, long, Mutation)} with respect to backpressure and
fail-stop.
+ * @param tableName The HBase table name shared by every cell in {@code
cells}.
+ * @param commitId The commit identifier (e.g., SCN) for the batch.
+ * @param cells The flat ordered cell stream for one batch on one table.
+ * @throws IOException If the writer is closed or the ring buffer is full.
+ */
+ public void append(String tableName, long commitId, List<Cell> cells) throws
IOException {
+ if (cells == null || cells.isEmpty()) {
+ throw new IllegalArgumentException("Cannot append a record with no
cells");
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Append: table={}, commitId={}, cells={}", tableName,
commitId, cells.size());
+ }
+ publishDataEvent(new Record(tableName, commitId, cells));
+ }
+
+ private void publishDataEvent(Record record) throws IOException {
if (isClosed()) {
throw new IOException("Closed");
}
@@ -566,7 +602,7 @@ public class ReplicationLogGroup {
long sequence = ringBuffer.next();
try {
LogEvent event = ringBuffer.get(sequence);
- event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId,
mutation), null);
+ event.setValues(EVENT_TYPE_DATA, record, null);
} finally {
// Update ring buffer events metric
ringBuffer.publish(sequence);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
index a93ada0b48..8e3e2ea25f 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
@@ -22,10 +22,13 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -258,16 +261,57 @@ public interface LogFile {
void write(DataOutput out) throws IOException;
}
- /** Represents a single logical change */
+ /** Represents a single logical change (a batch of cells across one or more
rows) */
interface Record {
/**
- * Gets the mutation this record represents.
- * @return The Mutation.
+ * Gets the cells in this record's body.
+ * @return The cell list.
*/
- Mutation getMutation();
+ List<Cell> getCells();
/**
- * Sets the mutation this record represents.
+ * Sets the cells in this record's body.
+ * @param cells The cell list to set.
+ * @return This Record instance for chaining.
+ */
+ Record setCells(List<Cell> cells);
+
+ /**
+ * Gets the attributes for this record. These apply uniformly to every
mutation reconstructed
+ * from {@link #getCells()}.
+ * @return The attribute map (keys are attribute names, values are
attribute byte arrays).
+ */
+ Map<String, byte[]> getAttributes();
+
+ /**
+ * Sets the attributes for this record.
+ * @param attributes The attribute map to set.
+ * @return This Record instance for chaining.
+ */
+ Record setAttributes(Map<String, byte[]> attributes);
+
+ /**
+ * Reconstructs the mutations in this record by grouping {@link
#getCells()} on the row+type
+ * boundary (one mutation per contiguous run of cells with the same row
and the same cell type, so
+ * distinct delete subtypes split). Attributes from {@link #getAttributes()} are applied to
each result mutation.
+ * @return The list of reconstructed Put/Delete mutations.
+ * @throws IOException if mutation assembly fails.
+ */
+ List<Mutation> getMutations() throws IOException;
+
+ /**
+ * Convenience accessor that returns the single mutation in this record.
Throws if
+ * {@link #getMutations()} produces anything other than exactly one entry.
+ * @return The single reconstructed Put or Delete mutation.
+ * @throws IllegalStateException if the body does not contain exactly one
mutation.
+ * @throws IOException if mutation assembly fails.
+ */
+ Mutation getMutation() throws IOException;
+
+ /**
+ * Convenience setter that populates this record's cell body from a single
mutation. Cells are
+ * taken from the mutation's family cell map. Attributes are cleared;
callers that need
+ * attributes on the record must set them explicitly via {@link
#setAttributes(Map)}.
* @param mutation The Mutation to set.
* @return This Record instance for chaining.
*/
@@ -324,14 +368,15 @@ public interface LogFile {
void init(LogFileWriterContext context) throws IOException;
/**
- * Appends an HBase mutation to the log file. The log record may be
buffered internally.
- * @param tableName The HBase table name
- * @param commitId The commit identifier
- * @param mutation The mutation to append.
+ * Appends a coalesced batch of cells as a single record. The cells must
already be grouped by
+ * row+type so consumers can reconstruct Put/Delete mutations on the
row+type boundary.
+ * @param tableName The HBase table name shared by every cell.
+ * @param commitId The commit identifier.
+ * @param cells The flat ordered cell stream for one batch on one
table.
* @return true if an implicit sync happened (block full), false if
buffered only
* @throws IOException if an I/O error occurs during append.
*/
- boolean append(String tableName, long commitId, Mutation mutation) throws
IOException;
+ boolean append(String tableName, long commitId, List<Cell> cells) throws
IOException;
/**
* Flushes any buffered data to the underlying storage and ensures it is
durable (e.g., by
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
index a0975287d3..0f25aa23fb 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
@@ -26,55 +26,56 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
/**
- * Default Codec for encoding and decoding ReplicationLog Records within a
block buffer. This
- * implementation uses standard Java DataInput/DataOutput for serialization.
Record Format within a
- * block:
+ * Default Codec for encoding and decoding ReplicationLog Records within a
block buffer. The on-disk
+ * record is cell-oriented (mirroring HBase's WALEdit): a record carries a
flat ordered list of
+ * cells across one or more rows. Mutation reconstruction (grouping by
row+type) is the consumer's
+ * responsibility and lives in {@link
org.apache.phoenix.replication.MutationCellGrouper}.
*
* <pre>
* +--------------------------------------------+
* | RECORD LENGTH (vint) |
* +--------------------------------------------+
* | RECORD HEADER |
- * | - Mutation type (byte) |
* | - HBase table name length (vint) |
* | - HBase table name (byte[]) |
- * | - Transaction/SCN or commit ID (vint) |
+ * | - Commit ID (vlong) |
* +--------------------------------------------+
- * | ROW KEY LENGTH (vint) |
- * | ROW KEY (byte[]) |
+ * | NUMBER OF ATTRIBUTES (vint) |
* +--------------------------------------------+
- * | MUTATION TIMESTAMP (vint) |
+ * | PER-ATTRIBUTE (repeated) |
+ * | +--------------------------------------+ |
+ * | | ATTRIBUTE KEY LENGTH (vint) | |
+ * | | ATTRIBUTE KEY (byte[]) | |
+ * | | ATTRIBUTE VALUE LENGTH (vint) | |
+ * | | ATTRIBUTE VALUE (byte[]) | |
+ * | +--------------------------------------+ |
* +--------------------------------------------+
- * | NUMBER OF COLUMN FAMILIES CHANGED (vint) |
+ * | NUMBER OF CELLS (vint) |
* +--------------------------------------------+
- * | PER-FAMILY DATA (repeated) |
- * | +--------------------------------------+ |
- * | | COLUMN FAMILY NAME LENGTH (vint) | |
- * | | COLUMN FAMILY NAME (byte[]) | |
- * | | NUMBER OF CELLS IN FAMILY (vint) | |
+ * | PER-CELL DATA (repeated) |
* | +--------------------------------------+ |
- * | | PER-CELL DATA (repeated) | |
- * | | +––––––––––––----------------–--–+ | |
- * | | | CELL TIMESTAMP (long) | | |
- * | | | CELL TYPE (byte) | | |
- * | | | COLUMN QUALIFIER LENGTH (vint) | | |
- * | | | COLUMN QUALIFIER (byte[]) | | |
- * | | | VALUE LENGTH (vint) | | |
- * | | | VALUE (byte[]) | | |
- * | | +–––––––––––––--------------––--–+ | |
+ * | | ROW LENGTH (vint) | |
+ * | | ROW (byte[]) | |
+ * | | FAMILY LENGTH (vint) | |
+ * | | FAMILY (byte[]) | |
+ * | | QUALIFIER LENGTH (vint) | |
+ * | | QUALIFIER (byte[]) | |
+ * | | TIMESTAMP (long) | |
+ * | | TYPE (byte) | |
+ * | | VALUE LENGTH (vint) | |
+ * | | VALUE (byte[]) | |
* | +--------------------------------------+ |
* +--------------------------------------------+
* </pre>
@@ -114,64 +115,65 @@ public class LogFileCodec implements LogFile.Codec {
@Override
public void write(LogFile.Record record) throws IOException {
-
+ if (record.getCells().isEmpty()) {
+ throw new IllegalArgumentException("Cannot encode a record with no
cells");
+ }
DataOutput recordOut = new DataOutputStream(currentRecord);
- // Write record fields
-
- Mutation mutation = record.getMutation();
- LogFileRecord.MutationType mutationType =
LogFileRecord.MutationType.get(mutation);
- recordOut.writeByte(mutationType.getCode());
+ // Header: table name + commit id
byte[] nameBytes =
record.getHBaseTableName().getBytes(StandardCharsets.UTF_8);
WritableUtils.writeVInt(recordOut, nameBytes.length);
recordOut.write(nameBytes);
WritableUtils.writeVLong(recordOut, record.getCommitId());
- byte[] rowKey = mutation.getRow();
- WritableUtils.writeVInt(recordOut, rowKey.length);
- recordOut.write(rowKey);
- recordOut.writeLong(mutation.getTimestamp());
- Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
- int cfCount = familyMap.size();
- WritableUtils.writeVInt(recordOut, cfCount);
+ // Attributes
+ Map<String, byte[]> attrs = record.getAttributes();
+ WritableUtils.writeVInt(recordOut, attrs.size());
+ for (Map.Entry<String, byte[]> e : attrs.entrySet()) {
+ byte[] keyBytes = e.getKey().getBytes(StandardCharsets.UTF_8);
+ WritableUtils.writeVInt(recordOut, keyBytes.length);
+ recordOut.write(keyBytes);
+ byte[] val = e.getValue();
+ WritableUtils.writeVInt(recordOut, val.length);
+ if (val.length > 0) {
+ recordOut.write(val);
+ }
+ }
- for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
- byte[] cf = entry.getKey();
- WritableUtils.writeVInt(recordOut, cf.length);
- recordOut.write(cf);
- List<Cell> cells = entry.getValue();
- WritableUtils.writeVInt(recordOut, cells.size());
- for (Cell cell : cells) {
- recordOut.writeLong(cell.getTimestamp());
- recordOut.writeByte(cell.getTypeByte());
- WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
+ // Cells
+ List<Cell> cells = record.getCells();
+ WritableUtils.writeVInt(recordOut, cells.size());
+ for (Cell cell : cells) {
+ WritableUtils.writeVInt(recordOut, cell.getRowLength());
+ recordOut.write(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
+ WritableUtils.writeVInt(recordOut, cell.getFamilyLength());
+ recordOut.write(cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength());
+ WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
+ if (cell.getQualifierLength() > 0) {
recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
- WritableUtils.writeVInt(recordOut, cell.getValueLength());
- if (cell.getValueLength() > 0) {
- recordOut.write(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength());
- }
+ }
+ recordOut.writeLong(cell.getTimestamp());
+ recordOut.writeByte(cell.getTypeByte());
+ WritableUtils.writeVInt(recordOut, cell.getValueLength());
+ if (cell.getValueLength() > 0) {
+ recordOut.write(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength());
}
}
byte[] currentRecordBytes = currentRecord.toByteArray();
- // Write total record length
WritableUtils.writeVInt(out, currentRecordBytes.length);
- // Write the record
out.write(currentRecordBytes);
- // Set the size (including the vint prefix) on the record object
((LogFileRecord) record).setSerializedLength(
currentRecordBytes.length +
WritableUtils.getVIntSize(currentRecordBytes.length));
- // Reset the ByteArrayOutputStream to release resources
currentRecord.reset();
}
}
private static class RecordDecoder implements LogFile.Codec.Decoder {
private final DataInput in;
- // A reference to the object populated by the last successful advance()
private LogFileRecord current = null;
RecordDecoder(DataInput in) {
@@ -185,78 +187,64 @@ public class LogFileCodec implements LogFile.Codec {
recordDataLength += WritableUtils.getVIntSize(recordDataLength);
current = new LogFileRecord();
- // Set the total serialized length on the record
current.setSerializedLength(recordDataLength);
- LogFileRecord.MutationType type =
LogFileRecord.MutationType.codeToType(in.readByte());
-
+ // Header
int nameBytesLen = WritableUtils.readVInt(in);
byte[] nameBytes = new byte[nameBytesLen];
in.readFully(nameBytes);
-
current.setHBaseTableName(Bytes.toString(nameBytes));
-
current.setCommitId(WritableUtils.readVLong(in));
- int rowKeyLen = WritableUtils.readVInt(in);
- byte[] rowKey = new byte[rowKeyLen];
- in.readFully(rowKey);
-
- Mutation mutation;
- switch (type) {
- case PUT:
- mutation = new Put(rowKey);
- break;
- case DELETE:
- mutation = new Delete(rowKey);
- break;
- default:
- throw new UnsupportedOperationException("Unhandled mutation type "
+ type);
+ // Attributes
+ int attrCount = WritableUtils.readVInt(in);
+ Map<String, byte[]> attrs = attrCount == 0 ? new HashMap<>() : new
HashMap<>(attrCount);
+ for (int i = 0; i < attrCount; i++) {
+ int keyLen = WritableUtils.readVInt(in);
+ byte[] keyBytes = new byte[keyLen];
+ in.readFully(keyBytes);
+ int valLen = WritableUtils.readVInt(in);
+ byte[] valBytes = new byte[valLen];
+ if (valLen > 0) {
+ in.readFully(valBytes);
+ }
+ attrs.put(new String(keyBytes, StandardCharsets.UTF_8), valBytes);
}
- current.setMutation(mutation);
+ current.setAttributes(attrs);
- long ts = in.readLong();
- mutation.setTimestamp(ts);
-
- int cfCount = WritableUtils.readVInt(in);
- for (int i = 0; i < cfCount; i++) {
- // Col name
- int cfLen = WritableUtils.readVInt(in);
- byte[] cf = new byte[cfLen];
- in.readFully(cf);
- // Qualifiers+Values Count
- int columnValuePairsCount = WritableUtils.readVInt(in);
- for (int j = 0; j < columnValuePairsCount; j++) {
- // Cell timestamp
- long cellTs = in.readLong();
- // Cell type byte
- byte cellTypeByte = in.readByte();
- // Qualifier name
- int qualLen = WritableUtils.readVInt(in);
- byte[] qual = new byte[qualLen];
- if (qualLen > 0) {
- in.readFully(qual);
- }
- // Value
- int valueLen = WritableUtils.readVInt(in);
- byte[] value = new byte[valueLen];
- if (valueLen > 0) {
- in.readFully(value);
- }
- Cell cell = new KeyValue(rowKey, 0, rowKey.length, cf, 0,
cf.length, qual, 0,
- qual.length, cellTs, KeyValue.Type.codeToType(cellTypeByte),
value, 0, value.length);
- if (mutation instanceof Put) {
- ((Put) mutation).add(cell);
- } else {
- ((Delete) mutation).add(cell);
- }
+ // Cells
+ int cellCount = WritableUtils.readVInt(in);
+ List<Cell> cells = new ArrayList<>(cellCount);
+ for (int i = 0; i < cellCount; i++) {
+ int rowLen = WritableUtils.readVInt(in);
+ byte[] row = new byte[rowLen];
+ if (rowLen > 0) {
+ in.readFully(row);
+ }
+ int famLen = WritableUtils.readVInt(in);
+ byte[] family = new byte[famLen];
+ if (famLen > 0) {
+ in.readFully(family);
+ }
+ int qualLen = WritableUtils.readVInt(in);
+ byte[] qual = new byte[qualLen];
+ if (qualLen > 0) {
+ in.readFully(qual);
+ }
+ long ts = in.readLong();
+ byte typeByte = in.readByte();
+ int valueLen = WritableUtils.readVInt(in);
+ byte[] value = new byte[valueLen];
+ if (valueLen > 0) {
+ in.readFully(value);
}
+ cells.add(new KeyValue(row, 0, row.length, family, 0, family.length,
qual, 0, qual.length,
+ ts, KeyValue.Type.codeToType(typeByte), value, 0, value.length));
}
+ current.setCells(cells);
- // Successfully read a record
return true;
} catch (EOFException e) {
- // End of stream
current = null;
return false;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
index 6b12a3e659..e9c5b807b0 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
@@ -17,9 +17,16 @@
*/
package org.apache.phoenix.replication.log;
-import org.apache.hadoop.hbase.client.Delete;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.phoenix.replication.MutationCellGrouper;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = { "EI_EXPOSE_REP",
"EI_EXPOSE_REP2" },
justification = "Intentional")
@@ -27,7 +34,8 @@ public class LogFileRecord implements LogFile.Record {
private String tableName;
private long commitId;
- private Mutation mutation;
+ private List<Cell> cells = Collections.emptyList();
+ private Map<String, byte[]> attributes = Collections.emptyMap();
private int serializedLength;
public LogFileRecord() {
@@ -56,93 +64,80 @@ public class LogFileRecord implements LogFile.Record {
}
@Override
- public Mutation getMutation() {
- return this.mutation;
+ public List<Cell> getCells() {
+ return cells;
}
@Override
- public LogFile.Record setMutation(Mutation mutation) {
- this.mutation = mutation;
+ public LogFile.Record setCells(List<Cell> cells) {
+ Preconditions.checkNotNull(cells, "cells must not be null");
+ this.cells = cells;
return this;
}
@Override
- public int getSerializedLength() {
- // NOTE: Should be set by the Codec using setSerializedLength after
reading or writing
- // the record.
- return this.serializedLength;
+ public Map<String, byte[]> getAttributes() {
+ return attributes;
}
@Override
- public LogFile.Record setSerializedLength(int serializedLength) {
- this.serializedLength = serializedLength;
+ public LogFile.Record setAttributes(Map<String, byte[]> attributes) {
+ Preconditions.checkNotNull(attributes, "attributes must not be null");
+ this.attributes = attributes;
return this;
}
@Override
- public int hashCode() {
- int code = tableName.hashCode();
- code ^= Long.hashCode(commitId);
- code ^= mutation.toString().hashCode();
- return code;
+ public List<Mutation> getMutations() throws IOException {
+ List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+ if (!attributes.isEmpty()) {
+ for (Mutation m : result) {
+ for (Map.Entry<String, byte[]> e : attributes.entrySet()) {
+ m.setAttribute(e.getKey(), e.getValue());
+ }
+ }
+ }
+ return result;
}
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
+ public Mutation getMutation() throws IOException {
+ List<Mutation> mutations = getMutations();
+ if (mutations.size() != 1) {
+ throw new IllegalStateException("Record does not contain exactly one mutation (count="
+ + mutations.size() + "); use getMutations() instead");
}
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- LogFileRecord other = (LogFileRecord) obj;
- return tableName.equals(other.tableName) && commitId == other.commitId
- && mutation.toString().equals(other.mutation.toString());
+ return mutations.get(0);
}
@Override
- public String toString() {
- return "LogFileRecord [mutation=" + mutation.toString() + ", tableName=" +
tableName
- + ", commitId=" + commitId + " ]";
- }
-
- // Internals only below. Not for LogFile interface consumer use.
-
- protected enum MutationType {
- PUT(1),
- DELETE(2);
-
- private int code;
-
- MutationType(int code) {
- this.code = code;
- }
-
- int getCode() {
- return code;
+ public LogFile.Record setMutation(Mutation mutation) {
+ List<Cell> body = new ArrayList<>();
+ for (List<Cell> familyCells : mutation.getFamilyCellMap().values()) {
+ body.addAll(familyCells);
}
+ this.cells = body;
+ this.attributes = Collections.emptyMap();
+ return this;
+ }
- static MutationType get(Mutation mutation) {
- if (mutation instanceof Put) {
- return PUT;
- } else if (mutation instanceof Delete) {
- return DELETE;
- }
- throw new UnsupportedOperationException("Unsupported mutation type: " +
mutation);
- }
+ @Override
+ public int getSerializedLength() {
+ // NOTE: Should be set by the Codec using setSerializedLength after
reading or writing
+ // the record.
+ return this.serializedLength;
+ }
- static MutationType codeToType(int code) {
- for (MutationType type : MutationType.values()) {
- if (type.code == code) {
- return type;
- }
- }
- throw new UnsupportedOperationException("Unsupported mutation code: " +
code);
- }
+ @Override
+ public LogFile.Record setSerializedLength(int serializedLength) {
+ this.serializedLength = serializedLength;
+ return this;
+ }
+ @Override
+ public String toString() {
+ return "LogFileRecord [tableName=" + tableName + ", commitId=" + commitId + ", cellCount="
+ + cells.size() + ", attrCount=" + attributes.size() + "]";
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index dc0cb811f1..df3d428864 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -19,10 +19,11 @@ package org.apache.phoenix.replication.log;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.Cell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,12 +80,12 @@ public class LogFileWriter implements LogFile.Writer {
}
@Override
- public boolean append(String tableName, long commitId, Mutation mutation)
throws IOException {
+ public boolean append(String tableName, long commitId, List<Cell> cells)
throws IOException {
if (isClosed()) {
throw new IOException("Writer has been closed");
}
return writer.append(
- new
LogFileRecord().setHBaseTableName(tableName).setCommitId(commitId).setMutation(mutation));
+ new
LogFileRecord().setHBaseTableName(tableName).setCommitId(commitId).setCells(cells));
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 432cf9189d..d9e488394c 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -246,21 +246,22 @@ public class ReplicationLogProcessor implements Closeable
{
for (LogFile.Record record : logFileReader) {
final TableName tableName =
TableName.valueOf(record.getHBaseTableName());
- final Mutation mutation = record.getMutation();
-
- tableToMutationsMap.computeIfAbsent(tableName, k -> new
ArrayList<>()).add(mutation);
-
- // Increment current batch size and current batch size bytes
- currentBatchSize++;
- currentBatchSizeBytes += mutation.heapSize();
-
- // Process when we reach either the batch count or size limit
- if (currentBatchSize >= getBatchSize() || currentBatchSizeBytes >=
getBatchSizeBytes()) {
- processReplicationLogBatch(tableToMutationsMap);
- totalProcessed += currentBatchSize;
- tableToMutationsMap.clear();
- currentBatchSize = 0;
- currentBatchSizeBytes = 0;
+ // A record may reconstruct into multiple mutations. Batches split on
the mutation
+ // boundary, so a single record's mutations can span two
processReplicationLogBatch
+ // invocations -- do not assume per-record atomicity here.
+ for (Mutation mutation : record.getMutations()) {
+ tableToMutationsMap.computeIfAbsent(tableName, k -> new
ArrayList<>()).add(mutation);
+ currentBatchSize++;
+ currentBatchSizeBytes += mutation.heapSize();
+
+ // Process when we reach either the batch count or size limit
+ if (currentBatchSize >= getBatchSize() || currentBatchSizeBytes >=
getBatchSizeBytes()) {
+ processReplicationLogBatch(tableToMutationsMap);
+ totalProcessed += currentBatchSize;
+ tableToMutationsMap.clear();
+ currentBatchSize = 0;
+ currentBatchSizeBytes = 0;
+ }
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
index 18172721a0..b61adc9fce 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
@@ -126,6 +126,43 @@ public class LogFileAnalyzer extends Configured implements
Tool {
return allFiles;
}
+ /**
+ * Count the number of log <em>records</em> (not flattened mutations) per
table across all log
+ * files under {@code source}. Unlike {@link #groupLogsByTable(String)},
which expands each record
+ * into its constituent mutations, this preserves the record boundary so
callers can assert the
+ * coalescing contract (one record per (table, phase) per batch).
+ */
+ public Map<String, Integer> countRecordsByTable(String source) throws
IOException {
+ Map<String, Integer> allFiles = Maps.newHashMap();
+ init();
+ Path path = new Path(source);
+ List<Path> filesToAnalyze = getFilesToAnalyze(path);
+ for (Path file : filesToAnalyze) {
+ Map<String, Integer> perFile = countRecordsByTable(file);
+ for (Map.Entry<String, Integer> entry : perFile.entrySet()) {
+ allFiles.merge(entry.getKey(), entry.getValue(), Integer::sum);
+ }
+ }
+ return allFiles;
+ }
+
+ private Map<String, Integer> countRecordsByTable(Path file) throws
IOException {
+ Map<String, Integer> recordsByTable = Maps.newHashMap();
+ LogFileReaderContext context = new
LogFileReaderContext(getConf()).setFileSystem(fs)
+ .setFilePath(file).setSkipCorruptBlocks(check);
+ LogFileReader reader = new LogFileReader();
+ try {
+ reader.init(context);
+ Record record;
+ while ((record = reader.next()) != null) {
+ recordsByTable.merge(record.getHBaseTableName(), 1, Integer::sum);
+ }
+ } finally {
+ reader.close();
+ }
+ return recordsByTable;
+ }
+
private Map<String, List<Mutation>> groupLogsByTable(Path file) throws
IOException {
Map<String, List<Mutation>> mutationsByTable = Maps.newHashMap();
System.out.println("\nAnalyzing file: " + file);
@@ -139,7 +176,7 @@ public class LogFileAnalyzer extends Configured implements
Tool {
while ((record = reader.next()) != null) {
String tableName = record.getHBaseTableName();
List<Mutation> mutations = mutationsByTable.getOrDefault(tableName,
Lists.newArrayList());
- mutations.add(record.getMutation());
+ mutations.addAll(record.getMutations());
mutationsByTable.put(tableName, mutations);
}
} finally {
@@ -198,7 +235,9 @@ public class LogFileAnalyzer extends Configured implements
Tool {
System.out.println("\nRecord #" + recordCount + ":");
System.out.println(" Table: " + record.getHBaseTableName());
System.out.println(" Commit ID: " + record.getCommitId());
- System.out.println(" Mutation: " + record.getMutation());
+ for (Mutation m : record.getMutations()) {
+ System.out.println(" Mutation: " + m);
+ }
if (verbose) {
System.out.println(" Serialized Length: " +
record.getSerializedLength());
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index cb4ca7bcee..c4dd5149b2 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -67,7 +67,6 @@ import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.query.PhoenixTestBuilder;
import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.replication.metrics.ReplicationLogMetricValues;
import org.apache.phoenix.replication.reader.ReplicationLogProcessor;
import org.apache.phoenix.replication.tool.LogFileAnalyzer;
import org.apache.phoenix.util.TestUtil;
@@ -151,6 +150,14 @@ public class ReplicationLogGroupIT extends HABaseIT {
return mutations != null ? mutations.size() : 0;
}
+ private Map<String, Integer> countRecordsByTable() throws Exception {
+ LogFileAnalyzer analyzer = new LogFileAnalyzer();
+ // use peer cluster conf
+ analyzer.setConf(conf2);
+ Path standByLogDir =
logGroup.getOrCreatePeerShardManager().getRootDirectoryPath();
+ return analyzer.countRecordsByTable(standByLogDir.toString());
+ }
+
private void verifyReplication(Map<String, Integer> expected) throws
Exception {
// first close the logGroup
logGroup.close();
@@ -179,24 +186,6 @@ public class ReplicationLogGroupIT extends HABaseIT {
}
}
- private void assertMetricsEmitted() {
- ReplicationLogMetricValues values =
logGroup.getMetrics().getCurrentMetricValues();
- assertTrue("appendTime should be > 0, got " + values.getAppendTimeMax(),
- values.getAppendTimeMax() > 0);
- assertTrue("syncTime should be > 0, got " + values.getSyncTimeMax(),
- values.getSyncTimeMax() > 0);
- assertTrue("ringBufferTime should be > 0, got " +
values.getRingBufferTimeMax(),
- values.getRingBufferTimeMax() > 0);
- assertTrue("fsSyncTime should be > 0, got " + values.getFsSyncTimeMax(),
- values.getFsSyncTimeMax() > 0);
- assertTrue("batchSize should be > 0, got " + values.getBatchSizeMax(),
- values.getBatchSizeMax() > 0);
- assertTrue("pendingSyncCount should be > 0, got " +
values.getPendingSyncCountMax(),
- values.getPendingSyncCountMax() > 0);
- assertTrue("pendingSyncWaitTime should be > 0, got " +
values.getPendingSyncWaitTimeMax(),
- values.getPendingSyncWaitTimeMax() > 0);
- }
-
private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable)
{
LOG.info("Dump table log count for test {}", name.getMethodName());
for (Map.Entry<String, List<Mutation>> table :
mutationsByTable.entrySet()) {
@@ -340,7 +329,6 @@ public class ReplicationLogGroupIT extends HABaseIT {
PreparedStatement stmt =
conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?,
?)");
// upsert 50 rows
- int rowCount = 50;
for (int i = 0; i < 5; ++i) {
for (int j = 0; j < 10; ++j) {
stmt.setInt(1, i);
@@ -351,6 +339,45 @@ public class ReplicationLogGroupIT extends HABaseIT {
}
conn.commit();
}
+
+ // Update existing rows changing only the covered column val2 (val1
unchanged). With cell
+ // coalescing each phase's index cells share one record, so this
exercises:
+ // index1 (on val1, includes val2): index row key UNCHANGED -> PRE
unverified Put and POST
+ // verified Put target the SAME index row, i.e. two writes to the same
empty-column
+ // qualifier (UNVERIFIED then VERIFIED) split across the PRE and POST
records.
+ // index2 (on val2): index row key CHANGES (null -> value) ->
Delete(oldKey)+Put(newKey).
+ PreparedStatement updateVal2 =
+ conn.prepareStatement("upsert into " + tableName + " (id1, id2, val2)
VALUES(?, ?, ?)");
+ for (int i = 0; i < 5; ++i) {
+ for (int j = 0; j < 10; ++j) {
+ updateVal2.setInt(1, i);
+ updateVal2.setInt(2, j);
+ updateVal2.setString(3, "val2_" + i + "_" + j);
+ updateVal2.executeUpdate();
+ }
+ }
+ conn.commit();
+
+ // Update existing rows changing the indexed column val1 (val2
unchanged). This flips the
+ // roles relative to the previous pass:
+ // index1 (on val1): index row key CHANGES -> the PRE record makes the
old index row
+ // unverified (Put) and the new index row unverified (Put), while the
POST record holds a
+ // verified Put on the new key and a Delete on the old key -- a Put and
a Delete on
+ // DIFFERENT rows within one coalesced record, which the grouper must
split on the
+ // row+type boundary.
+ // index2 (on val2): index row key UNCHANGED -> PRE unverified + POST
verified on same row.
+ PreparedStatement updateVal1 =
+ conn.prepareStatement("upsert into " + tableName + " (id1, id2, val1)
VALUES(?, ?, ?)");
+ for (int i = 0; i < 5; ++i) {
+ for (int j = 0; j < 10; ++j) {
+ updateVal1.setInt(1, i);
+ updateVal1.setInt(2, j);
+ updateVal1.setString(3, "newval1_" + i + "_" + j);
+ updateVal1.executeUpdate();
+ }
+ }
+ conn.commit();
+
// do some atomic upserts which will be ignored and therefore not
replicated
stmt = conn.prepareStatement(
"upsert into " + tableName + " VALUES(?, ?, ?) " + "ON DUPLICATE KEY
IGNORE");
@@ -364,18 +391,15 @@ public class ReplicationLogGroupIT extends HABaseIT {
}
}
- // Sanity-check that producer- and consumer-side metrics fired at least
once on the haGroup.
- // This guards against the rotationTimeMs-style bug where a metric is
declared but never
- // emitted. Snapshot before verifyReplication() since it closes the log
group.
- assertMetricsEmitted();
-
- // verify replication mutation counts
- // mutation count will be equal to row count since the atomic upsert
mutations will be
- // ignored and therefore not replicated
+ // Verify the system tables are never replicated, and flush the log
group (verifyReplication
+ // closes it) before replay. We deliberately do NOT assert exact
data/index mutation totals
+ // here: the multi-pass update workload's per-table counts are dominated
by index-maintenance
+ // internals (local-index key churn, verified/unverified empty-column
writes) rather than by
+ // the coalescing under test, and coalescing is mutation-count invariant
by construction. The
+ // authoritative correctness check for this workload is the
cross-cluster cell-level equality
+ // below; the record-count contract of coalescing is pinned separately in
+ // testAppendAndSyncSingleBatchRecordCount.
Map<String, Integer> expected = Maps.newHashMap();
- expected.put(tableName, rowCount * 3); // Put + Delete + local index
update
- expected.put(indexName1, rowCount * 3); // unverified + verified +
delete (DeleteColumn)
- expected.put(indexName2, rowCount * 2); // unverified + verified
expected.put(SYSTEM_CATALOG_NAME, 0);
expected.put(SYSTEM_CHILD_LINK_NAME, 0);
verifyReplication(expected);
@@ -387,6 +411,59 @@ public class ReplicationLogGroupIT extends HABaseIT {
}
}
+ /**
+ * Pins the per-batch coalescing contract: one server-side batch on a table
with one index emits
+ * exactly three log records -- one for the data table, one for the index
PRE phase, and one for
+ * the index POST phase -- regardless of how many rows the batch contains.
Before coalescing this
+ * batch would have produced one record per mutation (5 rows x ~3 mutations
each); coalescing
+ * collapses each (table, phase) into a single cell-stream record.
Cross-cluster cell equality
+ * confirms the collapsed records still reconstruct the correct mutations on
the standby.
+ */
+ @Test
+ public void testAppendAndSyncSingleBatchRecordCount() throws Exception {
+ final String tableName = "T_" + generateUniqueName();
+ final String indexName = "I_" + generateUniqueName();
+ String createTableDdl = String.format(
+ "create table if not exists %s (id integer not null primary key, val1
varchar, val2 varchar)",
+ tableName);
+ String createIndexDdl = String
+ .format("create index if not exists %s on %s (val1) include (val2)",
indexName, tableName);
+
+ try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection)
DriverManager
+ .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
+ conn.createStatement().execute(createTableDdl);
+ conn.createStatement().execute(createIndexDdl);
+ conn.commit();
+
+ // Insert several rows and commit them as a SINGLE batch (autocommit
off, one commit()). All
+ // rows in this batch coalesce into one data-table record plus one PRE
and one POST record on
+ // the index table.
+ PreparedStatement stmt =
+ conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?)");
+ int rowCount = 5;
+ for (int i = 0; i < rowCount; ++i) {
+ stmt.setInt(1, i);
+ stmt.setString(2, "v1_" + i);
+ stmt.setString(3, "v2_" + i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+
+ // Flush the log group so the standby files are complete, then count
records per table.
+ logGroup.close();
+ Map<String, Integer> recordsByTable = countRecordsByTable();
+ LOG.info("Records by table: {}", recordsByTable);
+ assertEquals("Data table should have exactly one coalesced record for
the batch",
+ Integer.valueOf(1), recordsByTable.get(tableName));
+ assertEquals("Index table should have exactly two records (PRE + POST)
for the batch",
+ Integer.valueOf(2), recordsByTable.get(indexName));
+
+ // Replay on cluster 2 and verify cross-cluster cell-level equality.
+ replayAndVerifyAcrossClusters(Arrays.asList(createTableDdl,
createIndexDdl), tableName,
+ indexName);
+ }
+ }
+
@Test
public void testAppendAndSyncNoIndex() throws Exception {
final String tableName = "T_" + generateUniqueName();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index e486f82935..f3edb027f8 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -132,7 +132,7 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
// Add a mutation to make it a proper log file with data
Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
- writer.append(tableName, 1, put);
+ writer.append(tableName, 1, LogFileTestUtil.cellsOf(put));
writer.sync();
writer.close();
@@ -213,7 +213,7 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
LogFileWriter writer = initLogFileWriter(filePath);
Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
- writer.append(tableName, 1, put);
+ writer.append(tableName, 1, LogFileTestUtil.cellsOf(put));
writer.sync();
// Do NOT call writer.close() -- skips trailer, simulates a writer crash
after sync
@@ -261,7 +261,7 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
LogFileWriter writer = initLogFileWriter(filePath);
Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
- writer.append(tableName, 1, put);
+ writer.append(tableName, 1, LogFileTestUtil.cellsOf(put));
writer.sync();
// Do NOT call writer.close() -- skips trailer
@@ -311,7 +311,7 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
// Add a mutation to make it a proper log file with data
Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
- writer.append(tableName, 1, put);
+ writer.append(tableName, 1, LogFileTestUtil.cellsOf(put));
writer.sync();
writer.close();
@@ -528,14 +528,14 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
generateHBaseMutations(phoenixConnection, 5, table2Name, 101L, "b");
table1Mutations.forEach(mutation -> {
try {
- writer.append(table1Name, mutation.hashCode(), mutation);
+ writer.append(table1Name, mutation.hashCode(),
LogFileTestUtil.cellsOf(mutation));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
table2Mutations.forEach(mutation -> {
try {
- writer.append(table2Name, mutation.hashCode(), mutation);
+ writer.append(table2Name, mutation.hashCode(),
LogFileTestUtil.cellsOf(mutation));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -646,7 +646,7 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
// Add one mutation
Mutation put = LogFileTestUtil.newPut("row1", 3L, 4);
- writer.append(tableNameString, 1, put);
+ writer.append(tableNameString, 1, LogFileTestUtil.cellsOf(put));
writer.sync();
// For processing of an unclosed file to work, we need to disable trailer
validation
@@ -733,7 +733,7 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
Put put = new Put(Bytes.toBytes("row" + i));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"),
Bytes.toBytes("abcd"));
mutations.add(put);
- writer.append(tableName, i, put);
+ writer.append(tableName, i, LogFileTestUtil.cellsOf(put));
}
// Add 1 big mutation that will cross the byte size threshold before count
threshold
@@ -749,14 +749,14 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
+ "it crosses the byte size threshold and forces a batch to be
processed."));
mutations.add(bigPut);
- writer.append(tableName, 100, bigPut);
+ writer.append(tableName, 100, LogFileTestUtil.cellsOf(bigPut));
// Add more small mutations that will be batched due to count limit
for (int i = 3; i < 10; i++) {
Put put = new Put(Bytes.toBytes("row" + i));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"),
Bytes.toBytes("abcd"));
mutations.add(put);
- writer.append(tableName, i, put);
+ writer.append(tableName, i, LogFileTestUtil.cellsOf(put));
}
writer.close();
@@ -924,13 +924,13 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
// Add mutation for table1
Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, (i * 2) + 1);
table1Mutations.add(put1);
- writer.append(table1Name, (i * 2) + 1, put1);
+ writer.append(table1Name, (i * 2) + 1, LogFileTestUtil.cellsOf(put1));
writer.sync();
// Add mutation for table2
Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, (i * 2) + 2);
table2Mutations.add(put2);
- writer.append(table2Name, (i * 2) + 2, put2);
+ writer.append(table2Name, (i * 2) + 2, LogFileTestUtil.cellsOf(put2));
writer.sync();
}
writer.close();
@@ -1663,7 +1663,7 @@ public class ReplicationLogProcessorTestIT extends
ParallelStatsDisabledIT {
for (int i = 0; i < totalRecords; i++) {
Mutation put = LogFileTestUtil.newPut("row" + i, i + 1, i + 1);
originalMutations.add(put);
- writer.append(tableName, i + 1, put);
+ writer.append(tableName, i + 1, LogFileTestUtil.cellsOf(put));
writer.sync();
}
writer.close();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/MutationCellGrouperTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/MutationCellGrouperTest.java
new file mode 100644
index 0000000000..684fdebcd7
--- /dev/null
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/MutationCellGrouperTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link MutationCellGrouper#splitCellsIntoMutations}, the
replay-side inverse of
+ * per-batch cell coalescing. This algorithm is the correctness lynchpin of
PHOENIX-7931: a
+ * regression here would silently merge or split mutations on the standby with
no exception, so the
+ * row+type boundary behavior is pinned here at the unit level rather than
only through the
+ * heavyweight cross-cluster IT.
+ */
+public class MutationCellGrouperTest {
+
+ private static final byte[] FAMILY = Bytes.toBytes("cf");
+ private static final byte[] QUALIFIER = Bytes.toBytes("q");
+ private static final long TS = 100L;
+
+ private static Cell putCell(String row, String value) {
+ return
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes(row))
+
.setFamily(FAMILY).setQualifier(QUALIFIER).setTimestamp(TS).setType(Cell.Type.Put)
+ .setValue(Bytes.toBytes(value)).build();
+ }
+
+ private static Cell deleteColumnCell(String row) {
+ return
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes(row))
+
.setFamily(FAMILY).setQualifier(QUALIFIER).setTimestamp(TS).setType(Cell.Type.DeleteColumn)
+ .build();
+ }
+
+ private static Cell deleteFamilyCell(String row) {
+ return
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes(row))
+
.setFamily(FAMILY).setQualifier(QUALIFIER).setTimestamp(TS).setType(Cell.Type.DeleteFamily)
+ .build();
+ }
+
+ private static String rowOf(Mutation m) {
+ return Bytes.toString(m.getRow());
+ }
+
+ @Test
+ public void testEmptyInputYieldsEmptyList() throws Exception {
+ List<Mutation> result =
+ MutationCellGrouper.splitCellsIntoMutations(Collections.<Cell>
emptyList());
+ assertTrue("Empty input should produce no mutations", result.isEmpty());
+ }
+
+ @Test
+ public void testSinglePutCell() throws Exception {
+ Cell cell = putCell("row1", "v1");
+ List<Mutation> result =
+
MutationCellGrouper.splitCellsIntoMutations(Collections.singletonList(cell));
+ assertEquals(1, result.size());
+ assertTrue("Expected a Put", result.get(0) instanceof Put);
+ assertEquals("row1", rowOf(result.get(0)));
+ assertEquals(1, result.get(0).size());
+ }
+
+ @Test
+ public void testSingleDeleteCell() throws Exception {
+ Cell cell = deleteColumnCell("row1");
+ List<Mutation> result =
+
MutationCellGrouper.splitCellsIntoMutations(Collections.singletonList(cell));
+ assertEquals(1, result.size());
+ assertTrue("Expected a Delete", result.get(0) instanceof Delete);
+ assertEquals("row1", rowOf(result.get(0)));
+ }
+
+ @Test
+ public void testContiguousCellsSameRowAndTypeFormOneMutation() throws
Exception {
+ List<Cell> cells = new ArrayList<>();
+ cells.add(putCell("row1", "v1"));
+ cells.add(putCell("row1", "v2"));
+ cells.add(putCell("row1", "v3"));
+ List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+ assertEquals("Same row + same type cells must coalesce into one Put", 1,
result.size());
+ assertTrue(result.get(0) instanceof Put);
+ assertEquals(3, result.get(0).size());
+ }
+
+ @Test
+ public void testRowChangeStartsNewMutation() throws Exception {
+ List<Cell> cells = new ArrayList<>();
+ cells.add(putCell("row1", "v1"));
+ cells.add(putCell("row2", "v2"));
+ cells.add(putCell("row3", "v3"));
+ List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+ assertEquals("Each distinct row should yield its own Put", 3,
result.size());
+ assertEquals("row1", rowOf(result.get(0)));
+ assertEquals("row2", rowOf(result.get(1)));
+ assertEquals("row3", rowOf(result.get(2)));
+ }
+
+ /**
+ * The case that justifies the class: a single row whose Put cells precede
its Delete cells (e.g.
+ * an index row that is rewritten then partially deleted within one
server-side batch) must split
+ * on the put-vs-delete boundary into a separate Put and Delete, never merge
into one mutation.
+ */
+ @Test
+ public void testPutThenDeleteSameRowSplitsOnTypeBoundary() throws Exception {
+ List<Cell> cells = new ArrayList<>();
+ cells.add(putCell("row1", "v1"));
+ cells.add(deleteColumnCell("row1"));
+ List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+ assertEquals("Put and Delete on the same row must be two mutations", 2,
result.size());
+ assertTrue("First should be the Put", result.get(0) instanceof Put);
+ assertTrue("Second should be the Delete", result.get(1) instanceof Delete);
+ assertEquals("row1", rowOf(result.get(0)));
+ assertEquals("row1", rowOf(result.get(1)));
+ }
+
+ @Test
+ public void testPutDeletePutOnThreeRowsYieldsThreeMutations() throws
Exception {
+ List<Cell> cells = new ArrayList<>();
+ cells.add(putCell("rowA", "v1"));
+ cells.add(deleteColumnCell("rowB"));
+ cells.add(putCell("rowC", "v3"));
+ List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+ assertEquals(3, result.size());
+ assertTrue(result.get(0) instanceof Put);
+ assertTrue(result.get(1) instanceof Delete);
+ assertTrue(result.get(2) instanceof Put);
+ assertEquals("rowA", rowOf(result.get(0)));
+ assertEquals("rowB", rowOf(result.get(1)));
+ assertEquals("rowC", rowOf(result.get(2)));
+ }
+
+ /**
+ * Documents that the boundary is keyed on the exact cell type, not merely
put-vs-delete: adjacent
+ * DeleteColumn and DeleteFamily cells on the same row split into two
separate Delete mutations.
+ * This mirrors HBase's ReplicationSink and is relied on so a future change
does not silently
+ * coalesce distinct delete subtypes.
+ */
+ @Test
+ public void testAdjacentDeleteSubtypesSameRowSplit() throws Exception {
+ List<Cell> cells = new ArrayList<>();
+ cells.add(deleteColumnCell("row1"));
+ cells.add(deleteFamilyCell("row1"));
+ List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+ assertEquals("Distinct delete subtypes must split into separate Deletes",
2, result.size());
+ assertTrue(result.get(0) instanceof Delete);
+ assertTrue(result.get(1) instanceof Delete);
+ }
+
+ /**
+ * The grouper keys boundaries off the immediately preceding cell only, so a
row that recurs
+ * non-contiguously produces a separate mutation per contiguous run. This
encodes the documented
+ * "global row ordering is not required" contract: rowA appearing twice with
rowB between yields
+ * two rowA mutations, not a merged one.
+ */
+ @Test
+ public void testNonContiguousSameRowYieldsSeparateMutations() throws
Exception {
+ List<Cell> cells = new ArrayList<>();
+ cells.add(putCell("rowA", "v1"));
+ cells.add(putCell("rowB", "v2"));
+ cells.add(putCell("rowA", "v3"));
+ List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+ assertEquals(3, result.size());
+ assertEquals("rowA", rowOf(result.get(0)));
+ assertEquals("rowB", rowOf(result.get(1)));
+ assertEquals("rowA", rowOf(result.get(2)));
+ }
+
+ @Test
+ public void testCellsArePreservedInResultMutations() throws Exception {
+ Cell put1 = putCell("row1", "v1");
+ Cell put2 = putCell("row1", "v2");
+ List<Cell> cells = new ArrayList<>();
+ cells.add(put1);
+ cells.add(put2);
+ List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+ assertEquals(1, result.size());
+ List<Cell> grouped = result.get(0).getFamilyCellMap().get(FAMILY);
+ assertEquals(2, grouped.size());
+ assertTrue(
+ CellUtil.equals(put1, grouped.get(0)) && CellUtil.matchingValue(put1,
grouped.get(0)));
+ assertTrue(
+ CellUtil.equals(put2, grouped.get(1)) && CellUtil.matchingValue(put2,
grouped.get(1)));
+ }
+}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index dcd4cd55d3..fa69ac78f6 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.jdbc.HAGroupStoreRecord;
import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
@@ -109,11 +110,16 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Happens-before ordering verification, using Mockito's inOrder. Verify
that the appends
// happen before sync, and sync happened after appends.
- inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId1),
eq(put1));
- inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId2),
eq(put2));
- inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId3),
eq(put3));
- inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId4),
eq(put4));
- inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId5),
eq(put5));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId1),
+ eq(LogFileTestUtil.cellsOf(put1)));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId2),
+ eq(LogFileTestUtil.cellsOf(put2)));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId3),
+ eq(LogFileTestUtil.cellsOf(put3)));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId4),
+ eq(LogFileTestUtil.cellsOf(put4)));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId5),
+ eq(LogFileTestUtil.cellsOf(put5)));
inOrder.verify(writer, times(1)).sync();
}
@@ -144,7 +150,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Verify the sequence: append, sync (fail), sync (succeed on retry with
same writer)
InOrder inOrder = Mockito.inOrder(writer);
- inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId),
eq(put));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
inOrder.verify(writer, times(2)).sync(); // First fails, second succeeds
}
@@ -169,7 +176,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
sleep(50); // Simulate slow processing
return invocation.callRealMethod();
}
- }).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
+ }).when(innerWriter).append(anyString(), anyLong(), any(List.class));
// Fill up the ring buffer by sending enough events.
for (int i = 0; i < TEST_RINGBUFFER_SIZE; i++) {
@@ -206,7 +213,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
assertTrue("Append should have completed", appendFuture.isDone());
// Verify the append eventually happens on the writer.
- verify(innerWriter, timeout(10000).times(1)).append(eq(tableName),
eq(myCommitId), any());
+ verify(innerWriter, timeout(10000).times(1)).append(eq(tableName),
eq(myCommitId),
+ any(List.class));
}
/**
@@ -225,7 +233,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Configure writerBeforeRoll to fail on the first append call
doThrow(new IOException("Simulated append
failure")).when(writerBeforeRoll).append(anyString(),
- anyLong(), any(Mutation.class));
+ anyLong(), any(List.class));
// Append data
logGroup.append(tableName, commitId, put);
@@ -237,9 +245,11 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Verify the sequence: append (fail), rotate, append (succeed), sync
InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
- inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put));
+ inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName),
eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
inOrder.verify(writerBeforeRoll, times(0)).sync(); // We failed append,
did not try
- inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put)); // Retry
+ inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName),
eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put))); // Retry
inOrder.verify(writerAfterRoll, times(1)).sync();
}
@@ -348,7 +358,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Verify that all of appends were processed by the internal writer.
for (int i = 0; i < APPENDS_PER_THREAD * 2; i++) {
final long commitId = i;
- verify(innerWriter, times(1)).append(eq(tableName), eq(commitId), any());
+ verify(innerWriter, times(1)).append(eq(tableName), eq(commitId),
any(List.class));
}
}
@@ -389,9 +399,11 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Verify the sequence of operations
InOrder inOrder = Mockito.inOrder(writerBeforeRotation,
writerAfterRotation);
- inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq(commitId), eq(put));
+ inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
inOrder.verify(writerBeforeRotation, times(1)).sync();
- inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + 1), eq(put));
+ inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + 1),
+ eq(LogFileTestUtil.cellsOf(put)));
inOrder.verify(writerAfterRotation, times(1)).sync();
}
@@ -430,7 +442,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
// Verify the final append went to the new writer
- verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId),
eq(put));
+ verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
verify(writerAfterRotation, times(1)).sync();
}
@@ -510,10 +523,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
// Verify first batch went to initial writer
- verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(1L),
eq(put));
+ verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(1L),
+ eq(LogFileTestUtil.cellsOf(put)));
verify(writerBeforeRotation, times(1)).sync();
// Verify second batch went to new writer (swap happened before append)
- verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId +
1), eq(put));
+ verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId +
1),
+ eq(LogFileTestUtil.cellsOf(put)));
verify(writerAfterRotation, times(1)).sync();
// Verify the initial writer was closed asynchronously
verify(writerBeforeRotation, timeout(5000).times(1)).close();
@@ -568,11 +583,14 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Verify operations went to the writers in the correct order
InOrder inOrder = Mockito.inOrder(initialWriter, writerAfterRotate);
- inOrder.verify(initialWriter).append(eq(tableName), eq(commitId), eq(put));
+ inOrder.verify(initialWriter).append(eq(tableName), eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
inOrder.verify(initialWriter).sync();
- inOrder.verify(initialWriter).append(eq(tableName), eq(commitId + 1),
eq(put));
+ inOrder.verify(initialWriter).append(eq(tableName), eq(commitId + 1),
+ eq(LogFileTestUtil.cellsOf(put)));
inOrder.verify(initialWriter).sync();
- inOrder.verify(writerAfterRotate).append(eq(tableName), eq(commitId + 2),
eq(put));
+ inOrder.verify(writerAfterRotate).append(eq(tableName), eq(commitId + 2),
+ eq(LogFileTestUtil.cellsOf(put)));
inOrder.verify(writerAfterRotate).sync();
}
@@ -628,7 +646,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Configure writer to throw a RuntimeException on append
doThrow(new RuntimeException("Simulated critical
error")).when(innerWriter).append(anyString(),
- anyLong(), any(Mutation.class));
+ anyLong(), any(List.class));
// Append publishes to the ring buffer. The event handler catches the
RuntimeException via
// catch(Throwable), poisons itself, and fails the sync future. The
producer receives
@@ -766,12 +784,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// 5 appends went to old writer (processed before rotation task fired)
for (int i = 0; i < 5; i++) {
inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq(commitId + i),
- eq(put));
+ eq(LogFileTestUtil.cellsOf(put)));
}
// Swap happens before sync action: 5 records replayed into new writer
for (int i = 0; i < 5; i++) {
inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + i),
- eq(put));
+ eq(LogFileTestUtil.cellsOf(put)));
}
// Sync goes to new writer
inOrder.verify(writerAfterRotation, times(1)).sync();
@@ -849,10 +867,6 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Write records across multiple rotations.
for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
- // Get the path of the current log file.
- Path logPath = activeLog.getWriter().getContext().getFilePath();
- logPaths.add(logPath);
-
for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
int commitId = (rotation * NUM_RECORDS_PER_ROTATION) + i;
LogFile.Record record =
@@ -861,6 +875,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
}
logGroup.sync(); // Sync to commit the appends to the current writer.
+ // Capture the writer path AFTER sync, once this batch's apply() has
settled currentWriter to
+ // the file these records landed on. Capturing before the appends races
the async swap event:
+ // checkAndReplaceWriter's pendingWriter.getAndSet(null) and the
currentWriter assignment are
+ // not atomic, so getWriter() can see the pending writer already taken
by the consumer while
+ // currentWriter still points at the previous file, mis-recording the
path.
+ logPaths.add(activeLog.getWriter().getContext().getFilePath());
// Force a rotation to close the current writer.
activeLog.forceRotation();
}
@@ -919,9 +939,6 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
ReplicationLog activeLog = logGroup.getActiveLog();
for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
- Path logPath = activeLog.getWriter().getContext().getFilePath();
- logPaths.add(logPath);
-
for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
int commitId = (rotation * NUM_RECORDS_PER_ROTATION) + i;
LogFile.Record record =
@@ -931,6 +948,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
}
logGroup.sync();
+ // Capture the writer path AFTER sync, once this batch's apply() has
settled currentWriter to
+ // the file these records landed on. Capturing before the appends races
the async swap event:
+ // checkAndReplaceWriter's pendingWriter.getAndSet(null) and the
currentWriter assignment are
+ // not atomic, so getWriter() can see the pending writer already taken
by the consumer while
+ // currentWriter still points at the previous file, mis-recording the
path.
+ logPaths.add(activeLog.getWriter().getContext().getFilePath());
activeLog.forceRotation();
}
@@ -1020,7 +1043,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Configure writer to throw RuntimeException on append
doThrow(new RuntimeException("Simulated critical
error")).when(innerWriter).append(anyString(),
- anyLong(), any(Mutation.class));
+ anyLong(), any(List.class));
// Append publishes to the ring buffer. The event handler catches the
RuntimeException,
// poisons itself, and fails the sync future. The producer calls abort().
@@ -1059,7 +1082,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Configure writer to throw RuntimeException on append
doThrow(new RuntimeException("Simulated critical
error")).when(innerWriter).append(anyString(),
- anyLong(), any(Mutation.class));
+ anyLong(), any(List.class));
// Append publishes to the ring buffer. The event handler catches the
RuntimeException,
// poisons itself, and fails the sync future. The producer calls abort().
@@ -1166,7 +1189,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
sleep(50); // Delay to allow multiple events to be posted
return invocation.callRealMethod();
}
- }).when(innerWriter).append(eq(tableName), eq(commitId1), eq(put1));
+ }).when(innerWriter).append(eq(tableName), eq(commitId1),
eq(LogFileTestUtil.cellsOf(put1)));
// Post appends and three syncs in quick succession. The first append will
be delayed long
// enough for the three syncs to appear in a single Disruptor batch. Then
they should all
@@ -1181,9 +1204,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Verify the sequence of operations on the inner writer: the three
appends, then exactly
// one sync.
InOrder inOrder = Mockito.inOrder(innerWriter);
- inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId1),
eq(put1));
- inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId2),
eq(put2));
- inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId3),
eq(put3));
+ inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId1),
+ eq(LogFileTestUtil.cellsOf(put1)));
+ inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId2),
+ eq(LogFileTestUtil.cellsOf(put2)));
+ inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId3),
+ eq(LogFileTestUtil.cellsOf(put3)));
inOrder.verify(innerWriter, times(1)).sync(); // Only one sync should be
called
}
@@ -1287,12 +1313,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// invocation/stub state is not thread-safe and the partially-applied stub
can be matched
// against an unrelated method on the consumer thread.
doThrow(new IOException("Simulate append
failure")).when(writer).append(tableName, commitId5,
- put5);
+ LogFileTestUtil.cellsOf(put5));
// Rotated writers must also fail on the 5th append so the retry doesn't
rescue the loop.
doAnswer(invocation -> {
LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
doThrow(new IOException("Simulate append
failure")).when(w).append(tableName, commitId5,
- put5);
+ LogFileTestUtil.cellsOf(put5));
return w;
}).when(activeLog).createNewWriter();
@@ -1309,11 +1335,16 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// verify that all the in-flight appends and syncs are replayed on the new
store and forward
// writer
- inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId1), eq(put1));
- inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId2), eq(put2));
- inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId3), eq(put3));
- inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId4), eq(put4));
- inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId5), eq(put5));
+ inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId1),
+ eq(LogFileTestUtil.cellsOf(put1)));
+ inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId2),
+ eq(LogFileTestUtil.cellsOf(put2)));
+ inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId3),
+ eq(LogFileTestUtil.cellsOf(put3)));
+ inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId4),
+ eq(LogFileTestUtil.cellsOf(put4)));
+ inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName),
eq(commitId5),
+ eq(LogFileTestUtil.cellsOf(put5)));
inOrder.verify(storeAndForwardWriter, times(1)).sync();
}
@@ -1379,15 +1410,16 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// 3 appends went to old writer
for (int i = 0; i < 3; i++) {
inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq(commitId + i),
- eq(put));
+ eq(LogFileTestUtil.cellsOf(put)));
}
// 3 records replayed into new writer
for (int i = 0; i < 3; i++) {
inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + i),
- eq(put));
+ eq(LogFileTestUtil.cellsOf(put)));
}
// 4th append goes to new writer
- inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + 3), eq(put));
+ inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + 3),
+ eq(LogFileTestUtil.cellsOf(put)));
inOrder.verify(writerAfterRotation, times(1)).sync();
// Old writer closed async
@@ -1429,10 +1461,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
assertTrue("Should be using new writer", newWriter != initialWriter);
// Old writer: received the append only
- verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
eq(put));
+ verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
// New writer: received replayed append + successful sync
- verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+ verify(newWriter, times(1)).append(eq(tableName), eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
verify(newWriter, times(1)).sync();
}
@@ -1441,6 +1475,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
* idle. A rotation tick stages a pending writer. The reader performs HDFS
lease recovery,
* breaking the old writer's stream. When events resume, apply() drains the
healthy staged writer
* before the action — the broken writer is never touched. No replay needed
(empty batch).
+ * <p>
+ * Determinism: the broken-stream stubs are installed while the system is
idle, before
+ * forceRotation() publishes the swap event, so the consumer thread never
races stub installation
+ * on the initialWriter spy. Before verifying the initialWriter invocation
counts we await its
+ * async close (submitClose on the swap), which happens-after every
invocation the swap can make
+ * on that spy — so the count assertions cannot race the
consumer/close-executor threads.
*/
@Test
public void testIdleLeaseRecoveryDrainsStagedWriter() throws Exception {
@@ -1456,15 +1496,16 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.append(tableName, commitId, put);
logGroup.sync();
- // Stage W2 in pendingWriter via forced rotation
- activeLog.forceRotation();
-
- // Simulate HDFS lease recovery breaking the old writer's stream
+ // Simulate HDFS lease recovery breaking the old writer's stream.
Installed while idle — before
+ // forceRotation() publishes the swap event — so the consumer cannot race
this stub install.
doThrow(new IOException("Simulated broken stream after lease
recovery")).when(initialWriter)
- .append(anyString(), anyLong(), any(Mutation.class));
+ .append(anyString(), anyLong(), any(List.class));
doThrow(new IOException("Simulated broken stream after lease
recovery")).when(initialWriter)
.sync();
+ // Stage W2 in pendingWriter via forced rotation
+ activeLog.forceRotation();
+
// Events resume — apply() drains W2 before the action, so broken writer
is never touched
logGroup.append(tableName, commitId + 1, put);
logGroup.sync();
@@ -1474,11 +1515,17 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
newWriter != initialWriter);
// New writer received the new append + sync (no replay — currentBatch was
empty)
- verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1),
eq(put));
+ verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1),
+ eq(LogFileTestUtil.cellsOf(put)));
verify(newWriter, times(1)).sync();
+ // Await the async close of the old writer. This happens-after every
invocation the swap makes
+ // on the initialWriter spy, so the count verifications below cannot race
the consumer thread.
+ verify(initialWriter, timeout(5000).times(1)).close();
+
// Old writer: only the pre-idle append + sync, nothing after the break
- verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
eq(put));
+ verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
verify(initialWriter, times(1)).sync();
}
@@ -1505,7 +1552,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
throw new IOException("Simulated transient HDFS error during
replay");
}
return appendInvocation.callRealMethod();
- }).when(w).append(anyString(), anyLong(), any(Mutation.class));
+ }).when(w).append(anyString(), anyLong(), any(List.class));
return w;
}).when(activeLog).createNewWriter();
@@ -1526,9 +1573,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
assertTrue("Should be using a fresh writer", finalWriter != initialWriter);
// Final writer (W3): replayed r1+r2 then appended r3 — each exactly once.
- verify(finalWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
- verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 1),
eq(put));
- verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 2),
eq(put));
+ verify(finalWriter, times(1)).append(eq(tableName), eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
+ verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 1),
+ eq(LogFileTestUtil.cellsOf(put)));
+ verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 2),
+ eq(LogFileTestUtil.cellsOf(put)));
verify(finalWriter, times(1)).sync();
}
@@ -1549,7 +1599,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Configure initial writer's append to always fail (simulating broken
HDFS stream)
doThrow(new IOException("Simulated broken
stream")).when(initialWriter).append(anyString(),
- anyLong(), any(Mutation.class));
+ anyLong(), any(List.class));
// Append — attempt 1 fails on initialWriter, rotation requested, attempt
2 drains the
// rotated writer and succeeds
@@ -1560,9 +1610,11 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
assertTrue("Should be using a new writer after error recovery", newWriter
!= initialWriter);
// Old writer received 1 failed attempt
- verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
eq(put));
+ verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
// New writer received the successful append
- verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+ verify(newWriter, times(1)).append(eq(tableName), eq(commitId),
+ eq(LogFileTestUtil.cellsOf(put)));
verify(newWriter, times(1)).sync();
}
@@ -2061,7 +2113,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
doAnswer(invocation -> {
holdConsumer.await();
return invocation.callRealMethod();
- }).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
+ }).when(innerWriter).append(anyString(), anyLong(), any(List.class));
Thread filler = new Thread(() -> {
try {
@@ -2109,6 +2161,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
final int appendsPerSync = Integer.getInteger("test.appendsPerSync", 5);
final int cellsPerMutation = Integer.getInteger("test.cellsPerMutation",
1);
final long innerSyncDelayMs = Long.getLong("test.innerSyncDelayMs", 2);
+ // Framing of the appendsPerSync mutations between two syncs:
+ // permutation - one append() per mutation (pre-coalescing behavior)
+ // perbatch - one append(table, commitId, List<Cell>) per sync carrying
the whole batch's
+ // flat cell stream (the coalesced behavior IndexRegionObserver now emits)
+ final String recordFraming = System.getProperty("test.recordFraming",
"permutation");
+ final boolean perBatch = "perbatch".equals(recordFraming);
// Use the production-default ring buffer size so producers are not
artificially blocked on
// ringBuffer.next() — the default test fixture uses a 32-slot buffer
which fills under
@@ -2164,11 +2222,26 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
try {
startLatch.await();
for (int i = 0; i < syncsPerProducer; i++) {
- for (int j = 0; j < appendsPerSync; j++) {
+ if (perBatch) {
+ // Coalesce the whole batch into a single record carrying the
flat cell stream,
+ // mirroring IndexRegionObserver's per-(table,batch) append.
+ List<Cell> batchCells = new ArrayList<>(appendsPerSync *
cellsPerMutation);
long commitId = commitIdSeq.getAndIncrement();
- Mutation put = LogFileTestUtil.newPut("row" + commitId,
commitId, cellsPerMutation);
- logGroup.append(tableName, commitId, put);
+ for (int j = 0; j < appendsPerSync; j++) {
+ Mutation put =
+ LogFileTestUtil.newPut("row" + commitId + "_" + j,
commitId, cellsPerMutation);
+ batchCells.addAll(LogFileTestUtil.cellsOf(put));
+ }
+ logGroup.append(tableName, commitId, batchCells);
totalProducerAppends.incrementAndGet();
+ } else {
+ for (int j = 0; j < appendsPerSync; j++) {
+ long commitId = commitIdSeq.getAndIncrement();
+ Mutation put =
+ LogFileTestUtil.newPut("row" + commitId, commitId,
cellsPerMutation);
+ logGroup.append(tableName, commitId, put);
+ totalProducerAppends.incrementAndGet();
+ }
}
logGroup.sync();
totalProducerSyncs.incrementAndGet();
@@ -2236,4 +2309,70 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
pool.awaitTermination(5, TimeUnit.SECONDS);
}
}
+
+ /**
+  * Checks that the sync-path metrics recorded on the producer side and on the consumer side
+  * are really published when data is written, protecting against a metric that is declared
+  * but never wired up.
+  * <p>
+  * Two deliberate choices keep the assertions deterministic where the IT version flaked:
+  * <ul>
+  * <li>{@code sync()} on the inner writer is stubbed to sleep for a couple of milliseconds
+  * before delegating to the real method. {@code syncTime} and {@code fsSyncTime} are stored
+  * truncated to milliseconds, so a real fsync on a fast disk can correctly record 0 ms and
+  * break a naive {@code > 0} assertion.</li>
+  * <li>Metrics are read only after a graceful {@link ReplicationLogGroup#close()}, which joins
+  * the Disruptor consumer thread. That establishes happens-before on every consumer-recorded
+  * metric, including {@code batchSize}, which is updated in the endOfBatch callback after the
+  * producer's sync future has already completed.</li>
+  * </ul>
+  */
+ @Test
+ public void testSyncMetricsEmitted() throws Exception {
+   // Delay injected into the inner fsync so the ms-truncated syncTime/fsSyncTime histograms
+   // record a non-zero value on every run.
+   final long innerSyncDelayMs = 2;
+   LogFileWriter writer = logGroup.getActiveLog().getWriter();
+   assertNotNull("Writer should not be null", writer);
+   doAnswer(invocation -> {
+     sleep(innerSyncDelayMs);
+     return invocation.callRealMethod();
+   }).when(writer).sync();
+
+   final String tableName = "TESTTBL";
+   final int batches = 5;
+   final int appendsPerBatch = 4;
+   final int totalAppends = batches * appendsPerBatch;
+   // Commit ids run 0..totalAppends-1; a sync closes every group of appendsPerBatch appends.
+   for (long commitId = 0; commitId < totalAppends; commitId++) {
+     logGroup.append(tableName, commitId,
+       LogFileTestUtil.newPut("row" + commitId, commitId, 1));
+     boolean lastAppendOfBatch = (commitId + 1) % appendsPerBatch == 0;
+     if (lastAppendOfBatch) {
+       logGroup.sync();
+     }
+   }
+
+   // Graceful close (fatalException == null): the consumer drains and the executor is joined
+   // before the metrics are read. close() is idempotent, so tearDown's close() is a no-op.
+   logGroup.close();
+
+   // getCurrentMetricValues() snapshots and resets the histogram bins, so it is called once.
+   ReplicationLogMetricValues snapshot = logGroup.getMetrics().getCurrentMetricValues();
+
+   // Nanosecond-resolution metrics never truncate to zero, so they must be strictly positive.
+   assertTrue("appendTime should be > 0, got " + snapshot.getAppendTimeMax(),
+     snapshot.getAppendTimeMax() > 0);
+   assertTrue("ringBufferTime should be > 0, got " + snapshot.getRingBufferTimeMax(),
+     snapshot.getRingBufferTimeMax() > 0);
+   assertTrue("pendingSyncWaitTime should be > 0, got " + snapshot.getPendingSyncWaitTimeMax(),
+     snapshot.getPendingSyncWaitTimeMax() > 0);
+   // Counts.
+   assertTrue("batchSize should be > 0, got " + snapshot.getBatchSizeMax(),
+     snapshot.getBatchSizeMax() > 0);
+   assertTrue("pendingSyncCount should be > 0, got " + snapshot.getPendingSyncCountMax(),
+     snapshot.getPendingSyncCountMax() > 0);
+   // Millisecond-resolution timers: the injected fsync delay clears the truncation floor.
+   assertTrue("syncTime should be > 0, got " + snapshot.getSyncTimeMax(),
+     snapshot.getSyncTimeMax() > 0);
+   assertTrue("fsSyncTime should be > 0, got " + snapshot.getFsSyncTimeMax(),
+     snapshot.getFsSyncTimeMax() > 0);
+ }
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
index a3955c5783..debe855a4d 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
@@ -34,10 +34,12 @@ import java.util.Random;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,8 +109,7 @@ public class LogFileCodecTest {
List<LogFile.Record> originals =
Arrays.asList(LogFileTestUtil.newPutRecord("TBL1", 100L, "row1", 12345L,
1),
LogFileTestUtil.newPutRecord("TBL2", 101L, "row2", 12346L, 2),
- LogFileTestUtil.newPutRecord("TBL1", 102L, "row3", 12347L, 0) // No
columns
- );
+ LogFileTestUtil.newPutRecord("TBL1", 102L, "row3", 12347L, 3));
// Encode
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -261,21 +262,23 @@ public class LogFileCodecTest {
singleRecordTest(LogFileTestUtil.newPutRecord("TBLMANYVALS", 1L, "row",
12345L, 100));
}
- @Test
- public void testCodecWithEmptyPut() throws IOException {
+ @Test(expected = IllegalArgumentException.class)
+ public void testCodecRejectsEmptyPut() throws IOException {
long ts = 12345L;
Put put = new Put(Bytes.toBytes("row"));
put.setTimestamp(ts);
- singleRecordTest(
- new
LogFileRecord().setHBaseTableName("TBLEMPTYPUT").setCommitId(1L).setMutation(put));
+ LogFileCodec codec = new LogFileCodec();
+ codec.getEncoder(new DataOutputStream(new ByteArrayOutputStream()))
+ .write(new
LogFileRecord().setHBaseTableName("TBLEMPTYPUT").setCommitId(1L).setMutation(put));
}
- @Test
- public void testCodecWithEmptyDelete() throws IOException {
+ @Test(expected = IllegalArgumentException.class)
+ public void testCodecRejectsEmptyDelete() throws IOException {
long ts = 12345L;
Delete delete = new Delete(Bytes.toBytes("row"));
delete.setTimestamp(ts);
- singleRecordTest(
+ LogFileCodec codec = new LogFileCodec();
+ codec.getEncoder(new DataOutputStream(new ByteArrayOutputStream())).write(
new
LogFileRecord().setHBaseTableName("TBLEMPTYDEL").setCommitId(1L).setMutation(delete));
}
@@ -454,4 +457,177 @@ public class LogFileCodecTest {
new
LogFileRecord().setHBaseTableName("TBLCTDFV").setCommitId(1L).setMutation(del4));
}
+ /**
+  * Round-trips a Put whose columns span three families that were added out of sorted order, then
+  * checks the family of each cell exposed by the record's flat cell list.
+  */
+ @Test
+ public void testMultipleFamiliesRoundTrip() throws IOException {
+   final long timestamp = 12345L;
+
+   // Insert z_cf before a_cf and m_cf on purpose, and come back to a_cf at the end, so the
+   // insertion order differs from the sorted family order asserted below.
+   Put multiFamilyPut = new Put(Bytes.toBytes("row"));
+   multiFamilyPut.setTimestamp(timestamp);
+   multiFamilyPut.addColumn(Bytes.toBytes("z_cf"), Bytes.toBytes("q1"), timestamp,
+     Bytes.toBytes("vz"));
+   multiFamilyPut.addColumn(Bytes.toBytes("a_cf"), Bytes.toBytes("q1"), timestamp,
+     Bytes.toBytes("va"));
+   multiFamilyPut.addColumn(Bytes.toBytes("m_cf"), Bytes.toBytes("q1"), timestamp,
+     Bytes.toBytes("vm"));
+   multiFamilyPut.addColumn(Bytes.toBytes("a_cf"), Bytes.toBytes("q2"), timestamp,
+     Bytes.toBytes("va2"));
+
+   LogFile.Record original = new LogFileRecord().setHBaseTableName("TBLMULTIFAM")
+     .setCommitId(1L).setMutation(multiFamilyPut);
+   singleRecordTest(original);
+
+   // Expected family of each cell, by position, with the ordinal used in the failure message.
+   final String[] ordinals = { "first", "second", "third", "fourth" };
+   final String[] expectedFamilies = { "a_cf", "a_cf", "m_cf", "z_cf" };
+
+   List<Cell> flattened = original.getCells();
+   assertEquals(expectedFamilies.length, flattened.size());
+   for (int i = 0; i < expectedFamilies.length; i++) {
+     byte[] actualFamily = CellUtil.cloneFamily(flattened.get(i));
+     assertTrue(ordinals[i] + " family should be " + expectedFamilies[i],
+       Bytes.equals(actualFamily, Bytes.toBytes(expectedFamilies[i])));
+   }
+ }
+
+ /**
+ * Framing A/B microbenchmark. Isolates the codec-level cost of record framing. The per-cell wire
+ * format is identical in both modes (same flat-cell encoding); the only thing that differs is how
+ * often the per-record header (record length + table name + commitId + attribute count) is paid:
+ * <ul>
+ * <li>Mode A (per-mutation): one {@link LogFileRecord} per mutation, encoded separately, so the
+ * header is paid {@code mutationCount} times.</li>
+ * <li>Mode B (per-batch): one record carrying the whole batch's flat cell stream, encoded once,
+ * so the header is paid once.</li>
+ * </ul>
+ * Both modes build their cells identically and share one codec instance, so any delta in
+ * {@code wireBytes} is attributable to framing overhead and any delta in {@code appendNs} to the
+ * extra per-record encode calls.
+ * <p>
+ * The test is skipped unless {@code -Dtest.runFramingBenchmark=true} is set. Tunables and their
+ * defaults: {@code -Dtest.mutationCount} (100), {@code -Dtest.cellsPerMutation} (4),
+ * {@code -Dtest.valueSize} (64) and {@code -Dtest.benchmarkIterations} (2000). One tenth of the
+ * iteration count, but at least one iteration, is run untimed as warmup before each mode.
+ * <p>
+ * Results are only logged, never asserted. The logged {@code headerBytesPerRecord} estimate
+ * divides the bytes saved by mode B by {@code mutationCount - 1}, i.e. by the number of extra
+ * record headers mode A writes, and is reported as 0 when there is at most one mutation.
+ */
+ @Test
+ public void testFramingMicrobenchmark() throws IOException {
+ Assume.assumeTrue("Framing microbenchmark, opt in with
-Dtest.runFramingBenchmark=true",
+ Boolean.getBoolean("test.runFramingBenchmark")); // false unless the property equals "true"
+ final String tableName = "TBLFRAME";
+ final int mutationCount = Integer.getInteger("test.mutationCount", 100);
+ final int cellsPerMutation = Integer.getInteger("test.cellsPerMutation",
4);
+ final int valueSize = Integer.getInteger("test.valueSize", 64);
+ final int iterations = Integer.getInteger("test.benchmarkIterations",
2000);
+ final int warmup = Math.max(1, iterations / 10); // 10% of the timed runs, never fewer than one
+
+ // Build the batch's cells once. Each mutation contributes
cellsPerMutation cells, each carrying
+ // a valueSize-byte value so the cell payload is production-realistic
relative to the header.
+ // The flat concatenation is what mode B encodes, while mode A re-frames
each mutation's slice.
+ List<List<Cell>> perMutationCells = new ArrayList<>(mutationCount);
+ List<Cell> batchCells = new ArrayList<>(mutationCount * cellsPerMutation);
+ byte[] value = new byte[valueSize]; // zero-filled; the same array is passed for every cell
+ for (int m = 0; m < mutationCount; m++) {
+ Put put = new Put(Bytes.toBytes("row" + m));
+ put.setTimestamp(m);
+ for (int c = 0; c < cellsPerMutation; c++) {
+ put.addColumn(Bytes.toBytes("col" + c), Bytes.toBytes("q"), m, value); // "col" + c is the family
+ }
+ List<Cell> cells = new ArrayList<>(put.getFamilyCellMap().size()); // presized by family count
+ for (List<Cell> familyCells : put.getFamilyCellMap().values()) {
+ cells.addAll(familyCells);
+ }
+ perMutationCells.add(cells);
+ batchCells.addAll(cells);
+ }
+
+ LogFileCodec codec = new LogFileCodec(); // shared by both modes
+
+ // Mode A: one record per mutation, framed separately.
+ BenchResult a = runFramingMode(iterations, warmup, () -> {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ LogFile.Codec.Encoder encoder = codec.getEncoder(dos);
+ int recordCount = 0;
+ for (int m = 0; m < mutationCount; m++) {
+ LogFile.Record record = new
LogFileRecord().setHBaseTableName(tableName).setCommitId(m)
+ .setCells(perMutationCells.get(m));
+ encoder.write(record);
+ recordCount++;
+ }
+ dos.flush();
+ return new int[] { bos.size(), recordCount }; // [wireBytes, recordCount]
+ });
+
+ // Mode B: one record carrying the whole batch, framed once.
+ BenchResult b = runFramingMode(iterations, warmup, () -> {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ LogFile.Codec.Encoder encoder = codec.getEncoder(dos);
+ LogFile.Record record =
+ new
LogFileRecord().setHBaseTableName(tableName).setCommitId(0L).setCells(batchCells);
+ encoder.write(record);
+ dos.flush();
+ return new int[] { bos.size(), 1 }; // [wireBytes, recordCount]
+ });
+
+ long totalCells = (long) mutationCount * cellsPerMutation; // widened before multiplying
+ LOG.info(
+ "Framing benchmark params: mutationCount={} cellsPerMutation={}
valueSize={} totalCells={} "
+ + "iterations={}",
+ mutationCount, cellsPerMutation, valueSize, totalCells, iterations);
+ logFramingMode("A(per-mutation)", a, totalCells);
+ logFramingMode("B(per-batch)", b, totalCells);
+ LOG.info(
+ "Framing A/B ratios: appendNs A/B={} wireBytes A/B={} wireBytesSaved={} "
+ + "(headerBytesPerRecord~={})",
+ String.format("%.2f", (double) a.appendNs / Math.max(1, b.appendNs)),
+ String.format("%.3f", (double) a.wireBytes / Math.max(1, b.wireBytes)),
+ a.wireBytes - b.wireBytes,
+ mutationCount > 1 ? (a.wireBytes - b.wireBytes) / (mutationCount - 1) :
0);
+ }
+
+ /**
+  * Immutable summary of one framing mode: the best-of encode time together with the wire size
+  * and record count that the mode produced.
+  */
+ private static final class BenchResult {
+   /** Best-of encode time, in nanoseconds. */
+   final long appendNs;
+   /** Number of bytes written to the output stream. */
+   final int wireBytes;
+   /** Number of records encoded. */
+   final int recordCount;
+
+   BenchResult(long bestAppendNs, int encodedBytes, int encodedRecords) {
+     appendNs = bestAppendNs;
+     wireBytes = encodedBytes;
+     recordCount = encodedRecords;
+   }
+ }
+
+ /**
+  * Body of one framing mode, executed once per benchmark iteration. Declared locally rather than
+  * reusing a {@code java.util.function} type because the body must be allowed to throw
+  * {@link IOException}.
+  */
+ @FunctionalInterface
+ private interface FramingBody {
+   /**
+    * Runs one full encode pass.
+    * @return a two-element array {@code [wireBytes, recordCount]}
+    * @throws IOException if encoding fails
+    */
+   int[] run() throws IOException;
+ }
+
+ /**
+  * Runs {@code body} for {@code warmup} untimed iterations, then {@code iterations} timed ones.
+  * @param iterations number of timed iterations; must be at least 1, otherwise there is no
+  *                   result to report
+  * @param warmup     number of untimed iterations run first; values below 1 skip the warmup
+  * @param body       the encode pass to measure, returning {@code [wireBytes, recordCount]}
+  * @return the minimum single-iteration encode time (least contaminated by GC/JIT) together with
+  *         the wire size and record count from the final timed iteration
+  * @throws IllegalArgumentException if {@code iterations} is less than 1
+  * @throws IOException              if {@code body} fails
+  */
+ private static BenchResult runFramingMode(int iterations, int warmup, FramingBody body)
+   throws IOException {
+   // Without at least one timed run there is no "last" result; fail with a clear message
+   // instead of a NullPointerException when the result array is dereferenced below.
+   if (iterations < 1) {
+     throw new IllegalArgumentException("iterations must be >= 1, got " + iterations);
+   }
+   for (int i = 0; i < warmup; i++) {
+     body.run();
+   }
+   long minNs = Long.MAX_VALUE;
+   int[] last = null;
+   for (int i = 0; i < iterations; i++) {
+     long startNs = System.nanoTime();
+     last = body.run();
+     // Only the body call sits between the two clock reads.
+     long elapsed = System.nanoTime() - startNs;
+     if (elapsed < minNs) {
+       minNs = elapsed;
+     }
+   }
+   return new BenchResult(minNs, last[0], last[1]);
+ }
+
+ /**
+  * Logs one framing mode's raw figures plus its bytes-per-cell and nanoseconds-per-cell averages.
+  * @param label      name of the mode as it should appear in the log line
+  * @param r          measured result of the mode
+  * @param totalCells number of cells encoded per iteration; values below 1 are treated as 1
+  */
+ private static void logFramingMode(String label, BenchResult r, long totalCells) {
+   // Both averages share the same divisor, clamped so it is never less than one cell.
+   final double cellDivisor = Math.max(1, totalCells);
+   final String bytesPerCell = String.format("%.2f", r.wireBytes / cellDivisor);
+   final String nsPerCell = String.format("%.2f", r.appendNs / cellDivisor);
+   LOG.info(
+     "Framing mode {}: appendNs(min)={} recordCount={} wireBytes={} bytesPerCell={} "
+       + "nsPerCell={}",
+     label, r.appendNs, r.recordCount, r.wireBytes, bytesPerCell, nsPerCell);
+ }
+
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCompressionTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCompressionTest.java
index 1016eae131..59c105513c 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCompressionTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCompressionTest.java
@@ -131,7 +131,7 @@ public class LogFileCompressionTest {
for (int i = 0; i < 100; i++) {
LogFile.Record record = LogFileTestUtil.newPutRecord("TBLSBWC", i, "row"
+ i, 100L + i, 2);
originals.add(record);
- writer.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
+ writer.append(record.getHBaseTableName(), record.getCommitId(),
record.getCells());
}
writer.close();
initLogFileReader();
@@ -148,7 +148,7 @@ public class LogFileCompressionTest {
for (int i = 0; i < 100_000; i++) {
LogFile.Record record = LogFileTestUtil.newPutRecord("TBLMBWC", i, "row"
+ i, 100L + i, 5);
originals.add(record);
- writer.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
+ writer.append(record.getHBaseTableName(), record.getCommitId(),
record.getCells());
}
writer.close();
initLogFileReader();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
index 1969fcc721..a0a4e43442 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
@@ -30,6 +30,8 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -44,6 +47,17 @@ import org.apache.hadoop.hbase.util.Bytes;
public interface LogFileTestUtil {
+ /**
+  * Collects every cell of {@code mutation} into a single list by concatenating the per-family
+  * cell lists in the iteration order of {@code getFamilyCellMap()}. This is the flat shape that
+  * the writer's {@code append} calls in these tests take.
+  * @param mutation the mutation whose cells are wanted
+  * @return a new, mutable list holding the mutation's cells
+  */
+ static List<Cell> cellsOf(Mutation mutation) {
+   List<Cell> flattened = new ArrayList<>();
+   mutation.getFamilyCellMap().values().forEach(flattened::addAll);
+   return flattened;
+ }
+
static LogFile.Record newPutRecord(String table, long commitId, String
rowKey, long ts,
int numCols) {
return new LogFileRecord().setMutation(newPut(rowKey, ts,
numCols)).setHBaseTableName(table)
@@ -120,16 +134,33 @@ public interface LogFileTestUtil {
static void assertRecordEquals(String message, LogFile.Record r1,
LogFile.Record r2)
throws AssertionError {
- try {
- if (
- !r1.getMutation().toJSON().equals(r2.getMutation().toJSON())
- || !r1.getHBaseTableName().equals(r2.getHBaseTableName())
- || r1.getCommitId() != r2.getCommitId()
- ) {
- throw new AssertionError(message + ": left=" + r1 + ", right=" + r2);
+ if (
+ !r1.getHBaseTableName().equals(r2.getHBaseTableName()) ||
r1.getCommitId() != r2.getCommitId()
+ ) {
+ throw new AssertionError(message + ": left=" + r1 + ", right=" + r2);
+ }
+ if (r1.getCells().size() != r2.getCells().size()) {
+ throw new AssertionError(message + ": cell count mismatch: left=" + r1 +
", right=" + r2);
+ }
+ for (int i = 0; i < r1.getCells().size(); i++) {
+ Cell c1 = r1.getCells().get(i);
+ Cell c2 = r2.getCells().get(i);
+ // CellUtil.equals compares row/family/qualifier/timestamp/type but not
value.
+ if (!CellUtil.equals(c1, c2) || !CellUtil.matchingValue(c1, c2)) {
+ throw new AssertionError(
+ message + ": cell #" + i + " mismatch: left=" + r1 + ", right=" +
r2);
+ }
+ }
+ if (r1.getAttributes().size() != r2.getAttributes().size()) {
+ throw new AssertionError(
+ message + ": attribute count mismatch: left=" + r1 + ", right=" + r2);
+ }
+ for (java.util.Map.Entry<String, byte[]> e :
r1.getAttributes().entrySet()) {
+ byte[] other = r2.getAttributes().get(e.getKey());
+ if (other == null || !java.util.Arrays.equals(e.getValue(), other)) {
+ throw new AssertionError(
+ message + ": attribute '" + e.getKey() + "' mismatch: left=" + r1 +
", right=" + r2);
}
- } catch (IOException e) {
- throw new AssertionError(e.getMessage());
}
}
@@ -158,12 +189,22 @@ public interface LogFileTestUtil {
}
static void assertMutationEquals(String message, Mutation m1, Mutation m2) {
- try {
- if (!m1.toJSON().equals(m2.toJSON())) {
- throw new AssertionError(message + ": left=" + m1 + ", right=" + m2);
+ if (!Bytes.equals(m1.getRow(), m2.getRow())) {
+ throw new AssertionError(message + ": row mismatch: left=" + m1 + ",
right=" + m2);
+ }
+ if (m1.getClass() != m2.getClass()) {
+ throw new AssertionError(message + ": type mismatch: left=" + m1 + ",
right=" + m2);
+ }
+ List<Cell> c1 = cellsOf(m1);
+ List<Cell> c2 = cellsOf(m2);
+ if (c1.size() != c2.size()) {
+ throw new AssertionError(message + ": cell count mismatch: left=" + m1 +
", right=" + m2);
+ }
+ for (int i = 0; i < c1.size(); i++) {
+ if (!CellUtil.equals(c1.get(i), c2.get(i)) ||
!CellUtil.matchingValue(c1.get(i), c2.get(i))) {
+ throw new AssertionError(
+ message + ": cell #" + i + " mismatch: left=" + m1 + ", right=" +
m2);
}
- } catch (IOException e) {
- throw new AssertionError(e.getMessage());
}
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
index 8ecae075e9..baccf55973 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
@@ -88,9 +88,9 @@ public class LogFileWriterSyncTest {
// Append data
Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);
- writer.append("TBL", 1L, m1);
+ writer.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
Mutation m2 = LogFileTestUtil.newPut("row2", 2L, 1);
- writer.append("TBL", 2L, m2);
+ writer.append("TBL", 2L, LogFileTestUtil.cellsOf(m2));
// Sync the writer
writer.sync();
@@ -101,7 +101,7 @@ public class LogFileWriterSyncTest {
// Append more data after sync
Mutation m3 = LogFileTestUtil.newPut("row3", 12L, 1);
- writer.append("TBL", 3L, m3);
+ writer.append("TBL", 3L, LogFileTestUtil.cellsOf(m3));
// Sync again
writer.sync();
@@ -116,7 +116,7 @@ public class LogFileWriterSyncTest {
// Append A, then sync
Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);
- writer.append("TBL", 1L, m1);
+ writer.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
writer.sync();
// Verify first hsync
@@ -124,7 +124,7 @@ public class LogFileWriterSyncTest {
// Append B after sync
Mutation m2 = LogFileTestUtil.newPut("row2", 2L, 1);
- writer.append("TBL", 2L, m2);
+ writer.append("TBL", 2L, LogFileTestUtil.cellsOf(m2));
// Verify hsync was NOT called immediately after appending B. It might be
called later on
// close or another sync.
@@ -143,7 +143,7 @@ public class LogFileWriterSyncTest {
// Append A
Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);
- writer.append("TBL", 1L, m1);
+ writer.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
// Sync 1
writer.sync();
@@ -156,7 +156,7 @@ public class LogFileWriterSyncTest {
// Append B
Mutation m2 = LogFileTestUtil.newPut("row2", 2L, 1);
- writer.append("TBL", 2L, m2);
+ writer.append("TBL", 2L, LogFileTestUtil.cellsOf(m2));
// Sync 3
writer.sync();
@@ -181,7 +181,7 @@ public class LogFileWriterSyncTest {
// Append a record
Mutation m1 = LogFileTestUtil.newPut("row", 1L, 1);
- writer.append("TBL", 1L, m1);
+ writer.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
// Sync again
writer.sync();
@@ -211,7 +211,7 @@ public class LogFileWriterSyncTest {
try {
Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);
- hflushWriter.append("TBL", 1L, m1);
+ hflushWriter.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
hflushWriter.sync();
verify(hflushOutput, times(1)).hflush();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
index 95c6bb5133..e0e8d59985 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
@@ -73,10 +73,10 @@ public class LogFileWriterTest {
initLogFileWriter();
LogFile.Record r1 = LogFileTestUtil.newPutRecord("TBLTLFW", 100, "row1",
10L, 1);
LogFile.Record r2 = LogFileTestUtil.newDeleteRecord("TBLTLFW", 101,
"row2", 11L, 1);
- writer.append(r1.getHBaseTableName(), r1.getCommitId(), r1.getMutation());
+ writer.append(r1.getHBaseTableName(), r1.getCommitId(), r1.getCells());
LOG.debug("Appended " + r1);
writer.sync();
- writer.append(r2.getHBaseTableName(), r2.getCommitId(), r2.getMutation());
+ writer.append(r2.getHBaseTableName(), r2.getCommitId(), r2.getCells());
LOG.debug("Appended " + r2);
writer.close();
@@ -108,7 +108,7 @@ public class LogFileWriterTest {
LogFile.Record record =
LogFileTestUtil.newPutRecord("TBLFRI", 100L + i, "row" + i, 10L + i,
1);
originals.add(record);
- writer.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
+ writer.append(record.getHBaseTableName(), record.getCommitId(),
record.getCells());
}
writer.close();