This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new c7936b1948 PHOENIX-7846 Bound rotation replay cost for large commit batches (#2469)
c7936b1948 is described below

commit c7936b194883def557fa78d2d05960382d79ddf7
Author: tkhurana <[email protected]>
AuthorDate: Fri May 15 11:29:20 2026 -0700

    PHOENIX-7846 Bound rotation replay cost for large commit batches (#2469)
---
 .../apache/phoenix/replication/ReplicationLog.java |  9 ++++-
 .../apache/phoenix/replication/log/LogFile.java    |  3 +-
 .../replication/log/LogFileFormatWriter.java       |  4 +-
 .../phoenix/replication/log/LogFileWriter.java     |  4 +-
 .../replication/ReplicationLogGroupTest.java       | 46 ++++++++++++++++++++++
 5 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index d6b7b01535..f94069d088 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -339,9 +339,16 @@ public class ReplicationLog {
   }
 
   protected void append(Record r) throws IOException {
-    apply(writer -> writer.append(r.tableName, r.commitId, r.mutation));
+    final boolean[] blockSynced = { false };
+    apply(writer -> {
+      blockSynced[0] = writer.append(r.tableName, r.commitId, r.mutation);
+    });
     // Add to current batch only after we succeed at appending
     currentBatch.add(r);
+    if (blockSynced[0]) {
+      // The block-full sync included this record — all records up to here are durable
+      currentBatch.clear();
+    }
   }
 
   protected void append(String tableName, long commitId, Mutation mutation) throws IOException {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
index b1f05f4af9..a93ada0b48 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
@@ -328,9 +328,10 @@ public interface LogFile {
      * @param tableName The HBase table name
      * @param commitId  The commit identifier
      * @param mutation  The mutation to append.
+     * @return true if an implicit sync happened (block full), false if buffered only
      * @throws IOException if an I/O error occurs during append.
      */
-    void append(String tableName, long commitId, Mutation mutation) throws IOException;
+    boolean append(String tableName, long commitId, Mutation mutation) throws IOException;
 
     /**
      * Flushes any buffered data to the underlying storage and ensures it is durable (e.g., by
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index 11b427112a..ead68e0ba8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -74,7 +74,7 @@ public class LogFileFormatWriter implements Closeable {
     return blocksStartOffset;
   }
 
-  public void append(LogFile.Record record) throws IOException {
+  public boolean append(LogFile.Record record) throws IOException {
     if (blockDataStream == null) {
       startBlock(); // Start the block if needed
     }
@@ -86,7 +86,9 @@ public class LogFileFormatWriter implements Closeable {
       // To close the block, we do a sync(), which not only closes the block and opens a
       // new one, it syncs the finalized block.
       sync();
+      return true;
     }
+    return false;
   }
 
   // Should be called before writing the first record.
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index 2a0f4c5dd1..7f5ba64084 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -78,11 +78,11 @@ public class LogFileWriter implements LogFile.Writer {
   }
 
   @Override
-  public void append(String tableName, long commitId, Mutation mutation) throws IOException {
+  public boolean append(String tableName, long commitId, Mutation mutation) throws IOException {
     if (isClosed()) {
       throw new IOException("Writer has been closed");
     }
-    writer.append(
+    return writer.append(
       new LogFileRecord().setHBaseTableName(tableName).setCommitId(commitId).setMutation(mutation));
   }
 
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 2bf723defd..ddd0cea0ec 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.replication.log.LogFileReader;
 import org.apache.phoenix.replication.log.LogFileReaderContext;
 import org.apache.phoenix.replication.log.LogFileTestUtil;
 import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
@@ -1873,4 +1874,49 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
       group.close();
     }
   }
+
+  /**
+   * Tests that rotation after block-full syncs replays only the records from the last partial
+   * block, not the entire inter-sync window.
+   */
+  @Test
+  public void testBlockFullSyncOnAppendReducesReplayOnRotation() throws Exception {
+    final String tableName = "TBLBFR";
+    // Use a very small block size to trigger block-full sync quickly
+    conf.setLong(LogFileWriterContext.LOGFILE_BLOCK_SIZE, 200L);
+    // Use a short round duration so we can trigger rotation via time
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 1);
+    recreateLogGroup();
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter oldWriter = activeLog.getWriter();
+
+    // Append 20 records without an explicit sync — block-full syncs will fire along the way,
+    // clearing currentBatch at each block boundary
+    long id;
+    for (id = 1; id <= 20; id++) {
+      Mutation put = LogFileTestUtil.newPut("row_" + id, id, 2);
+      logGroup.append(tableName, id, put);
+    }
+
+    // Wait for the rotation tick to stage a new pendingWriter
+    waitForRotationTick(1);
+
+    // Append one more record and sync — apply() drains pendingWriter triggering replay,
+    // sync() ensures the disruptor has fully processed replay + new append
+    // The loop exits with id == 21, so id is already this record's commit id; no increment here
+    logGroup.append(tableName, id, LogFileTestUtil.newPut("row_21", 21, 2));
+    logGroup.sync();
+
+    LogFileWriter newWriter = activeLog.getWriter();
+    assertNotEquals("Writer should have been rotated", oldWriter, newWriter);
+
+    // Without block-full clearing, the new writer would receive a replay of all 20 records
+    // plus the new append (21 appends total). With block-full clearing, it receives only the
+    // partial-block tail plus the new append — strictly fewer than 21.
+    int newWriterAppendCount = Mockito.mockingDetails(newWriter).getInvocations().stream()
+      .filter(inv -> inv.getMethod().getName().equals("append")).mapToInt(inv -> 1).sum();
+    assertTrue("Replay should be bounded by last partial block, not all records. Got: "
+      + newWriterAppendCount, newWriterAppendCount < id);
+  }
 }

Reply via email to