This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 32f4b655f0 PHOENIX-7931 Coalesce per-batch mutations into a single 
record (#2540)
32f4b655f0 is described below

commit 32f4b655f0adec920db14f23e6b8732d541008d0
Author: tkhurana <[email protected]>
AuthorDate: Wed Jul 1 17:21:51 2026 -0700

    PHOENIX-7931 Coalesce per-batch mutations into a single record (#2540)
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 109 +++------
 .../phoenix/replication/MutationCellGrouper.java   |  80 ++++++
 .../apache/phoenix/replication/ReplicationLog.java |   9 +-
 .../phoenix/replication/ReplicationLogGroup.java   |  48 +++-
 .../apache/phoenix/replication/log/LogFile.java    |  65 ++++-
 .../phoenix/replication/log/LogFileCodec.java      | 218 ++++++++---------
 .../phoenix/replication/log/LogFileRecord.java     | 125 +++++-----
 .../phoenix/replication/log/LogFileWriter.java     |   7 +-
 .../reader/ReplicationLogProcessor.java            |  31 +--
 .../phoenix/replication/tool/LogFileAnalyzer.java  |  43 +++-
 .../phoenix/replication/ReplicationLogGroupIT.java | 139 ++++++++---
 .../reader/ReplicationLogProcessorTestIT.java      |  26 +-
 .../replication/MutationCellGrouperTest.java       | 210 ++++++++++++++++
 .../replication/ReplicationLogGroupTest.java       | 271 ++++++++++++++++-----
 .../phoenix/replication/log/LogFileCodecTest.java  | 194 ++++++++++++++-
 .../replication/log/LogFileCompressionTest.java    |   4 +-
 .../phoenix/replication/log/LogFileTestUtil.java   |  69 ++++--
 .../replication/log/LogFileWriterSyncTest.java     |  18 +-
 .../phoenix/replication/log/LogFileWriterTest.java |   6 +-
 19 files changed, 1229 insertions(+), 443 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index c7825ce96c..533c1b4da3 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -130,6 +130,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.replication.MutationCellGrouper;
 import org.apache.phoenix.replication.ReplicationLogGroup;
 import org.apache.phoenix.replication.SystemCatalogWALEntryFilter;
 import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
@@ -163,7 +164,6 @@ import org.xerial.snappy.Snappy;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import 
org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -861,7 +861,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     }
     if (!regularCells.isEmpty()) {
       String tableName = logKey.getTableName().getNameAsString();
-      for (Mutation split : splitCellsIntoMutations(regularCells)) {
+      for (Mutation split : 
MutationCellGrouper.splitCellsIntoMutations(regularCells)) {
         if (!this.ignoreReplicationFilter.test(split)) {
           logGroup.append(tableName, -1, split);
         }
@@ -2873,52 +2873,6 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     return status.getOperationStatusCode() == SUCCESS && status.getResult() != 
null;
   }
 
-  /**
-   * Splits cells into individual Put/Delete mutations grouped by (row key, 
put-vs-delete). HBase's
-   * checkAndMergeCPMutations merges coprocessor cells into the data mutation, 
so a single Put may
-   * contain Delete cells with different row keys (e.g., local index). This 
method recovers distinct
-   * mutations using the same grouping algorithm as HBase's ReplicationSink.
-   */
-  private static boolean isNewRowOrType(Cell previousCell, Cell cell) {
-    return previousCell == null || previousCell.getType() != cell.getType()
-      || !CellUtil.matchingRows(previousCell, cell);
-  }
-
-  static List<Mutation> splitCellsIntoMutations(Iterable<Cell> cells) throws 
IOException {
-    List<Mutation> result = new ArrayList<>();
-    Cell previousCell = null;
-    Mutation current = null;
-    for (Cell cell : cells) {
-      if (isNewRowOrType(previousCell, cell)) {
-        if (current != null) {
-          result.add(current);
-        }
-        if (CellUtil.isDelete(cell)) {
-          current = new Delete(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
-        } else {
-          current = new Put(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
-        }
-      }
-      if (CellUtil.isDelete(cell)) {
-        ((Delete) current).add(cell);
-      } else {
-        ((Put) current).add(cell);
-      }
-      previousCell = cell;
-    }
-    if (current != null) {
-      result.add(current);
-    }
-    return result;
-  }
-
-  static List<Mutation> splitCellsIntoMutations(Mutation merged) throws 
IOException {
-    if (merged.isEmpty()) {
-      return Collections.singletonList(merged);
-    }
-    return 
splitCellsIntoMutations(Iterables.concat(merged.getFamilyCellMap().values()));
-  }
-
   private void replicateMutations(RegionCoprocessorEnvironment env,
     MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext 
context)
     throws IOException {
@@ -2939,6 +2893,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     // Record ReplicationSyncTime only when we are actually doing work (not on 
early-return paths).
     long start = EnvironmentEdgeManager.currentTimeMillis();
     try {
+      List<Cell> dataTableCells = new ArrayList<>();
       for (int i = 0; i < miniBatchOp.size(); i++) {
         Mutation m = miniBatchOp.getOperation(i);
         if (this.ignoreReplicationFilter.test(m)) {
@@ -2946,37 +2901,45 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         }
         // When coprocessors add cells (local index, conditional TTL, ON 
DUPLICATE KEY UPDATE),
         // HBase merges them into the data mutation which can mix row keys and 
cell types.
-        // Split those back into individual Put/Delete mutations for correct 
serialization.
-        if (miniBatchOp.getOperationsFromCoprocessors(i) == null) {
-          group.append(this.dataTableName, -1, m);
-        } else {
-          for (Mutation split : splitCellsIntoMutations(m)) {
-            group.append(this.dataTableName, -1, split);
-          }
-        }
+        // We stream the cells through as-is — the consumer reconstructs 
Put/Delete mutations
+        // on the row+type boundary.
+        appendCells(dataTableCells, m);
       }
-      if (context.preIndexUpdates != null) {
-        for (Map.Entry<HTableInterfaceReference, Mutation> entry : 
context.preIndexUpdates
-          .entries()) {
-          if (this.ignoreReplicationFilter.test(entry.getValue())) {
-            continue;
-          }
-          group.append(entry.getKey().getTableName(), -1, entry.getValue());
-        }
-      }
-      if (context.postIndexUpdates != null) {
-        for (Map.Entry<HTableInterfaceReference, Mutation> entry : 
context.postIndexUpdates
-          .entries()) {
-          if (this.ignoreReplicationFilter.test(entry.getValue())) {
-            continue;
-          }
-          group.append(entry.getKey().getTableName(), -1, entry.getValue());
-        }
+      if (!dataTableCells.isEmpty()) {
+        group.append(this.dataTableName, -1, dataTableCells);
       }
+      appendIndexUpdates(group, context.preIndexUpdates);
+      appendIndexUpdates(group, context.postIndexUpdates);
       group.sync();
     } finally {
       long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
       metricSource.updateReplicationSyncTime(this.dataTableName, duration);
     }
   }
+
+  private void appendIndexUpdates(ReplicationLogGroup group,
+    ListMultimap<HTableInterfaceReference, Mutation> updates) throws 
IOException {
+    if (updates == null) {
+      return;
+    }
+    for (Map.Entry<HTableInterfaceReference, Collection<Mutation>> entry : 
updates.asMap()
+      .entrySet()) {
+      List<Cell> cells = new ArrayList<>();
+      for (Mutation m : entry.getValue()) {
+        if (this.ignoreReplicationFilter.test(m)) {
+          continue;
+        }
+        appendCells(cells, m);
+      }
+      if (!cells.isEmpty()) {
+        group.append(entry.getKey().getTableName(), -1, cells);
+      }
+    }
+  }
+
+  private static void appendCells(List<Cell> bucket, Mutation m) {
+    for (List<Cell> familyCells : m.getFamilyCellMap().values()) {
+      bucket.addAll(familyCells);
+    }
+  }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/MutationCellGrouper.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/MutationCellGrouper.java
new file mode 100644
index 0000000000..d058bd192a
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/MutationCellGrouper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Groups a flat cell stream into Put/Delete mutations, mirroring the 
algorithm HBase's
+ * ReplicationSink uses to reconstruct mutations from a WALEdit. A new 
mutation is started whenever
+ * the row key or the cell type differs from the immediately preceding cell; 
consecutive cells
+ * sharing both are collected into one mutation. Because the full cell type 
participates in the
+ * boundary, distinct delete subtypes (e.g. DeleteColumn vs. DeleteFamily) 
also split into separate
+ * mutations. There is no precondition on the input ordering: any cell stream 
produces valid
+ * mutations. Ordering only affects how the cells are partitioned into 
Mutation objects (a row that
+ * recurs non-consecutively yields a separate mutation per run), not 
correctness -- cell order is
+ * preserved, so replaying the resulting mutations in order reproduces the 
effect of applying the
+ * input cells in order.
+ */
+public final class MutationCellGrouper {
+
+  private MutationCellGrouper() {
+  }
+
+  private static boolean isNewRowOrType(Cell previousCell, Cell cell) {
+    return previousCell == null || previousCell.getType() != cell.getType()
+      || !CellUtil.matchingRows(previousCell, cell);
+  }
+
+  /** Group a cell stream into Put/Delete mutations using the row+type 
boundary algorithm. */
+  public static List<Mutation> splitCellsIntoMutations(Iterable<Cell> cells) 
throws IOException {
+    List<Mutation> result = new ArrayList<>();
+    Cell previousCell = null;
+    Mutation current = null;
+    for (Cell cell : cells) {
+      if (isNewRowOrType(previousCell, cell)) {
+        if (current != null) {
+          result.add(current);
+        }
+        if (CellUtil.isDelete(cell)) {
+          current = new Delete(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
+        } else {
+          current = new Put(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
+        }
+      }
+      if (CellUtil.isDelete(cell)) {
+        ((Delete) current).add(cell);
+      } else {
+        ((Put) current).add(cell);
+      }
+      previousCell = cell;
+    }
+    if (current != null) {
+      result.add(current);
+    }
+    return result;
+  }
+
+}
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 40337c190f..dc681e8d90 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.phoenix.replication.ReplicationLogGroup.Record;
 import org.apache.phoenix.replication.log.LogFileWriter;
@@ -312,7 +311,7 @@ public class ReplicationLog {
     LOG.info("Replaying {} unsynced records into new writer {}", 
currentBatch.size(),
       currentWriter);
     for (Record r : currentBatch) {
-      currentWriter.append(r.tableName, r.commitId, r.mutation);
+      currentWriter.append(r.tableName, r.commitId, r.cells);
     }
   }
 
@@ -352,7 +351,7 @@ public class ReplicationLog {
   protected void append(Record r) throws IOException {
     final boolean[] blockSynced = { false };
     apply(writer -> {
-      blockSynced[0] = writer.append(r.tableName, r.commitId, r.mutation);
+      blockSynced[0] = writer.append(r.tableName, r.commitId, r.cells);
     });
     // Add to current batch only after we succeed at appending
     currentBatch.add(r);
@@ -367,10 +366,6 @@ public class ReplicationLog {
     }
   }
 
-  protected void append(String tableName, long commitId, Mutation mutation) 
throws IOException {
-    apply(writer -> writer.append(tableName, commitId, mutation));
-  }
-
   protected void sync() throws IOException {
     apply(LogFileWriter::sync);
     currentBatch.clear();
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index 0a862d640a..0e38bd5528 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.jdbc.HAGroupStoreManager;
@@ -329,15 +330,19 @@ public class ReplicationLogGroup {
     }
   }
 
+  /**
+   * Append payload carried through the ring buffer. Always carries a flat 
{@link List} of
+   * {@link Cell}s; the per-mutation public API extracts the cells before 
publishing.
+   */
   protected static class Record {
-    public String tableName;
-    public long commitId;
-    public Mutation mutation;
+    public final String tableName;
+    public final long commitId;
+    public final List<Cell> cells;
 
-    public Record(String tableName, long commitId, Mutation mutation) {
+    public Record(String tableName, long commitId, List<Cell> cells) {
       this.tableName = tableName;
       this.commitId = commitId;
-      this.mutation = mutation;
+      this.cells = cells;
     }
   }
 
@@ -551,6 +556,37 @@ public class ReplicationLogGroup {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, 
commitId, mutation);
     }
+    List<Cell> cells = new ArrayList<>();
+    for (List<Cell> familyCells : mutation.getFamilyCellMap().values()) {
+      cells.addAll(familyCells);
+    }
+    if (cells.isEmpty()) {
+      throw new IllegalArgumentException("Cannot append a mutation with no 
cells");
+    }
+    publishDataEvent(new Record(tableName, commitId, cells));
+  }
+
+  /**
+   * Append a coalesced batch of cells as a single record on the log. The 
cells must already be
+   * grouped by row+type so consumers can reconstruct Put/Delete mutations on 
the row+type boundary
+   * (see {@link MutationCellGrouper}). Behaves identically to
+   * {@link #append(String, long, Mutation)} with respect to backpressure and 
fail-stop.
+   * @param tableName The HBase table name shared by every cell in {@code 
cells}.
+   * @param commitId  The commit identifier (e.g., SCN) for the batch.
+   * @param cells     The flat ordered cell stream for one batch on one table.
+   * @throws IOException If the writer is closed or the ring buffer is full.
+   */
+  public void append(String tableName, long commitId, List<Cell> cells) throws 
IOException {
+    if (cells == null || cells.isEmpty()) {
+      throw new IllegalArgumentException("Cannot append a record with no 
cells");
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Append: table={}, commitId={}, cells={}", tableName, 
commitId, cells.size());
+    }
+    publishDataEvent(new Record(tableName, commitId, cells));
+  }
+
+  private void publishDataEvent(Record record) throws IOException {
     if (isClosed()) {
       throw new IOException("Closed");
     }
@@ -566,7 +602,7 @@ public class ReplicationLogGroup {
       long sequence = ringBuffer.next();
       try {
         LogEvent event = ringBuffer.get(sequence);
-        event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId, 
mutation), null);
+        event.setValues(EVENT_TYPE_DATA, record, null);
       } finally {
         // Update ring buffer events metric
         ringBuffer.publish(sequence);
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
index a93ada0b48..8e3e2ea25f 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
@@ -22,10 +22,13 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.compress.Compression;
 
@@ -258,16 +261,57 @@ public interface LogFile {
     void write(DataOutput out) throws IOException;
   }
 
-  /** Represents a single logical change */
+  /** Represents a single logical change (a batch of cells across one or more 
rows) */
   interface Record {
     /**
-     * Gets the mutation this record represents.
-     * @return The Mutation.
+     * Gets the cells in this record's body.
+     * @return The cell list.
      */
-    Mutation getMutation();
+    List<Cell> getCells();
 
     /**
-     * Sets the mutation this record represents.
+     * Sets the cells in this record's body.
+     * @param cells The cell list to set.
+     * @return This Record instance for chaining.
+     */
+    Record setCells(List<Cell> cells);
+
+    /**
+     * Gets the attributes for this record. These apply uniformly to every 
mutation reconstructed
+     * from {@link #getCells()}.
+     * @return The attribute map (keys are attribute names, values are 
attribute byte arrays).
+     */
+    Map<String, byte[]> getAttributes();
+
+    /**
+     * Sets the attributes for this record.
+     * @param attributes The attribute map to set.
+     * @return This Record instance for chaining.
+     */
+    Record setAttributes(Map<String, byte[]> attributes);
+
+    /**
+     * Reconstructs the mutations in this record by grouping {@link 
#getCells()} on the row+type
+     * boundary (one mutation per contiguous run of cells with the same row 
and same cell type).
+     * Attributes from {@link #getAttributes()} are applied to 
each result mutation.
+     * @return The list of reconstructed Put/Delete mutations.
+     * @throws IOException if mutation assembly fails.
+     */
+    List<Mutation> getMutations() throws IOException;
+
+    /**
+     * Convenience accessor that returns the single mutation in this record. 
Throws if
+     * {@link #getMutations()} produces anything other than exactly one entry.
+     * @return The single reconstructed Put or Delete mutation.
+     * @throws IllegalStateException if the body does not contain exactly one 
mutation.
+     * @throws IOException           if mutation assembly fails.
+     */
+    Mutation getMutation() throws IOException;
+
+    /**
+     * Convenience setter that populates this record's cell body from a single 
mutation. Cells are
+     * taken from the mutation's family cell map. Attributes are cleared; 
callers that need
+     * attributes on the record must set them explicitly via {@link 
#setAttributes(Map)}.
      * @param mutation The Mutation to set.
      * @return This Record instance for chaining.
      */
@@ -324,14 +368,15 @@ public interface LogFile {
     void init(LogFileWriterContext context) throws IOException;
 
     /**
-     * Appends an HBase mutation to the log file. The log record may be 
buffered internally.
-     * @param tableName The HBase table name
-     * @param commitId  The commit identifier
-     * @param mutation  The mutation to append.
+     * Appends a coalesced batch of cells as a single record. The cells must 
already be grouped by
+     * row+type so consumers can reconstruct Put/Delete mutations on the 
row+type boundary.
+     * @param tableName The HBase table name shared by every cell.
+     * @param commitId  The commit identifier.
+     * @param cells     The flat ordered cell stream for one batch on one 
table.
      * @return true if an implicit sync happened (block full), false if 
buffered only
      * @throws IOException if an I/O error occurs during append.
      */
-    boolean append(String tableName, long commitId, Mutation mutation) throws 
IOException;
+    boolean append(String tableName, long commitId, List<Cell> cells) throws 
IOException;
 
     /**
      * Flushes any buffered data to the underlying storage and ensures it is 
durable (e.g., by
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
index a0975287d3..0f25aa23fb 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java
@@ -26,55 +26,56 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
- * Default Codec for encoding and decoding ReplicationLog Records within a 
block buffer. This
- * implementation uses standard Java DataInput/DataOutput for serialization. 
Record Format within a
- * block:
+ * Default Codec for encoding and decoding ReplicationLog Records within a 
block buffer. The on-disk
+ * record is cell-oriented (mirroring HBase's WALEdit): a record carries a 
flat ordered list of
+ * cells across one or more rows. Mutation reconstruction (grouping by 
row+type) is the consumer's
+ * responsibility and lives in {@link 
org.apache.phoenix.replication.MutationCellGrouper}.
  *
  * <pre>
  *   +--------------------------------------------+
  *   | RECORD LENGTH (vint)                       |
  *   +--------------------------------------------+
  *   | RECORD HEADER                              |
- *   |  - Mutation type (byte)                    |
  *   |  - HBase table name length (vint)          |
  *   |  - HBase table name (byte[])               |
- *   |  - Transaction/SCN or commit ID (vint)     |
+ *   |  - Commit ID (vlong)                       |
  *   +--------------------------------------------+
- *   | ROW KEY LENGTH (vint)                      |
- *   | ROW KEY (byte[])                           |
+ *   | NUMBER OF ATTRIBUTES (vint)                |
  *   +--------------------------------------------+
- *   | MUTATION TIMESTAMP (vint)                  |
+ *   | PER-ATTRIBUTE (repeated)                   |
+ *   |   +--------------------------------------+ |
+ *   |   | ATTRIBUTE KEY LENGTH (vint)          | |
+ *   |   | ATTRIBUTE KEY (byte[])               | |
+ *   |   | ATTRIBUTE VALUE LENGTH (vint)        | |
+ *   |   | ATTRIBUTE VALUE (byte[])             | |
+ *   |   +--------------------------------------+ |
  *   +--------------------------------------------+
- *   | NUMBER OF COLUMN FAMILIES CHANGED (vint)   |
+ *   | NUMBER OF CELLS (vint)                     |
  *   +--------------------------------------------+
- *   | PER-FAMILY DATA (repeated)                 |
- *   |   +--------------------------------------+ |
- *   |   | COLUMN FAMILY NAME LENGTH (vint)     | |
- *   |   | COLUMN FAMILY NAME (byte[])          | |
- *   |   | NUMBER OF CELLS IN FAMILY (vint)     | |
+ *   | PER-CELL DATA (repeated)                   |
  *   |   +--------------------------------------+ |
- *   |   | PER-CELL DATA (repeated)             | |
- *   |   |   +––––––––––––----------------–--–+ | |
- *   |   |   | CELL TIMESTAMP (long)          | | |
- *   |   |   | CELL TYPE (byte)               | | |
- *   |   |   | COLUMN QUALIFIER LENGTH (vint) | | |
- *   |   |   | COLUMN QUALIFIER (byte[])      | | |
- *   |   |   | VALUE LENGTH (vint)            | | |
- *   |   |   | VALUE (byte[])                 | | |
- *   |   |   +–––––––––––––--------------––--–+ | |
+ *   |   | ROW LENGTH (vint)                    | |
+ *   |   | ROW (byte[])                         | |
+ *   |   | FAMILY LENGTH (vint)                 | |
+ *   |   | FAMILY (byte[])                      | |
+ *   |   | QUALIFIER LENGTH (vint)              | |
+ *   |   | QUALIFIER (byte[])                   | |
+ *   |   | TIMESTAMP (long)                     | |
+ *   |   | TYPE (byte)                          | |
+ *   |   | VALUE LENGTH (vint)                  | |
+ *   |   | VALUE (byte[])                       | |
  *   |   +--------------------------------------+ |
  *   +--------------------------------------------+
  * </pre>
@@ -114,64 +115,65 @@ public class LogFileCodec implements LogFile.Codec {
 
     @Override
     public void write(LogFile.Record record) throws IOException {
-
+      if (record.getCells().isEmpty()) {
+        throw new IllegalArgumentException("Cannot encode a record with no 
cells");
+      }
       DataOutput recordOut = new DataOutputStream(currentRecord);
 
-      // Write record fields
-
-      Mutation mutation = record.getMutation();
-      LogFileRecord.MutationType mutationType = 
LogFileRecord.MutationType.get(mutation);
-      recordOut.writeByte(mutationType.getCode());
+      // Header: table name + commit id
       byte[] nameBytes = 
record.getHBaseTableName().getBytes(StandardCharsets.UTF_8);
       WritableUtils.writeVInt(recordOut, nameBytes.length);
       recordOut.write(nameBytes);
       WritableUtils.writeVLong(recordOut, record.getCommitId());
-      byte[] rowKey = mutation.getRow();
-      WritableUtils.writeVInt(recordOut, rowKey.length);
-      recordOut.write(rowKey);
-      recordOut.writeLong(mutation.getTimestamp());
 
-      Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
-      int cfCount = familyMap.size();
-      WritableUtils.writeVInt(recordOut, cfCount);
+      // Attributes
+      Map<String, byte[]> attrs = record.getAttributes();
+      WritableUtils.writeVInt(recordOut, attrs.size());
+      for (Map.Entry<String, byte[]> e : attrs.entrySet()) {
+        byte[] keyBytes = e.getKey().getBytes(StandardCharsets.UTF_8);
+        WritableUtils.writeVInt(recordOut, keyBytes.length);
+        recordOut.write(keyBytes);
+        byte[] val = e.getValue();
+        WritableUtils.writeVInt(recordOut, val.length);
+        if (val.length > 0) {
+          recordOut.write(val);
+        }
+      }
 
-      for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
-        byte[] cf = entry.getKey();
-        WritableUtils.writeVInt(recordOut, cf.length);
-        recordOut.write(cf);
-        List<Cell> cells = entry.getValue();
-        WritableUtils.writeVInt(recordOut, cells.size());
-        for (Cell cell : cells) {
-          recordOut.writeLong(cell.getTimestamp());
-          recordOut.writeByte(cell.getTypeByte());
-          WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
+      // Cells
+      List<Cell> cells = record.getCells();
+      WritableUtils.writeVInt(recordOut, cells.size());
+      for (Cell cell : cells) {
+        WritableUtils.writeVInt(recordOut, cell.getRowLength());
+        recordOut.write(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
+        WritableUtils.writeVInt(recordOut, cell.getFamilyLength());
+        recordOut.write(cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength());
+        WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
+        if (cell.getQualifierLength() > 0) {
           recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(),
             cell.getQualifierLength());
-          WritableUtils.writeVInt(recordOut, cell.getValueLength());
-          if (cell.getValueLength() > 0) {
-            recordOut.write(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
-          }
+        }
+        recordOut.writeLong(cell.getTimestamp());
+        recordOut.writeByte(cell.getTypeByte());
+        WritableUtils.writeVInt(recordOut, cell.getValueLength());
+        if (cell.getValueLength() > 0) {
+          recordOut.write(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
         }
       }
 
       byte[] currentRecordBytes = currentRecord.toByteArray();
-      // Write total record length
       WritableUtils.writeVInt(out, currentRecordBytes.length);
-      // Write the record
       out.write(currentRecordBytes);
 
-      // Set the size (including the vint prefix) on the record object
       ((LogFileRecord) record).setSerializedLength(
         currentRecordBytes.length + 
WritableUtils.getVIntSize(currentRecordBytes.length));
 
-      // Reset the ByteArrayOutputStream to release resources
       currentRecord.reset();
     }
   }
 
   private static class RecordDecoder implements LogFile.Codec.Decoder {
     private final DataInput in;
-    // A reference to the object populated by the last successful advance()
     private LogFileRecord current = null;
 
     RecordDecoder(DataInput in) {
@@ -185,78 +187,64 @@ public class LogFileCodec implements LogFile.Codec {
         recordDataLength += WritableUtils.getVIntSize(recordDataLength);
 
         current = new LogFileRecord();
-        // Set the total serialized length on the record
         current.setSerializedLength(recordDataLength);
 
-        LogFileRecord.MutationType type = 
LogFileRecord.MutationType.codeToType(in.readByte());
-
+        // Header
         int nameBytesLen = WritableUtils.readVInt(in);
         byte[] nameBytes = new byte[nameBytesLen];
         in.readFully(nameBytes);
-
         current.setHBaseTableName(Bytes.toString(nameBytes));
-
         current.setCommitId(WritableUtils.readVLong(in));
 
-        int rowKeyLen = WritableUtils.readVInt(in);
-        byte[] rowKey = new byte[rowKeyLen];
-        in.readFully(rowKey);
-
-        Mutation mutation;
-        switch (type) {
-          case PUT:
-            mutation = new Put(rowKey);
-            break;
-          case DELETE:
-            mutation = new Delete(rowKey);
-            break;
-          default:
-            throw new UnsupportedOperationException("Unhandled mutation type " 
+ type);
+        // Attributes
+        int attrCount = WritableUtils.readVInt(in);
+        Map<String, byte[]> attrs = attrCount == 0 ? new HashMap<>() : new 
HashMap<>(attrCount);
+        for (int i = 0; i < attrCount; i++) {
+          int keyLen = WritableUtils.readVInt(in);
+          byte[] keyBytes = new byte[keyLen];
+          in.readFully(keyBytes);
+          int valLen = WritableUtils.readVInt(in);
+          byte[] valBytes = new byte[valLen];
+          if (valLen > 0) {
+            in.readFully(valBytes);
+          }
+          attrs.put(new String(keyBytes, StandardCharsets.UTF_8), valBytes);
         }
-        current.setMutation(mutation);
+        current.setAttributes(attrs);
 
-        long ts = in.readLong();
-        mutation.setTimestamp(ts);
-
-        int cfCount = WritableUtils.readVInt(in);
-        for (int i = 0; i < cfCount; i++) {
-          // Col name
-          int cfLen = WritableUtils.readVInt(in);
-          byte[] cf = new byte[cfLen];
-          in.readFully(cf);
-          // Qualifiers+Values Count
-          int columnValuePairsCount = WritableUtils.readVInt(in);
-          for (int j = 0; j < columnValuePairsCount; j++) {
-            // Cell timestamp
-            long cellTs = in.readLong();
-            // Cell type byte
-            byte cellTypeByte = in.readByte();
-            // Qualifier name
-            int qualLen = WritableUtils.readVInt(in);
-            byte[] qual = new byte[qualLen];
-            if (qualLen > 0) {
-              in.readFully(qual);
-            }
-            // Value
-            int valueLen = WritableUtils.readVInt(in);
-            byte[] value = new byte[valueLen];
-            if (valueLen > 0) {
-              in.readFully(value);
-            }
-            Cell cell = new KeyValue(rowKey, 0, rowKey.length, cf, 0, 
cf.length, qual, 0,
-              qual.length, cellTs, KeyValue.Type.codeToType(cellTypeByte), 
value, 0, value.length);
-            if (mutation instanceof Put) {
-              ((Put) mutation).add(cell);
-            } else {
-              ((Delete) mutation).add(cell);
-            }
+        // Cells
+        int cellCount = WritableUtils.readVInt(in);
+        List<Cell> cells = new ArrayList<>(cellCount);
+        for (int i = 0; i < cellCount; i++) {
+          int rowLen = WritableUtils.readVInt(in);
+          byte[] row = new byte[rowLen];
+          if (rowLen > 0) {
+            in.readFully(row);
+          }
+          int famLen = WritableUtils.readVInt(in);
+          byte[] family = new byte[famLen];
+          if (famLen > 0) {
+            in.readFully(family);
+          }
+          int qualLen = WritableUtils.readVInt(in);
+          byte[] qual = new byte[qualLen];
+          if (qualLen > 0) {
+            in.readFully(qual);
+          }
+          long ts = in.readLong();
+          byte typeByte = in.readByte();
+          int valueLen = WritableUtils.readVInt(in);
+          byte[] value = new byte[valueLen];
+          if (valueLen > 0) {
+            in.readFully(value);
           }
+          cells.add(new KeyValue(row, 0, row.length, family, 0, family.length, 
qual, 0, qual.length,
+            ts, KeyValue.Type.codeToType(typeByte), value, 0, value.length));
         }
+        current.setCells(cells);
 
-        // Successfully read a record
         return true;
       } catch (EOFException e) {
-        // End of stream
         current = null;
         return false;
       }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
index 6b12a3e659..e9c5b807b0 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileRecord.java
@@ -17,9 +17,16 @@
  */
 package org.apache.phoenix.replication.log;
 
-import org.apache.hadoop.hbase.client.Delete;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.phoenix.replication.MutationCellGrouper;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = { "EI_EXPOSE_REP", 
"EI_EXPOSE_REP2" },
     justification = "Intentional")
@@ -27,7 +34,8 @@ public class LogFileRecord implements LogFile.Record {
 
   private String tableName;
   private long commitId;
-  private Mutation mutation;
+  private List<Cell> cells = Collections.emptyList();
+  private Map<String, byte[]> attributes = Collections.emptyMap();
   private int serializedLength;
 
   public LogFileRecord() {
@@ -56,93 +64,80 @@ public class LogFileRecord implements LogFile.Record {
   }
 
   @Override
-  public Mutation getMutation() {
-    return this.mutation;
+  public List<Cell> getCells() {
+    return cells;
   }
 
   @Override
-  public LogFile.Record setMutation(Mutation mutation) {
-    this.mutation = mutation;
+  public LogFile.Record setCells(List<Cell> cells) {
+    Preconditions.checkNotNull(cells, "cells must not be null");
+    this.cells = cells;
     return this;
   }
 
   @Override
-  public int getSerializedLength() {
-    // NOTE: Should be set by the Codec using setSerializedLength after 
reading or writing
-    // the record.
-    return this.serializedLength;
+  public Map<String, byte[]> getAttributes() {
+    return attributes;
   }
 
   @Override
-  public LogFile.Record setSerializedLength(int serializedLength) {
-    this.serializedLength = serializedLength;
+  public LogFile.Record setAttributes(Map<String, byte[]> attributes) {
+    Preconditions.checkNotNull(attributes, "attributes must not be null");
+    this.attributes = attributes;
     return this;
   }
 
   @Override
-  public int hashCode() {
-    int code = tableName.hashCode();
-    code ^= Long.hashCode(commitId);
-    code ^= mutation.toString().hashCode();
-    return code;
+  public List<Mutation> getMutations() throws IOException {
+    List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+    if (!attributes.isEmpty()) {
+      for (Mutation m : result) {
+        for (Map.Entry<String, byte[]> e : attributes.entrySet()) {
+          m.setAttribute(e.getKey(), e.getValue());
+        }
+      }
+    }
+    return result;
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
+  public Mutation getMutation() throws IOException {
+    List<Mutation> mutations = getMutations();
+    if (mutations.size() != 1) {
+      throw new IllegalStateException("Record does not contain exactly one 
mutation (count="
+        + mutations.size() + "); use getMutations() instead");
     }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    LogFileRecord other = (LogFileRecord) obj;
-    return tableName.equals(other.tableName) && commitId == other.commitId
-      && mutation.toString().equals(other.mutation.toString());
+    return mutations.get(0);
   }
 
   @Override
-  public String toString() {
-    return "LogFileRecord [mutation=" + mutation.toString() + ", tableName=" + 
tableName
-      + ", commitId=" + commitId + " ]";
-  }
-
-  // Internals only below. Not for LogFile interface consumer use.
-
-  protected enum MutationType {
-    PUT(1),
-    DELETE(2);
-
-    private int code;
-
-    MutationType(int code) {
-      this.code = code;
-    }
-
-    int getCode() {
-      return code;
+  public LogFile.Record setMutation(Mutation mutation) {
+    List<Cell> body = new ArrayList<>();
+    for (List<Cell> familyCells : mutation.getFamilyCellMap().values()) {
+      body.addAll(familyCells);
     }
+    this.cells = body;
+    this.attributes = Collections.emptyMap();
+    return this;
+  }
 
-    static MutationType get(Mutation mutation) {
-      if (mutation instanceof Put) {
-        return PUT;
-      } else if (mutation instanceof Delete) {
-        return DELETE;
-      }
-      throw new UnsupportedOperationException("Unsupported mutation type: " + 
mutation);
-    }
+  @Override
+  public int getSerializedLength() {
+    // NOTE: Should be set by the Codec using setSerializedLength after 
reading or writing
+    // the record.
+    return this.serializedLength;
+  }
 
-    static MutationType codeToType(int code) {
-      for (MutationType type : MutationType.values()) {
-        if (type.code == code) {
-          return type;
-        }
-      }
-      throw new UnsupportedOperationException("Unsupported mutation code: " + 
code);
-    }
+  @Override
+  public LogFile.Record setSerializedLength(int serializedLength) {
+    this.serializedLength = serializedLength;
+    return this;
+  }
 
+  @Override
+  public String toString() {
+    return "LogFileRecord [tableName=" + tableName + ", commitId=" + commitId 
+ ", cellCount="
+      + cells.size() + ", attrCount=" + attributes.size() + "]";
   }
 
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index dc0cb811f1..df3d428864 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -19,10 +19,11 @@ package org.apache.phoenix.replication.log;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.Cell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,12 +80,12 @@ public class LogFileWriter implements LogFile.Writer {
   }
 
   @Override
-  public boolean append(String tableName, long commitId, Mutation mutation) 
throws IOException {
+  public boolean append(String tableName, long commitId, List<Cell> cells) 
throws IOException {
     if (isClosed()) {
       throw new IOException("Writer has been closed");
     }
     return writer.append(
-      new 
LogFileRecord().setHBaseTableName(tableName).setCommitId(commitId).setMutation(mutation));
+      new 
LogFileRecord().setHBaseTableName(tableName).setCommitId(commitId).setCells(cells));
   }
 
   @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 432cf9189d..d9e488394c 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -246,21 +246,22 @@ public class ReplicationLogProcessor implements Closeable 
{
 
       for (LogFile.Record record : logFileReader) {
         final TableName tableName = 
TableName.valueOf(record.getHBaseTableName());
-        final Mutation mutation = record.getMutation();
-
-        tableToMutationsMap.computeIfAbsent(tableName, k -> new 
ArrayList<>()).add(mutation);
-
-        // Increment current batch size and current batch size bytes
-        currentBatchSize++;
-        currentBatchSizeBytes += mutation.heapSize();
-
-        // Process when we reach either the batch count or size limit
-        if (currentBatchSize >= getBatchSize() || currentBatchSizeBytes >= 
getBatchSizeBytes()) {
-          processReplicationLogBatch(tableToMutationsMap);
-          totalProcessed += currentBatchSize;
-          tableToMutationsMap.clear();
-          currentBatchSize = 0;
-          currentBatchSizeBytes = 0;
+        // A record may reconstruct into multiple mutations. Batches split on 
the mutation
+        // boundary, so a single record's mutations can span two 
processReplicationLogBatch
+        // invocations -- do not assume per-record atomicity here.
+        for (Mutation mutation : record.getMutations()) {
+          tableToMutationsMap.computeIfAbsent(tableName, k -> new 
ArrayList<>()).add(mutation);
+          currentBatchSize++;
+          currentBatchSizeBytes += mutation.heapSize();
+
+          // Process when we reach either the batch count or size limit
+          if (currentBatchSize >= getBatchSize() || currentBatchSizeBytes >= 
getBatchSizeBytes()) {
+            processReplicationLogBatch(tableToMutationsMap);
+            totalProcessed += currentBatchSize;
+            tableToMutationsMap.clear();
+            currentBatchSize = 0;
+            currentBatchSizeBytes = 0;
+          }
         }
       }
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
index 18172721a0..b61adc9fce 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
@@ -126,6 +126,43 @@ public class LogFileAnalyzer extends Configured implements 
Tool {
     return allFiles;
   }
 
+  /**
+   * Count the number of log <em>records</em> (not flattened mutations) per 
table across all log
+   * files under {@code source}. Unlike {@link #groupLogsByTable(String)}, 
which expands each record
+   * into its constituent mutations, this preserves the record boundary so 
callers can assert the
+   * coalescing contract (one record per table per batch).
+   */
+  public Map<String, Integer> countRecordsByTable(String source) throws 
IOException {
+    Map<String, Integer> allFiles = Maps.newHashMap();
+    init();
+    Path path = new Path(source);
+    List<Path> filesToAnalyze = getFilesToAnalyze(path);
+    for (Path file : filesToAnalyze) {
+      Map<String, Integer> perFile = countRecordsByTable(file);
+      for (Map.Entry<String, Integer> entry : perFile.entrySet()) {
+        allFiles.merge(entry.getKey(), entry.getValue(), Integer::sum);
+      }
+    }
+    return allFiles;
+  }
+
+  private Map<String, Integer> countRecordsByTable(Path file) throws 
IOException {
+    Map<String, Integer> recordsByTable = Maps.newHashMap();
+    LogFileReaderContext context = new 
LogFileReaderContext(getConf()).setFileSystem(fs)
+      .setFilePath(file).setSkipCorruptBlocks(check);
+    LogFileReader reader = new LogFileReader();
+    try {
+      reader.init(context);
+      Record record;
+      while ((record = reader.next()) != null) {
+        recordsByTable.merge(record.getHBaseTableName(), 1, Integer::sum);
+      }
+    } finally {
+      reader.close();
+    }
+    return recordsByTable;
+  }
+
   private Map<String, List<Mutation>> groupLogsByTable(Path file) throws 
IOException {
     Map<String, List<Mutation>> mutationsByTable = Maps.newHashMap();
     System.out.println("\nAnalyzing file: " + file);
@@ -139,7 +176,7 @@ public class LogFileAnalyzer extends Configured implements 
Tool {
       while ((record = reader.next()) != null) {
         String tableName = record.getHBaseTableName();
         List<Mutation> mutations = mutationsByTable.getOrDefault(tableName, 
Lists.newArrayList());
-        mutations.add(record.getMutation());
+        mutations.addAll(record.getMutations());
         mutationsByTable.put(tableName, mutations);
       }
     } finally {
@@ -198,7 +235,9 @@ public class LogFileAnalyzer extends Configured implements 
Tool {
           System.out.println("\nRecord #" + recordCount + ":");
           System.out.println("  Table: " + record.getHBaseTableName());
           System.out.println("  Commit ID: " + record.getCommitId());
-          System.out.println("  Mutation: " + record.getMutation());
+          for (Mutation m : record.getMutations()) {
+            System.out.println("  Mutation: " + m);
+          }
           if (verbose) {
             System.out.println("  Serialized Length: " + 
record.getSerializedLength());
           }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index cb4ca7bcee..c4dd5149b2 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -67,7 +67,6 @@ import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.query.PhoenixTestBuilder;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.replication.metrics.ReplicationLogMetricValues;
 import org.apache.phoenix.replication.reader.ReplicationLogProcessor;
 import org.apache.phoenix.replication.tool.LogFileAnalyzer;
 import org.apache.phoenix.util.TestUtil;
@@ -151,6 +150,14 @@ public class ReplicationLogGroupIT extends HABaseIT {
     return mutations != null ? mutations.size() : 0;
   }
 
+  private Map<String, Integer> countRecordsByTable() throws Exception {
+    LogFileAnalyzer analyzer = new LogFileAnalyzer();
+    // use peer cluster conf
+    analyzer.setConf(conf2);
+    Path standByLogDir = 
logGroup.getOrCreatePeerShardManager().getRootDirectoryPath();
+    return analyzer.countRecordsByTable(standByLogDir.toString());
+  }
+
   private void verifyReplication(Map<String, Integer> expected) throws 
Exception {
     // first close the logGroup
     logGroup.close();
@@ -179,24 +186,6 @@ public class ReplicationLogGroupIT extends HABaseIT {
     }
   }
 
-  private void assertMetricsEmitted() {
-    ReplicationLogMetricValues values = 
logGroup.getMetrics().getCurrentMetricValues();
-    assertTrue("appendTime should be > 0, got " + values.getAppendTimeMax(),
-      values.getAppendTimeMax() > 0);
-    assertTrue("syncTime should be > 0, got " + values.getSyncTimeMax(),
-      values.getSyncTimeMax() > 0);
-    assertTrue("ringBufferTime should be > 0, got " + 
values.getRingBufferTimeMax(),
-      values.getRingBufferTimeMax() > 0);
-    assertTrue("fsSyncTime should be > 0, got " + values.getFsSyncTimeMax(),
-      values.getFsSyncTimeMax() > 0);
-    assertTrue("batchSize should be > 0, got " + values.getBatchSizeMax(),
-      values.getBatchSizeMax() > 0);
-    assertTrue("pendingSyncCount should be > 0, got " + 
values.getPendingSyncCountMax(),
-      values.getPendingSyncCountMax() > 0);
-    assertTrue("pendingSyncWaitTime should be > 0, got " + 
values.getPendingSyncWaitTimeMax(),
-      values.getPendingSyncWaitTimeMax() > 0);
-  }
-
   private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable) 
{
     LOG.info("Dump table log count for test {}", name.getMethodName());
     for (Map.Entry<String, List<Mutation>> table : 
mutationsByTable.entrySet()) {
@@ -340,7 +329,6 @@ public class ReplicationLogGroupIT extends HABaseIT {
       PreparedStatement stmt =
         conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?, 
?)");
       // upsert 50 rows
-      int rowCount = 50;
       for (int i = 0; i < 5; ++i) {
         for (int j = 0; j < 10; ++j) {
           stmt.setInt(1, i);
@@ -351,6 +339,45 @@ public class ReplicationLogGroupIT extends HABaseIT {
         }
         conn.commit();
       }
+
+      // Update existing rows changing only the covered column val2 (val1 
unchanged). With cell
+      // coalescing each phase's index cells share one record, so this 
exercises:
+      // index1 (on val1, includes val2): index row key UNCHANGED -> PRE 
unverified Put and POST
+      // verified Put target the SAME index row, i.e. two writes to the same 
empty-column
+      // qualifier (UNVERIFIED then VERIFIED) split across the PRE and POST 
records.
+      // index2 (on val2): index row key CHANGES (null -> value) -> 
Delete(oldKey)+Put(newKey).
+      PreparedStatement updateVal2 =
+        conn.prepareStatement("upsert into " + tableName + " (id1, id2, val2) 
VALUES(?, ?, ?)");
+      for (int i = 0; i < 5; ++i) {
+        for (int j = 0; j < 10; ++j) {
+          updateVal2.setInt(1, i);
+          updateVal2.setInt(2, j);
+          updateVal2.setString(3, "val2_" + i + "_" + j);
+          updateVal2.executeUpdate();
+        }
+      }
+      conn.commit();
+
+      // Update existing rows changing the indexed column val1 (val2 
unchanged). This flips the
+      // roles relative to the previous pass:
+      // index1 (on val1): index row key CHANGES -> the PRE record makes the 
old index row
+      // unverified (Put) and the new index row unverified (Put), while the 
POST record holds a
+      // verified Put on the new key and a Delete on the old key -- a Put and 
a Delete on
+      // DIFFERENT rows within one coalesced record, which the grouper must 
split on the
+      // row+type boundary.
+      // index2 (on val2): index row key UNCHANGED -> PRE unverified + POST 
verified on same row.
+      PreparedStatement updateVal1 =
+        conn.prepareStatement("upsert into " + tableName + " (id1, id2, val1) 
VALUES(?, ?, ?)");
+      for (int i = 0; i < 5; ++i) {
+        for (int j = 0; j < 10; ++j) {
+          updateVal1.setInt(1, i);
+          updateVal1.setInt(2, j);
+          updateVal1.setString(3, "newval1_" + i + "_" + j);
+          updateVal1.executeUpdate();
+        }
+      }
+      conn.commit();
+
       // do some atomic upserts which will be ignored and therefore not 
replicated
       stmt = conn.prepareStatement(
         "upsert into " + tableName + " VALUES(?, ?, ?) " + "ON DUPLICATE KEY 
IGNORE");
@@ -364,18 +391,15 @@ public class ReplicationLogGroupIT extends HABaseIT {
         }
       }
 
-      // Sanity-check that producer- and consumer-side metrics fired at least 
once on the haGroup.
-      // This guards against the rotationTimeMs-style bug where a metric is 
declared but never
-      // emitted. Snapshot before verifyReplication() since it closes the log 
group.
-      assertMetricsEmitted();
-
-      // verify replication mutation counts
-      // mutation count will be equal to row count since the atomic upsert 
mutations will be
-      // ignored and therefore not replicated
+      // Verify the system tables are never replicated, and flush the log 
group (verifyReplication
+      // closes it) before replay. We deliberately do NOT assert exact 
data/index mutation totals
+      // here: the multi-pass update workload's per-table counts are dominated 
by index-maintenance
+      // internals (local-index key churn, verified/unverified empty-column 
writes) rather than by
+      // the coalescing under test, and coalescing is mutation-count invariant 
by construction. The
+      // authoritative correctness check for this workload is the 
cross-cluster cell-level equality
+      // below; the record-count contract of coalescing is pinned separately in
+      // testAppendAndSyncSingleBatchRecordCount.
       Map<String, Integer> expected = Maps.newHashMap();
-      expected.put(tableName, rowCount * 3); // Put + Delete + local index 
update
-      expected.put(indexName1, rowCount * 3); // unverified + verified + 
delete (DeleteColumn)
-      expected.put(indexName2, rowCount * 2); // unverified + verified
       expected.put(SYSTEM_CATALOG_NAME, 0);
       expected.put(SYSTEM_CHILD_LINK_NAME, 0);
       verifyReplication(expected);
@@ -387,6 +411,59 @@ public class ReplicationLogGroupIT extends HABaseIT {
     }
   }
 
+  /**
+   * Pins the per-batch coalescing contract: one server-side batch on a table 
with one index emits
+   * exactly three log records -- one for the data table, one for the index 
PRE phase, and one for
+   * the index POST phase -- regardless of how many rows the batch contains. 
Before coalescing this
+   * batch would have produced one record per mutation (5 rows x ~3 mutations 
each); coalescing
+   * collapses each (table, phase) into a single cell-stream record. 
Cross-cluster cell equality
+   * confirms the collapsed records still reconstruct the correct mutations on 
the standby.
+   */
+  @Test
+  public void testAppendAndSyncSingleBatchRecordCount() throws Exception {
+    final String tableName = "T_" + generateUniqueName();
+    final String indexName = "I_" + generateUniqueName();
+    String createTableDdl = String.format(
+      "create table if not exists %s (id integer not null primary key, val1 
varchar, val2 varchar)",
+      tableName);
+    String createIndexDdl = String
+      .format("create index if not exists %s on %s (val1) include (val2)", 
indexName, tableName);
+
+    try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) 
DriverManager
+      .getConnection(CLUSTERS.getJdbcHAUrl(), clientProps)) {
+      conn.createStatement().execute(createTableDdl);
+      conn.createStatement().execute(createIndexDdl);
+      conn.commit();
+
+      // Insert several rows and commit them as a SINGLE batch (autocommit 
off, one commit()). All
+      // rows in this batch coalesce into one data-table record plus one PRE 
and one POST record on
+      // the index table.
+      PreparedStatement stmt =
+        conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?)");
+      int rowCount = 5;
+      for (int i = 0; i < rowCount; ++i) {
+        stmt.setInt(1, i);
+        stmt.setString(2, "v1_" + i);
+        stmt.setString(3, "v2_" + i);
+        stmt.executeUpdate();
+      }
+      conn.commit();
+
+      // Flush the log group so the standby files are complete, then count 
records per table.
+      logGroup.close();
+      Map<String, Integer> recordsByTable = countRecordsByTable();
+      LOG.info("Records by table: {}", recordsByTable);
+      assertEquals("Data table should have exactly one coalesced record for 
the batch",
+        Integer.valueOf(1), recordsByTable.get(tableName));
+      assertEquals("Index table should have exactly two records (PRE + POST) 
for the batch",
+        Integer.valueOf(2), recordsByTable.get(indexName));
+
+      // Replay on cluster 2 and verify cross-cluster cell-level equality.
+      replayAndVerifyAcrossClusters(Arrays.asList(createTableDdl, 
createIndexDdl), tableName,
+        indexName);
+    }
+  }
+
   @Test
   public void testAppendAndSyncNoIndex() throws Exception {
     final String tableName = "T_" + generateUniqueName();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index e486f82935..f3edb027f8 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -132,7 +132,7 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
 
     // Add a mutation to make it a proper log file with data
     Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
-    writer.append(tableName, 1, put);
+    writer.append(tableName, 1, LogFileTestUtil.cellsOf(put));
     writer.sync();
     writer.close();
 
@@ -213,7 +213,7 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
 
     LogFileWriter writer = initLogFileWriter(filePath);
     Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
-    writer.append(tableName, 1, put);
+    writer.append(tableName, 1, LogFileTestUtil.cellsOf(put));
     writer.sync();
     // Do NOT call writer.close() -- skips trailer, simulates a writer crash 
after sync
 
@@ -261,7 +261,7 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
 
     LogFileWriter writer = initLogFileWriter(filePath);
     Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
-    writer.append(tableName, 1, put);
+    writer.append(tableName, 1, LogFileTestUtil.cellsOf(put));
     writer.sync();
     // Do NOT call writer.close() -- skips trailer
 
@@ -311,7 +311,7 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
 
     // Add a mutation to make it a proper log file with data
     Mutation put = LogFileTestUtil.newPut("testRow", 1, 1);
-    writer.append(tableName, 1, put);
+    writer.append(tableName, 1, LogFileTestUtil.cellsOf(put));
     writer.sync();
     writer.close();
 
@@ -528,14 +528,14 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
         generateHBaseMutations(phoenixConnection, 5, table2Name, 101L, "b");
       table1Mutations.forEach(mutation -> {
         try {
-          writer.append(table1Name, mutation.hashCode(), mutation);
+          writer.append(table1Name, mutation.hashCode(), 
LogFileTestUtil.cellsOf(mutation));
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
       });
       table2Mutations.forEach(mutation -> {
         try {
-          writer.append(table2Name, mutation.hashCode(), mutation);
+          writer.append(table2Name, mutation.hashCode(), 
LogFileTestUtil.cellsOf(mutation));
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
@@ -646,7 +646,7 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
 
     // Add one mutation
     Mutation put = LogFileTestUtil.newPut("row1", 3L, 4);
-    writer.append(tableNameString, 1, put);
+    writer.append(tableNameString, 1, LogFileTestUtil.cellsOf(put));
     writer.sync();
 
     // For processing of an unclosed file to work, we need to disable trailer 
validation
@@ -733,7 +733,7 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
       Put put = new Put(Bytes.toBytes("row" + i));
       put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"), 
Bytes.toBytes("abcd"));
       mutations.add(put);
-      writer.append(tableName, i, put);
+      writer.append(tableName, i, LogFileTestUtil.cellsOf(put));
     }
 
     // Add 1 big mutation that will cross the byte size threshold before count 
threshold
@@ -749,14 +749,14 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
         + "it crosses the byte size threshold and forces a batch to be 
processed."));
 
     mutations.add(bigPut);
-    writer.append(tableName, 100, bigPut);
+    writer.append(tableName, 100, LogFileTestUtil.cellsOf(bigPut));
 
     // Add more small mutations that will be batched due to count limit
     for (int i = 3; i < 10; i++) {
       Put put = new Put(Bytes.toBytes("row" + i));
       put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"), 
Bytes.toBytes("abcd"));
       mutations.add(put);
-      writer.append(tableName, i, put);
+      writer.append(tableName, i, LogFileTestUtil.cellsOf(put));
     }
 
     writer.close();
@@ -924,13 +924,13 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
       // Add mutation for table1
       Mutation put1 = LogFileTestUtil.newPut("row1_" + i, (i * 2) + 1, (i * 2) 
+ 1);
       table1Mutations.add(put1);
-      writer.append(table1Name, (i * 2) + 1, put1);
+      writer.append(table1Name, (i * 2) + 1, LogFileTestUtil.cellsOf(put1));
       writer.sync();
 
       // Add mutation for table2
       Mutation put2 = LogFileTestUtil.newPut("row2_" + i, (i * 2) + 2, (i * 2) 
+ 2);
       table2Mutations.add(put2);
-      writer.append(table2Name, (i * 2) + 2, put2);
+      writer.append(table2Name, (i * 2) + 2, LogFileTestUtil.cellsOf(put2));
       writer.sync();
     }
     writer.close();
@@ -1663,7 +1663,7 @@ public class ReplicationLogProcessorTestIT extends 
ParallelStatsDisabledIT {
     for (int i = 0; i < totalRecords; i++) {
       Mutation put = LogFileTestUtil.newPut("row" + i, i + 1, i + 1);
       originalMutations.add(put);
-      writer.append(tableName, i + 1, put);
+      writer.append(tableName, i + 1, LogFileTestUtil.cellsOf(put));
       writer.sync();
     }
     writer.close();
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/MutationCellGrouperTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/MutationCellGrouperTest.java
new file mode 100644
index 0000000000..684fdebcd7
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/MutationCellGrouperTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link MutationCellGrouper#splitCellsIntoMutations}, the replay-side inverse of
+ * per-batch cell coalescing. This algorithm is the correctness lynchpin of PHOENIX-7931: a
+ * regression here would silently merge or split mutations on the standby with no exception, so the
+ * row+type boundary behavior is pinned here at the unit level rather than only through the
+ * heavyweight cross-cluster IT.
+ */
+public class MutationCellGrouperTest {
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+  private static final long TS = 100L;
+
+  private static Cell putCell(String row, String value) {
+    return 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes(row))
+      
.setFamily(FAMILY).setQualifier(QUALIFIER).setTimestamp(TS).setType(Cell.Type.Put)
+      .setValue(Bytes.toBytes(value)).build();
+  }
+
+  private static Cell deleteColumnCell(String row) {
+    return 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes(row))
+      
.setFamily(FAMILY).setQualifier(QUALIFIER).setTimestamp(TS).setType(Cell.Type.DeleteColumn)
+      .build();
+  }
+
+  private static Cell deleteFamilyCell(String row) {
+    return 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes(row))
+      
.setFamily(FAMILY).setQualifier(QUALIFIER).setTimestamp(TS).setType(Cell.Type.DeleteFamily)
+      .build();
+  }
+
+  private static String rowOf(Mutation m) {
+    return Bytes.toString(m.getRow());
+  }
+
+  @Test
+  public void testEmptyInputYieldsEmptyList() throws Exception {
+    List<Mutation> result =
+      MutationCellGrouper.splitCellsIntoMutations(Collections.<Cell> 
emptyList());
+    assertTrue("Empty input should produce no mutations", result.isEmpty());
+  }
+
+  @Test
+  public void testSinglePutCell() throws Exception {
+    Cell cell = putCell("row1", "v1");
+    List<Mutation> result =
+      
MutationCellGrouper.splitCellsIntoMutations(Collections.singletonList(cell));
+    assertEquals(1, result.size());
+    assertTrue("Expected a Put", result.get(0) instanceof Put);
+    assertEquals("row1", rowOf(result.get(0)));
+    assertEquals(1, result.get(0).size());
+  }
+
+  @Test
+  public void testSingleDeleteCell() throws Exception {
+    Cell cell = deleteColumnCell("row1");
+    List<Mutation> result =
+      
MutationCellGrouper.splitCellsIntoMutations(Collections.singletonList(cell));
+    assertEquals(1, result.size());
+    assertTrue("Expected a Delete", result.get(0) instanceof Delete);
+    assertEquals("row1", rowOf(result.get(0)));
+  }
+
+  @Test
+  public void testContiguousCellsSameRowAndTypeFormOneMutation() throws 
Exception {
+    List<Cell> cells = new ArrayList<>();
+    cells.add(putCell("row1", "v1"));
+    cells.add(putCell("row1", "v2"));
+    cells.add(putCell("row1", "v3"));
+    List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+    assertEquals("Same row + same type cells must coalesce into one Put", 1, 
result.size());
+    assertTrue(result.get(0) instanceof Put);
+    assertEquals(3, result.get(0).size());
+  }
+
+  @Test
+  public void testRowChangeStartsNewMutation() throws Exception {
+    List<Cell> cells = new ArrayList<>();
+    cells.add(putCell("row1", "v1"));
+    cells.add(putCell("row2", "v2"));
+    cells.add(putCell("row3", "v3"));
+    List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+    assertEquals("Each distinct row should yield its own Put", 3, 
result.size());
+    assertEquals("row1", rowOf(result.get(0)));
+    assertEquals("row2", rowOf(result.get(1)));
+    assertEquals("row3", rowOf(result.get(2)));
+  }
+
+  /**
+   * The case that justifies the class: a single row whose Put cells precede its Delete cells (e.g.
+   * an index row that is rewritten then partially deleted within one server-side batch) must split
+   * on the put-vs-delete boundary into a separate Put and Delete, never merge into one mutation.
+   */
+  @Test
+  public void testPutThenDeleteSameRowSplitsOnTypeBoundary() throws Exception {
+    List<Cell> cells = new ArrayList<>();
+    cells.add(putCell("row1", "v1"));
+    cells.add(deleteColumnCell("row1"));
+    List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+    assertEquals("Put and Delete on the same row must be two mutations", 2, 
result.size());
+    assertTrue("First should be the Put", result.get(0) instanceof Put);
+    assertTrue("Second should be the Delete", result.get(1) instanceof Delete);
+    assertEquals("row1", rowOf(result.get(0)));
+    assertEquals("row1", rowOf(result.get(1)));
+  }
+
+  @Test
+  public void testPutDeletePutOnThreeRowsYieldsThreeMutations() throws 
Exception {
+    List<Cell> cells = new ArrayList<>();
+    cells.add(putCell("rowA", "v1"));
+    cells.add(deleteColumnCell("rowB"));
+    cells.add(putCell("rowC", "v3"));
+    List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+    assertEquals(3, result.size());
+    assertTrue(result.get(0) instanceof Put);
+    assertTrue(result.get(1) instanceof Delete);
+    assertTrue(result.get(2) instanceof Put);
+    assertEquals("rowA", rowOf(result.get(0)));
+    assertEquals("rowB", rowOf(result.get(1)));
+    assertEquals("rowC", rowOf(result.get(2)));
+  }
+
+  /**
+   * Documents that the boundary is keyed on the exact cell type, not merely put-vs-delete: adjacent
+   * DeleteColumn and DeleteFamily cells on the same row split into two separate Delete mutations.
+   * This mirrors HBase's ReplicationSink and is relied on so a future change does not silently
+   * coalesce distinct delete subtypes.
+   */
+  @Test
+  public void testAdjacentDeleteSubtypesSameRowSplit() throws Exception {
+    List<Cell> cells = new ArrayList<>();
+    cells.add(deleteColumnCell("row1"));
+    cells.add(deleteFamilyCell("row1"));
+    List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+    assertEquals("Distinct delete subtypes must split into separate Deletes", 
2, result.size());
+    assertTrue(result.get(0) instanceof Delete);
+    assertTrue(result.get(1) instanceof Delete);
+  }
+
+  /**
+   * The grouper keys boundaries off the immediately preceding cell only, so a row that recurs
+   * non-contiguously produces a separate mutation per contiguous run. This encodes the documented
+   * "global row ordering is not required" contract: rowA appearing twice with rowB between yields
+   * two rowA mutations, not a merged one.
+   */
+  @Test
+  public void testNonContiguousSameRowYieldsSeparateMutations() throws 
Exception {
+    List<Cell> cells = new ArrayList<>();
+    cells.add(putCell("rowA", "v1"));
+    cells.add(putCell("rowB", "v2"));
+    cells.add(putCell("rowA", "v3"));
+    List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+    assertEquals(3, result.size());
+    assertEquals("rowA", rowOf(result.get(0)));
+    assertEquals("rowB", rowOf(result.get(1)));
+    assertEquals("rowA", rowOf(result.get(2)));
+  }
+
+  @Test
+  public void testCellsArePreservedInResultMutations() throws Exception {
+    Cell put1 = putCell("row1", "v1");
+    Cell put2 = putCell("row1", "v2");
+    List<Cell> cells = new ArrayList<>();
+    cells.add(put1);
+    cells.add(put2);
+    List<Mutation> result = MutationCellGrouper.splitCellsIntoMutations(cells);
+    assertEquals(1, result.size());
+    List<Cell> grouped = result.get(0).getFamilyCellMap().get(FAMILY);
+    assertEquals(2, grouped.size());
+    assertTrue(
+      CellUtil.equals(put1, grouped.get(0)) && CellUtil.matchingValue(put1, 
grouped.get(0)));
+    assertTrue(
+      CellUtil.equals(put2, grouped.get(1)) && CellUtil.matchingValue(put2, 
grouped.get(1)));
+  }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index dcd4cd55d3..fa69ac78f6 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
@@ -109,11 +110,16 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Happens-before ordering verification, using Mockito's inOrder. Verify 
that the appends
     // happen before sync, and sync happened after appends.
-    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId1), 
eq(put1));
-    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId2), 
eq(put2));
-    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId3), 
eq(put3));
-    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId4), 
eq(put4));
-    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId5), 
eq(put5));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId1),
+      eq(LogFileTestUtil.cellsOf(put1)));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId2),
+      eq(LogFileTestUtil.cellsOf(put2)));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId3),
+      eq(LogFileTestUtil.cellsOf(put3)));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId4),
+      eq(LogFileTestUtil.cellsOf(put4)));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId5),
+      eq(LogFileTestUtil.cellsOf(put5)));
     inOrder.verify(writer, times(1)).sync();
   }
 
@@ -144,7 +150,8 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Verify the sequence: append, sync (fail), sync (succeed on retry with 
same writer)
     InOrder inOrder = Mockito.inOrder(writer);
-    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
     inOrder.verify(writer, times(2)).sync(); // First fails, second succeeds
   }
 
@@ -169,7 +176,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
         sleep(50); // Simulate slow processing
         return invocation.callRealMethod();
       }
-    }).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
+    }).when(innerWriter).append(anyString(), anyLong(), any(List.class));
 
     // Fill up the ring buffer by sending enough events.
     for (int i = 0; i < TEST_RINGBUFFER_SIZE; i++) {
@@ -206,7 +213,8 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     assertTrue("Append should have completed", appendFuture.isDone());
 
     // Verify the append eventually happens on the writer.
-    verify(innerWriter, timeout(10000).times(1)).append(eq(tableName), 
eq(myCommitId), any());
+    verify(innerWriter, timeout(10000).times(1)).append(eq(tableName), 
eq(myCommitId),
+      any(List.class));
   }
 
   /**
@@ -225,7 +233,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Configure writerBeforeRoll to fail on the first append call
     doThrow(new IOException("Simulated append 
failure")).when(writerBeforeRoll).append(anyString(),
-      anyLong(), any(Mutation.class));
+      anyLong(), any(List.class));
 
     // Append data
     logGroup.append(tableName, commitId, put);
@@ -237,9 +245,11 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Verify the sequence: append (fail), rotate, append (succeed), sync
     InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
-    inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
+    inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName), 
eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
     inOrder.verify(writerBeforeRoll, times(0)).sync(); // We failed append, 
did not try
-    inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put)); // Retry
+    inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName), 
eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put))); // Retry
     inOrder.verify(writerAfterRoll, times(1)).sync();
   }
 
@@ -348,7 +358,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     // Verify that all of appends were processed by the internal writer.
     for (int i = 0; i < APPENDS_PER_THREAD * 2; i++) {
       final long commitId = i;
-      verify(innerWriter, times(1)).append(eq(tableName), eq(commitId), any());
+      verify(innerWriter, times(1)).append(eq(tableName), eq(commitId), 
any(List.class));
     }
   }
 
@@ -389,9 +399,11 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Verify the sequence of operations
     InOrder inOrder = Mockito.inOrder(writerBeforeRotation, 
writerAfterRotation);
-    inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
+    inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
     inOrder.verify(writerBeforeRotation, times(1)).sync();
-    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + 1), eq(put));
+    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + 1),
+      eq(LogFileTestUtil.cellsOf(put)));
     inOrder.verify(writerAfterRotation, times(1)).sync();
   }
 
@@ -430,7 +442,8 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
 
     // Verify the final append went to the new writer
-    verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
+    verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
     verify(writerAfterRotation, times(1)).sync();
   }
 
@@ -510,10 +523,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
 
     // Verify first batch went to initial writer
-    verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(1L), 
eq(put));
+    verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(1L),
+      eq(LogFileTestUtil.cellsOf(put)));
     verify(writerBeforeRotation, times(1)).sync();
     // Verify second batch went to new writer (swap happened before append)
-    verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId + 
1), eq(put));
+    verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId + 
1),
+      eq(LogFileTestUtil.cellsOf(put)));
     verify(writerAfterRotation, times(1)).sync();
     // Verify the initial writer was closed asynchronously
     verify(writerBeforeRotation, timeout(5000).times(1)).close();
@@ -568,11 +583,14 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Verify operations went to the writers in the correct order
     InOrder inOrder = Mockito.inOrder(initialWriter, writerAfterRotate);
-    inOrder.verify(initialWriter).append(eq(tableName), eq(commitId), eq(put));
+    inOrder.verify(initialWriter).append(eq(tableName), eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
     inOrder.verify(initialWriter).sync();
-    inOrder.verify(initialWriter).append(eq(tableName), eq(commitId + 1), 
eq(put));
+    inOrder.verify(initialWriter).append(eq(tableName), eq(commitId + 1),
+      eq(LogFileTestUtil.cellsOf(put)));
     inOrder.verify(initialWriter).sync();
-    inOrder.verify(writerAfterRotate).append(eq(tableName), eq(commitId + 2), 
eq(put));
+    inOrder.verify(writerAfterRotate).append(eq(tableName), eq(commitId + 2),
+      eq(LogFileTestUtil.cellsOf(put)));
     inOrder.verify(writerAfterRotate).sync();
   }
 
@@ -628,7 +646,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Configure writer to throw a RuntimeException on append
     doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).append(anyString(),
-      anyLong(), any(Mutation.class));
+      anyLong(), any(List.class));
 
     // Append publishes to the ring buffer. The event handler catches the 
RuntimeException via
     // catch(Throwable), poisons itself, and fails the sync future. The 
producer receives
@@ -766,12 +784,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     // 5 appends went to old writer (processed before rotation task fired)
     for (int i = 0; i < 5; i++) {
       inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq(commitId + i),
-        eq(put));
+        eq(LogFileTestUtil.cellsOf(put)));
     }
     // Swap happens before sync action: 5 records replayed into new writer
     for (int i = 0; i < 5; i++) {
       inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + i),
-        eq(put));
+        eq(LogFileTestUtil.cellsOf(put)));
     }
     // Sync goes to new writer
     inOrder.verify(writerAfterRotation, times(1)).sync();
@@ -849,10 +867,6 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Write records across multiple rotations.
     for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
-      // Get the path of the current log file.
-      Path logPath = activeLog.getWriter().getContext().getFilePath();
-      logPaths.add(logPath);
-
       for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
         int commitId = (rotation * NUM_RECORDS_PER_ROTATION) + i;
         LogFile.Record record =
@@ -861,6 +875,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
         logGroup.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
       }
       logGroup.sync(); // Sync to commit the appends to the current writer.
+      // Capture the writer path AFTER sync, once this batch's apply() has settled currentWriter to
+      // the file these records landed on. Capturing before the appends races the async swap event:
+      // checkAndReplaceWriter's pendingWriter.getAndSet(null) and the currentWriter assignment are
+      // not atomic, so getWriter() can see the pending writer already taken by the consumer while
+      // currentWriter still points at the previous file, mis-recording the path.
+      logPaths.add(activeLog.getWriter().getContext().getFilePath());
       // Force a rotation to close the current writer.
       activeLog.forceRotation();
     }
@@ -919,9 +939,6 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     ReplicationLog activeLog = logGroup.getActiveLog();
 
     for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
-      Path logPath = activeLog.getWriter().getContext().getFilePath();
-      logPaths.add(logPath);
-
       for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
         int commitId = (rotation * NUM_RECORDS_PER_ROTATION) + i;
         LogFile.Record record =
@@ -931,6 +948,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
       }
 
       logGroup.sync();
+      // Capture the writer path AFTER sync, once this batch's apply() has settled currentWriter to
+      // the file these records landed on. Capturing before the appends races the async swap event:
+      // checkAndReplaceWriter's pendingWriter.getAndSet(null) and the currentWriter assignment are
+      // not atomic, so getWriter() can see the pending writer already taken by the consumer while
+      // currentWriter still points at the previous file, mis-recording the path.
+      logPaths.add(activeLog.getWriter().getContext().getFilePath());
       activeLog.forceRotation();
     }
 
@@ -1020,7 +1043,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Configure writer to throw RuntimeException on append
     doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).append(anyString(),
-      anyLong(), any(Mutation.class));
+      anyLong(), any(List.class));
 
     // Append publishes to the ring buffer. The event handler catches the 
RuntimeException,
     // poisons itself, and fails the sync future. The producer calls abort().
@@ -1059,7 +1082,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Configure writer to throw RuntimeException on append
     doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).append(anyString(),
-      anyLong(), any(Mutation.class));
+      anyLong(), any(List.class));
 
     // Append publishes to the ring buffer. The event handler catches the 
RuntimeException,
     // poisons itself, and fails the sync future. The producer calls abort().
@@ -1166,7 +1189,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
         sleep(50); // Delay to allow multiple events to be posted
         return invocation.callRealMethod();
       }
-    }).when(innerWriter).append(eq(tableName), eq(commitId1), eq(put1));
+    }).when(innerWriter).append(eq(tableName), eq(commitId1), 
eq(LogFileTestUtil.cellsOf(put1)));
 
     // Post appends and three syncs in quick succession. The first append will 
be delayed long
     // enough for the three syncs to appear in a single Disruptor batch. Then 
they should all
@@ -1181,9 +1204,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     // Verify the sequence of operations on the inner writer: the three 
appends, then exactly
     // one sync.
     InOrder inOrder = Mockito.inOrder(innerWriter);
-    inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId1), 
eq(put1));
-    inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId2), 
eq(put2));
-    inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId3), 
eq(put3));
+    inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId1),
+      eq(LogFileTestUtil.cellsOf(put1)));
+    inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId2),
+      eq(LogFileTestUtil.cellsOf(put2)));
+    inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId3),
+      eq(LogFileTestUtil.cellsOf(put3)));
     inOrder.verify(innerWriter, times(1)).sync(); // Only one sync should be 
called
   }
 
@@ -1287,12 +1313,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     // invocation/stub state is not thread-safe and the partially-applied stub 
can be matched
     // against an unrelated method on the consumer thread.
     doThrow(new IOException("Simulate append 
failure")).when(writer).append(tableName, commitId5,
-      put5);
+      LogFileTestUtil.cellsOf(put5));
     // Rotated writers must also fail on the 5th append so the retry doesn't 
rescue the loop.
     doAnswer(invocation -> {
       LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
       doThrow(new IOException("Simulate append 
failure")).when(w).append(tableName, commitId5,
-        put5);
+        LogFileTestUtil.cellsOf(put5));
       return w;
     }).when(activeLog).createNewWriter();
 
@@ -1309,11 +1335,16 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // verify that all the in-flight appends and syncs are replayed on the new 
store and forward
     // writer
-    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId1), eq(put1));
-    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId2), eq(put2));
-    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId3), eq(put3));
-    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId4), eq(put4));
-    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId5), eq(put5));
+    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId1),
+      eq(LogFileTestUtil.cellsOf(put1)));
+    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId2),
+      eq(LogFileTestUtil.cellsOf(put2)));
+    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId3),
+      eq(LogFileTestUtil.cellsOf(put3)));
+    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId4),
+      eq(LogFileTestUtil.cellsOf(put4)));
+    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId5),
+      eq(LogFileTestUtil.cellsOf(put5)));
     inOrder.verify(storeAndForwardWriter, times(1)).sync();
   }
 
@@ -1379,15 +1410,16 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     // 3 appends went to old writer
     for (int i = 0; i < 3; i++) {
       inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq(commitId + i),
-        eq(put));
+        eq(LogFileTestUtil.cellsOf(put)));
     }
     // 3 records replayed into new writer
     for (int i = 0; i < 3; i++) {
       inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + i),
-        eq(put));
+        eq(LogFileTestUtil.cellsOf(put)));
     }
     // 4th append goes to new writer
-    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + 3), eq(put));
+    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + 3),
+      eq(LogFileTestUtil.cellsOf(put)));
     inOrder.verify(writerAfterRotation, times(1)).sync();
 
     // Old writer closed async
@@ -1429,10 +1461,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     assertTrue("Should be using new writer", newWriter != initialWriter);
 
     // Old writer: received the append only
-    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
+    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
 
     // New writer: received replayed append + successful sync
-    verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
     verify(newWriter, times(1)).sync();
   }
 
@@ -1441,6 +1475,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
    * idle. A rotation tick stages a pending writer. The reader performs HDFS 
lease recovery,
    * breaking the old writer's stream. When events resume, apply() drains the 
healthy staged writer
    * before the action — the broken writer is never touched. No replay needed 
(empty batch).
+   * <p>
+   * Determinism: the broken-stream stubs are installed while the system is idle, before
+   * forceRotation() publishes the swap event, so the consumer thread never races stub installation
+   * on the initialWriter spy. Before verifying the initialWriter invocation counts we await its
+   * async close (submitClose on the swap), which happens-after every invocation the swap can make
+   * on that spy — so the count assertions cannot race the consumer/close-executor threads.
    */
   @Test
   public void testIdleLeaseRecoveryDrainsStagedWriter() throws Exception {
@@ -1456,15 +1496,16 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     logGroup.append(tableName, commitId, put);
     logGroup.sync();
 
-    // Stage W2 in pendingWriter via forced rotation
-    activeLog.forceRotation();
-
-    // Simulate HDFS lease recovery breaking the old writer's stream
+    // Simulate HDFS lease recovery breaking the old writer's stream. 
Installed while idle — before
+    // forceRotation() publishes the swap event — so the consumer cannot race 
this stub install.
     doThrow(new IOException("Simulated broken stream after lease 
recovery")).when(initialWriter)
-      .append(anyString(), anyLong(), any(Mutation.class));
+      .append(anyString(), anyLong(), any(List.class));
     doThrow(new IOException("Simulated broken stream after lease 
recovery")).when(initialWriter)
       .sync();
 
+    // Stage W2 in pendingWriter via forced rotation
+    activeLog.forceRotation();
+
     // Events resume — apply() drains W2 before the action, so broken writer 
is never touched
     logGroup.append(tableName, commitId + 1, put);
     logGroup.sync();
@@ -1474,11 +1515,17 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
       newWriter != initialWriter);
 
     // New writer received the new append + sync (no replay — currentBatch was 
empty)
-    verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1), 
eq(put));
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1),
+      eq(LogFileTestUtil.cellsOf(put)));
     verify(newWriter, times(1)).sync();
 
+    // Await the async close of the old writer. This happens-after every 
invocation the swap makes
+    // on the initialWriter spy, so the count verifications below cannot race 
the consumer thread.
+    verify(initialWriter, timeout(5000).times(1)).close();
+
     // Old writer: only the pre-idle append + sync, nothing after the break
-    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
+    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
     verify(initialWriter, times(1)).sync();
   }
 
@@ -1505,7 +1552,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
           throw new IOException("Simulated transient HDFS error during 
replay");
         }
         return appendInvocation.callRealMethod();
-      }).when(w).append(anyString(), anyLong(), any(Mutation.class));
+      }).when(w).append(anyString(), anyLong(), any(List.class));
       return w;
     }).when(activeLog).createNewWriter();
 
@@ -1526,9 +1573,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     assertTrue("Should be using a fresh writer", finalWriter != initialWriter);
 
     // Final writer (W3): replayed r1+r2 then appended r3 — each exactly once.
-    verify(finalWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
-    verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 1), 
eq(put));
-    verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 2), 
eq(put));
+    verify(finalWriter, times(1)).append(eq(tableName), eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
+    verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 1),
+      eq(LogFileTestUtil.cellsOf(put)));
+    verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 2),
+      eq(LogFileTestUtil.cellsOf(put)));
     verify(finalWriter, times(1)).sync();
   }
 
@@ -1549,7 +1599,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Configure initial writer's append to always fail (simulating broken 
HDFS stream)
     doThrow(new IOException("Simulated broken 
stream")).when(initialWriter).append(anyString(),
-      anyLong(), any(Mutation.class));
+      anyLong(), any(List.class));
 
     // Append — attempt 1 fails on initialWriter, rotation requested, attempt 
2 drains the
     // rotated writer and succeeds
@@ -1560,9 +1610,11 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     assertTrue("Should be using a new writer after error recovery", newWriter 
!= initialWriter);
 
     // Old writer received 1 failed attempt
-    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
+    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
     // New writer received the successful append
-    verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId),
+      eq(LogFileTestUtil.cellsOf(put)));
     verify(newWriter, times(1)).sync();
   }
 
@@ -2061,7 +2113,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     doAnswer(invocation -> {
       holdConsumer.await();
       return invocation.callRealMethod();
-    }).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
+    }).when(innerWriter).append(anyString(), anyLong(), any(List.class));
 
     Thread filler = new Thread(() -> {
       try {
@@ -2109,6 +2161,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     final int appendsPerSync = Integer.getInteger("test.appendsPerSync", 5);
     final int cellsPerMutation = Integer.getInteger("test.cellsPerMutation", 
1);
     final long innerSyncDelayMs = Long.getLong("test.innerSyncDelayMs", 2);
+    // Framing of the appendsPerSync mutations between two syncs:
+    // permutation - one append() per mutation (pre-coalescing behavior)
+    // perbatch - one append(table, commitId, List<Cell>) per sync carrying 
the whole batch's
+    // flat cell stream (the coalesced behavior IndexRegionObserver now emits)
+    final String recordFraming = System.getProperty("test.recordFraming", 
"permutation");
+    final boolean perBatch = "perbatch".equals(recordFraming);
 
     // Use the production-default ring buffer size so producers are not 
artificially blocked on
     // ringBuffer.next() — the default test fixture uses a 32-slot buffer 
which fills under
@@ -2164,11 +2222,26 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
           try {
             startLatch.await();
             for (int i = 0; i < syncsPerProducer; i++) {
-              for (int j = 0; j < appendsPerSync; j++) {
+              if (perBatch) {
+                // Coalesce the whole batch into a single record carrying the 
flat cell stream,
+                // mirroring IndexRegionObserver's per-(table,batch) append.
+                List<Cell> batchCells = new ArrayList<>(appendsPerSync * 
cellsPerMutation);
                 long commitId = commitIdSeq.getAndIncrement();
-                Mutation put = LogFileTestUtil.newPut("row" + commitId, 
commitId, cellsPerMutation);
-                logGroup.append(tableName, commitId, put);
+                for (int j = 0; j < appendsPerSync; j++) {
+                  Mutation put =
+                    LogFileTestUtil.newPut("row" + commitId + "_" + j, 
commitId, cellsPerMutation);
+                  batchCells.addAll(LogFileTestUtil.cellsOf(put));
+                }
+                logGroup.append(tableName, commitId, batchCells);
                 totalProducerAppends.incrementAndGet();
+              } else {
+                for (int j = 0; j < appendsPerSync; j++) {
+                  long commitId = commitIdSeq.getAndIncrement();
+                  Mutation put =
+                    LogFileTestUtil.newPut("row" + commitId, commitId, 
cellsPerMutation);
+                  logGroup.append(tableName, commitId, put);
+                  totalProducerAppends.incrementAndGet();
+                }
               }
               logGroup.sync();
               totalProducerSyncs.incrementAndGet();
@@ -2236,4 +2309,70 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
       pool.awaitTermination(5, TimeUnit.SECONDS);
     }
   }
+
+  /**
+   * Verifies that the producer- and consumer-side sync metrics are actually 
emitted on the write
+   * path. Guards against the "metric declared but never wired" regression 
class.
+   * <p>
+   * Two deliberate choices make this assertion deterministic where the IT 
version flaked:
+   * <ul>
+   * <li>The inner writer's {@code sync()} is stubbed to sleep a couple of ms 
so that
+   * {@code syncTime} and {@code fsSyncTime} — which are stored ms-truncated — 
clear the
+   * sub-millisecond floor every run. On a fast disk a real fsync floors to 0 
ms, which is correct
+   * behavior but breaks a naive {@code > 0} assertion.</li>
+   * <li>Metrics are read only after a graceful {@link 
ReplicationLogGroup#close()}, which joins the
+   * Disruptor consumer thread. This establishes happens-before on every 
consumer-recorded metric,
+   * including {@code batchSize}, which is updated on the endOfBatch callback 
after the producer's
+   * sync future has already completed.</li>
+   * </ul>
+   */
+  @Test
+  public void testSyncMetricsEmitted() throws Exception {
+    // Make the inner fsync take >= 1ms so the ms-truncated 
syncTime/fsSyncTime histograms record a
+    // non-zero value deterministically.
+    final long innerSyncDelayMs = 2;
+    LogFileWriter writer = logGroup.getActiveLog().getWriter();
+    assertNotNull("Writer should not be null", writer);
+    doAnswer(invocation -> {
+      sleep(innerSyncDelayMs);
+      return invocation.callRealMethod();
+    }).when(writer).sync();
+
+    final String tableName = "TESTTBL";
+    final int batches = 5;
+    final int appendsPerBatch = 4;
+    long commitId = 0;
+    for (int b = 0; b < batches; b++) {
+      for (int j = 0; j < appendsPerBatch; j++) {
+        logGroup.append(tableName, commitId, LogFileTestUtil.newPut("row" + 
commitId, commitId, 1));
+        commitId++;
+      }
+      logGroup.sync();
+    }
+
+    // Close gracefully (fatalException == null) so the consumer drains and 
the executor is joined
+    // before we read. close() is idempotent, so tearDown's close() is a no-op.
+    logGroup.close();
+
+    // getCurrentMetricValues() snapshots and resets the histogram bins, so 
call it exactly once.
+    ReplicationLogMetricValues values = 
logGroup.getMetrics().getCurrentMetricValues();
+
+    // Nanosecond-resolution metrics never truncate to zero, so assert 
strictly positive.
+    assertTrue("appendTime should be > 0, got " + values.getAppendTimeMax(),
+      values.getAppendTimeMax() > 0);
+    assertTrue("ringBufferTime should be > 0, got " + 
values.getRingBufferTimeMax(),
+      values.getRingBufferTimeMax() > 0);
+    assertTrue("pendingSyncWaitTime should be > 0, got " + 
values.getPendingSyncWaitTimeMax(),
+      values.getPendingSyncWaitTimeMax() > 0);
+    // Counts.
+    assertTrue("batchSize should be > 0, got " + values.getBatchSizeMax(),
+      values.getBatchSizeMax() > 0);
+    assertTrue("pendingSyncCount should be > 0, got " + 
values.getPendingSyncCountMax(),
+      values.getPendingSyncCountMax() > 0);
+    // Millisecond-resolution timers: the injected fsync delay clears the 
truncation floor.
+    assertTrue("syncTime should be > 0, got " + values.getSyncTimeMax(),
+      values.getSyncTimeMax() > 0);
+    assertTrue("fsSyncTime should be > 0, got " + values.getFsSyncTimeMax(),
+      values.getFsSyncTimeMax() > 0);
+  }
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
index a3955c5783..debe855a4d 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCodecTest.java
@@ -34,10 +34,12 @@ import java.util.Random;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assume;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,8 +109,7 @@ public class LogFileCodecTest {
     List<LogFile.Record> originals =
       Arrays.asList(LogFileTestUtil.newPutRecord("TBL1", 100L, "row1", 12345L, 
1),
         LogFileTestUtil.newPutRecord("TBL2", 101L, "row2", 12346L, 2),
-        LogFileTestUtil.newPutRecord("TBL1", 102L, "row3", 12347L, 0) // No 
columns
-      );
+        LogFileTestUtil.newPutRecord("TBL1", 102L, "row3", 12347L, 3));
 
     // Encode
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -261,21 +262,23 @@ public class LogFileCodecTest {
     singleRecordTest(LogFileTestUtil.newPutRecord("TBLMANYVALS", 1L, "row", 
12345L, 100));
   }
 
-  @Test
-  public void testCodecWithEmptyPut() throws IOException {
+  @Test(expected = IllegalArgumentException.class)
+  public void testCodecRejectsEmptyPut() throws IOException {
     long ts = 12345L;
     Put put = new Put(Bytes.toBytes("row"));
     put.setTimestamp(ts);
-    singleRecordTest(
-      new 
LogFileRecord().setHBaseTableName("TBLEMPTYPUT").setCommitId(1L).setMutation(put));
+    LogFileCodec codec = new LogFileCodec();
+    codec.getEncoder(new DataOutputStream(new ByteArrayOutputStream()))
+      .write(new 
LogFileRecord().setHBaseTableName("TBLEMPTYPUT").setCommitId(1L).setMutation(put));
   }
 
-  @Test
-  public void testCodecWithEmptyDelete() throws IOException {
+  @Test(expected = IllegalArgumentException.class)
+  public void testCodecRejectsEmptyDelete() throws IOException {
     long ts = 12345L;
     Delete delete = new Delete(Bytes.toBytes("row"));
     delete.setTimestamp(ts);
-    singleRecordTest(
+    LogFileCodec codec = new LogFileCodec();
+    codec.getEncoder(new DataOutputStream(new ByteArrayOutputStream())).write(
       new 
LogFileRecord().setHBaseTableName("TBLEMPTYDEL").setCommitId(1L).setMutation(delete));
   }
 
@@ -454,4 +457,177 @@ public class LogFileCodecTest {
       new 
LogFileRecord().setHBaseTableName("TBLCTDFV").setCommitId(1L).setMutation(del4));
   }
 
+  @Test
+  public void testMultipleFamiliesRoundTrip() throws IOException {
+    long ts = 12345L;
+    byte[] row = Bytes.toBytes("row");
+    // Add families in non-sorted insertion order to verify the codec 
preserves the iteration
+    // order of mutation.getFamilyCellMap() (which is a TreeMap, so families 
come out sorted).
+    Put put = new Put(row);
+    put.setTimestamp(ts);
+    put.addColumn(Bytes.toBytes("z_cf"), Bytes.toBytes("q1"), ts, 
Bytes.toBytes("vz"));
+    put.addColumn(Bytes.toBytes("a_cf"), Bytes.toBytes("q1"), ts, 
Bytes.toBytes("va"));
+    put.addColumn(Bytes.toBytes("m_cf"), Bytes.toBytes("q1"), ts, 
Bytes.toBytes("vm"));
+    put.addColumn(Bytes.toBytes("a_cf"), Bytes.toBytes("q2"), ts, 
Bytes.toBytes("va2"));
+
+    LogFile.Record original =
+      new 
LogFileRecord().setHBaseTableName("TBLMULTIFAM").setCommitId(1L).setMutation(put);
+    singleRecordTest(original);
+
+    // Verify the cells are ordered family-then-qualifier (TreeMap ordering of 
getFamilyCellMap)
+    List<Cell> cells = original.getCells();
+    assertEquals(4, cells.size());
+    assertTrue("first family should be a_cf",
+      Bytes.equals(CellUtil.cloneFamily(cells.get(0)), Bytes.toBytes("a_cf")));
+    assertTrue("second family should be a_cf",
+      Bytes.equals(CellUtil.cloneFamily(cells.get(1)), Bytes.toBytes("a_cf")));
+    assertTrue("third family should be m_cf",
+      Bytes.equals(CellUtil.cloneFamily(cells.get(2)), Bytes.toBytes("m_cf")));
+    assertTrue("fourth family should be z_cf",
+      Bytes.equals(CellUtil.cloneFamily(cells.get(3)), Bytes.toBytes("z_cf")));
+  }
+
+  /**
+   * Framing A/B microbenchmark. Isolates the codec-level cost of record 
framing. The per-cell wire
+   * format is identical in both modes (same flat-cell encoding); the only 
thing that differs is how
+   * often the per-record header (record length + table name + commitId + 
attribute count) is paid:
+   * <ul>
+   * <li>Mode A (per-mutation): one {@link LogFileRecord} per mutation, 
encoded separately, so the
+   * header is paid {@code mutationCount} times.</li>
+   * <li>Mode B (per-batch): one record carrying the whole batch's flat cell 
stream, encoded once,
+   * so the header is paid once.</li>
+   * </ul>
+   * Both modes build their cells identically and share one codec instance, so 
any delta in
+   * {@code wireBytes} is attributable to framing overhead and any delta in 
{@code appendNs} to the
+   * extra per-record encode calls. Opt in with {@code 
-Dtest.runFramingBenchmark=true}; tune with
+   * {@code -Dtest.mutationCount}, {@code -Dtest.cellsPerMutation}, {@code 
-Dtest.valueSize},
+   * {@code -Dtest.benchmarkIterations}.
+   */
+  @Test
+  public void testFramingMicrobenchmark() throws IOException {
+    Assume.assumeTrue("Framing microbenchmark, opt in with 
-Dtest.runFramingBenchmark=true",
+      Boolean.getBoolean("test.runFramingBenchmark"));
+    final String tableName = "TBLFRAME";
+    final int mutationCount = Integer.getInteger("test.mutationCount", 100);
+    final int cellsPerMutation = Integer.getInteger("test.cellsPerMutation", 
4);
+    final int valueSize = Integer.getInteger("test.valueSize", 64);
+    final int iterations = Integer.getInteger("test.benchmarkIterations", 
2000);
+    final int warmup = Math.max(1, iterations / 10);
+
+    // Build the batch's cells once. Each mutation contributes 
cellsPerMutation cells, each carrying
+    // a valueSize-byte value so the cell payload is production-realistic 
relative to the header.
+    // The flat concatenation is what mode B encodes, while mode A re-frames 
each mutation's slice.
+    List<List<Cell>> perMutationCells = new ArrayList<>(mutationCount);
+    List<Cell> batchCells = new ArrayList<>(mutationCount * cellsPerMutation);
+    byte[] value = new byte[valueSize];
+    for (int m = 0; m < mutationCount; m++) {
+      Put put = new Put(Bytes.toBytes("row" + m));
+      put.setTimestamp(m);
+      for (int c = 0; c < cellsPerMutation; c++) {
+        put.addColumn(Bytes.toBytes("col" + c), Bytes.toBytes("q"), m, value);
+      }
+      List<Cell> cells = new ArrayList<>(put.getFamilyCellMap().size());
+      for (List<Cell> familyCells : put.getFamilyCellMap().values()) {
+        cells.addAll(familyCells);
+      }
+      perMutationCells.add(cells);
+      batchCells.addAll(cells);
+    }
+
+    LogFileCodec codec = new LogFileCodec();
+
+    // Mode A: one record per mutation, framed separately.
+    BenchResult a = runFramingMode(iterations, warmup, () -> {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bos);
+      LogFile.Codec.Encoder encoder = codec.getEncoder(dos);
+      int recordCount = 0;
+      for (int m = 0; m < mutationCount; m++) {
+        LogFile.Record record = new 
LogFileRecord().setHBaseTableName(tableName).setCommitId(m)
+          .setCells(perMutationCells.get(m));
+        encoder.write(record);
+        recordCount++;
+      }
+      dos.flush();
+      return new int[] { bos.size(), recordCount };
+    });
+
+    // Mode B: one record carrying the whole batch, framed once.
+    BenchResult b = runFramingMode(iterations, warmup, () -> {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bos);
+      LogFile.Codec.Encoder encoder = codec.getEncoder(dos);
+      LogFile.Record record =
+        new 
LogFileRecord().setHBaseTableName(tableName).setCommitId(0L).setCells(batchCells);
+      encoder.write(record);
+      dos.flush();
+      return new int[] { bos.size(), 1 };
+    });
+
+    long totalCells = (long) mutationCount * cellsPerMutation;
+    LOG.info(
+      "Framing benchmark params: mutationCount={} cellsPerMutation={} 
valueSize={} totalCells={} "
+        + "iterations={}",
+      mutationCount, cellsPerMutation, valueSize, totalCells, iterations);
+    logFramingMode("A(per-mutation)", a, totalCells);
+    logFramingMode("B(per-batch)", b, totalCells);
+    LOG.info(
+      "Framing A/B ratios: appendNs A/B={} wireBytes A/B={} wireBytesSaved={} "
+        + "(headerBytesPerRecord~={})",
+      String.format("%.2f", (double) a.appendNs / Math.max(1, b.appendNs)),
+      String.format("%.3f", (double) a.wireBytes / Math.max(1, b.wireBytes)),
+      a.wireBytes - b.wireBytes,
+      mutationCount > 1 ? (a.wireBytes - b.wireBytes) / (mutationCount - 1) : 
0);
+  }
+
+  /** Aggregated result of one framing mode: best-of encode time and the wire 
size produced. */
+  private static final class BenchResult {
+    final long appendNs;
+    final int wireBytes;
+    final int recordCount;
+
+    BenchResult(long appendNs, int wireBytes, int recordCount) {
+      this.appendNs = appendNs;
+      this.wireBytes = wireBytes;
+      this.recordCount = recordCount;
+    }
+  }
+
+  /** Body of one framing mode; returns {@code [wireBytes, recordCount]}. */
+  private interface FramingBody {
+    int[] run() throws IOException;
+  }
+
+  /**
+   * Runs {@code body} for {@code warmup} untimed iterations, then {@code 
iterations} timed ones,
+   * returning the minimum single-iteration encode time (least contaminated by 
GC/JIT) and the wire
+   * size + record count from the final iteration.
+   */
+  private static BenchResult runFramingMode(int iterations, int warmup, 
FramingBody body)
+    throws IOException {
+    for (int i = 0; i < warmup; i++) {
+      body.run();
+    }
+    long minNs = Long.MAX_VALUE;
+    int[] last = null;
+    for (int i = 0; i < iterations; i++) {
+      long startNs = System.nanoTime();
+      last = body.run();
+      long elapsed = System.nanoTime() - startNs;
+      if (elapsed < minNs) {
+        minNs = elapsed;
+      }
+    }
+    return new BenchResult(minNs, last[0], last[1]);
+  }
+
+  private static void logFramingMode(String label, BenchResult r, long 
totalCells) {
+    LOG.info(
+      "Framing mode {}: appendNs(min)={} recordCount={} wireBytes={} 
bytesPerCell={} "
+        + "nsPerCell={}",
+      label, r.appendNs, r.recordCount, r.wireBytes,
+      String.format("%.2f", (double) r.wireBytes / Math.max(1, totalCells)),
+      String.format("%.2f", (double) r.appendNs / Math.max(1, totalCells)));
+  }
+
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCompressionTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCompressionTest.java
index 1016eae131..59c105513c 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCompressionTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileCompressionTest.java
@@ -131,7 +131,7 @@ public class LogFileCompressionTest {
     for (int i = 0; i < 100; i++) {
       LogFile.Record record = LogFileTestUtil.newPutRecord("TBLSBWC", i, "row" 
+ i, 100L + i, 2);
       originals.add(record);
-      writer.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
+      writer.append(record.getHBaseTableName(), record.getCommitId(), 
record.getCells());
     }
     writer.close();
     initLogFileReader();
@@ -148,7 +148,7 @@ public class LogFileCompressionTest {
     for (int i = 0; i < 100_000; i++) {
       LogFile.Record record = LogFileTestUtil.newPutRecord("TBLMBWC", i, "row" 
+ i, 100L + i, 5);
       originals.add(record);
-      writer.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
+      writer.append(record.getHBaseTableName(), record.getCommitId(), 
record.getCells());
     }
     writer.close();
     initLogFileReader();
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
index 1969fcc721..a0a4e43442 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileTestUtil.java
@@ -30,6 +30,8 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -44,6 +47,17 @@ import org.apache.hadoop.hbase.util.Bytes;
 
 public interface LogFileTestUtil {
 
+  /**
+   * Flatten a mutation's family cell map into the cell list that the writer 
ultimately receives.
+   */
+  static List<Cell> cellsOf(Mutation mutation) {
+    List<Cell> cells = new ArrayList<>();
+    for (List<Cell> familyCells : mutation.getFamilyCellMap().values()) {
+      cells.addAll(familyCells);
+    }
+    return cells;
+  }
+
   static LogFile.Record newPutRecord(String table, long commitId, String 
rowKey, long ts,
     int numCols) {
     return new LogFileRecord().setMutation(newPut(rowKey, ts, 
numCols)).setHBaseTableName(table)
@@ -120,16 +134,33 @@ public interface LogFileTestUtil {
 
   static void assertRecordEquals(String message, LogFile.Record r1, 
LogFile.Record r2)
     throws AssertionError {
-    try {
-      if (
-        !r1.getMutation().toJSON().equals(r2.getMutation().toJSON())
-          || !r1.getHBaseTableName().equals(r2.getHBaseTableName())
-          || r1.getCommitId() != r2.getCommitId()
-      ) {
-        throw new AssertionError(message + ": left=" + r1 + ", right=" + r2);
+    if (
+      !r1.getHBaseTableName().equals(r2.getHBaseTableName()) || 
r1.getCommitId() != r2.getCommitId()
+    ) {
+      throw new AssertionError(message + ": left=" + r1 + ", right=" + r2);
+    }
+    if (r1.getCells().size() != r2.getCells().size()) {
+      throw new AssertionError(message + ": cell count mismatch: left=" + r1 + 
", right=" + r2);
+    }
+    for (int i = 0; i < r1.getCells().size(); i++) {
+      Cell c1 = r1.getCells().get(i);
+      Cell c2 = r2.getCells().get(i);
+      // CellUtil.equals compares row/family/qualifier/timestamp/type but not 
value.
+      if (!CellUtil.equals(c1, c2) || !CellUtil.matchingValue(c1, c2)) {
+        throw new AssertionError(
+          message + ": cell #" + i + " mismatch: left=" + r1 + ", right=" + 
r2);
+      }
+    }
+    if (r1.getAttributes().size() != r2.getAttributes().size()) {
+      throw new AssertionError(
+        message + ": attribute count mismatch: left=" + r1 + ", right=" + r2);
+    }
+    for (java.util.Map.Entry<String, byte[]> e : 
r1.getAttributes().entrySet()) {
+      byte[] other = r2.getAttributes().get(e.getKey());
+      if (other == null || !java.util.Arrays.equals(e.getValue(), other)) {
+        throw new AssertionError(
+          message + ": attribute '" + e.getKey() + "' mismatch: left=" + r1 + 
", right=" + r2);
       }
-    } catch (IOException e) {
-      throw new AssertionError(e.getMessage());
     }
   }
 
@@ -158,12 +189,22 @@ public interface LogFileTestUtil {
   }
 
   static void assertMutationEquals(String message, Mutation m1, Mutation m2) {
-    try {
-      if (!m1.toJSON().equals(m2.toJSON())) {
-        throw new AssertionError(message + ": left=" + m1 + ", right=" + m2);
+    if (!Bytes.equals(m1.getRow(), m2.getRow())) {
+      throw new AssertionError(message + ": row mismatch: left=" + m1 + ", 
right=" + m2);
+    }
+    if (m1.getClass() != m2.getClass()) {
+      throw new AssertionError(message + ": type mismatch: left=" + m1 + ", 
right=" + m2);
+    }
+    List<Cell> c1 = cellsOf(m1);
+    List<Cell> c2 = cellsOf(m2);
+    if (c1.size() != c2.size()) {
+      throw new AssertionError(message + ": cell count mismatch: left=" + m1 + 
", right=" + m2);
+    }
+    for (int i = 0; i < c1.size(); i++) {
+      if (!CellUtil.equals(c1.get(i), c2.get(i)) || 
!CellUtil.matchingValue(c1.get(i), c2.get(i))) {
+        throw new AssertionError(
+          message + ": cell #" + i + " mismatch: left=" + m1 + ", right=" + 
m2);
       }
-    } catch (IOException e) {
-      throw new AssertionError(e.getMessage());
     }
   }
 
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
index 8ecae075e9..baccf55973 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
@@ -88,9 +88,9 @@ public class LogFileWriterSyncTest {
 
     // Append data
     Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);
-    writer.append("TBL", 1L, m1);
+    writer.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
     Mutation m2 = LogFileTestUtil.newPut("row2", 2L, 1);
-    writer.append("TBL", 2L, m2);
+    writer.append("TBL", 2L, LogFileTestUtil.cellsOf(m2));
 
     // Sync the writer
     writer.sync();
@@ -101,7 +101,7 @@ public class LogFileWriterSyncTest {
 
     // Append more data after sync
     Mutation m3 = LogFileTestUtil.newPut("row3", 12L, 1);
-    writer.append("TBL", 3L, m3);
+    writer.append("TBL", 3L, LogFileTestUtil.cellsOf(m3));
 
     // Sync again
     writer.sync();
@@ -116,7 +116,7 @@ public class LogFileWriterSyncTest {
 
     // Append A, then sync
     Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);
-    writer.append("TBL", 1L, m1);
+    writer.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
     writer.sync();
 
     // Verify first hsync
@@ -124,7 +124,7 @@ public class LogFileWriterSyncTest {
 
     // Append B after sync
     Mutation m2 = LogFileTestUtil.newPut("row2", 2L, 1);
-    writer.append("TBL", 2L, m2);
+    writer.append("TBL", 2L, LogFileTestUtil.cellsOf(m2));
 
     // Verify hsync was NOT called immediately after appending B. It might be 
called later on
     // close or another sync.
@@ -143,7 +143,7 @@ public class LogFileWriterSyncTest {
 
     // Append A
     Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);
-    writer.append("TBL", 1L, m1);
+    writer.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
 
     // Sync 1
     writer.sync();
@@ -156,7 +156,7 @@ public class LogFileWriterSyncTest {
 
     // Append B
     Mutation m2 = LogFileTestUtil.newPut("row2", 2L, 1);
-    writer.append("TBL", 2L, m2);
+    writer.append("TBL", 2L, LogFileTestUtil.cellsOf(m2));
 
     // Sync 3
     writer.sync();
@@ -181,7 +181,7 @@ public class LogFileWriterSyncTest {
 
     // Append a record
     Mutation m1 = LogFileTestUtil.newPut("row", 1L, 1);
-    writer.append("TBL", 1L, m1);
+    writer.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
 
     // Sync again
     writer.sync();
@@ -211,7 +211,7 @@ public class LogFileWriterSyncTest {
 
     try {
       Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);
-      hflushWriter.append("TBL", 1L, m1);
+      hflushWriter.append("TBL", 1L, LogFileTestUtil.cellsOf(m1));
       hflushWriter.sync();
 
       verify(hflushOutput, times(1)).hflush();
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
index 95c6bb5133..e0e8d59985 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
@@ -73,10 +73,10 @@ public class LogFileWriterTest {
     initLogFileWriter();
     LogFile.Record r1 = LogFileTestUtil.newPutRecord("TBLTLFW", 100, "row1", 
10L, 1);
     LogFile.Record r2 = LogFileTestUtil.newDeleteRecord("TBLTLFW", 101, 
"row2", 11L, 1);
-    writer.append(r1.getHBaseTableName(), r1.getCommitId(), r1.getMutation());
+    writer.append(r1.getHBaseTableName(), r1.getCommitId(), r1.getCells());
     LOG.debug("Appended " + r1);
     writer.sync();
-    writer.append(r2.getHBaseTableName(), r2.getCommitId(), r2.getMutation());
+    writer.append(r2.getHBaseTableName(), r2.getCommitId(), r2.getCells());
     LOG.debug("Appended " + r2);
     writer.close();
 
@@ -108,7 +108,7 @@ public class LogFileWriterTest {
       LogFile.Record record =
         LogFileTestUtil.newPutRecord("TBLFRI", 100L + i, "row" + i, 10L + i, 
1);
       originals.add(record);
-      writer.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
+      writer.append(record.getHBaseTableName(), record.getCommitId(), 
record.getCells());
     }
     writer.close();
 

Reply via email to