This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new c7936b1948 PHOENIX-7846 Bound rotation replay cost for large commit
batches (#2469)
c7936b1948 is described below
commit c7936b194883def557fa78d2d05960382d79ddf7
Author: tkhurana <[email protected]>
AuthorDate: Fri May 15 11:29:20 2026 -0700
PHOENIX-7846 Bound rotation replay cost for large commit batches (#2469)
---
.../apache/phoenix/replication/ReplicationLog.java | 9 ++++-
.../apache/phoenix/replication/log/LogFile.java | 3 +-
.../replication/log/LogFileFormatWriter.java | 4 +-
.../phoenix/replication/log/LogFileWriter.java | 4 +-
.../replication/ReplicationLogGroupTest.java | 46 ++++++++++++++++++++++
5 files changed, 61 insertions(+), 5 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index d6b7b01535..f94069d088 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -339,9 +339,16 @@ public class ReplicationLog {
}
protected void append(Record r) throws IOException {
- apply(writer -> writer.append(r.tableName, r.commitId, r.mutation));
+ final boolean[] blockSynced = { false };
+ apply(writer -> {
+ blockSynced[0] = writer.append(r.tableName, r.commitId, r.mutation);
+ });
// Add to current batch only after we succeed at appending
currentBatch.add(r);
+ if (blockSynced[0]) {
+ // The block-full sync included this record — all records up to here are
durable
+ currentBatch.clear();
+ }
}
protected void append(String tableName, long commitId, Mutation mutation)
throws IOException {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
index b1f05f4af9..a93ada0b48 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
@@ -328,9 +328,10 @@ public interface LogFile {
* @param tableName The HBase table name
* @param commitId The commit identifier
* @param mutation The mutation to append.
+ * @return true if an implicit sync happened (block full), false if
buffered only
* @throws IOException if an I/O error occurs during append.
*/
- void append(String tableName, long commitId, Mutation mutation) throws
IOException;
+ boolean append(String tableName, long commitId, Mutation mutation) throws
IOException;
/**
* Flushes any buffered data to the underlying storage and ensures it is
durable (e.g., by
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index 11b427112a..ead68e0ba8 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -74,7 +74,7 @@ public class LogFileFormatWriter implements Closeable {
return blocksStartOffset;
}
- public void append(LogFile.Record record) throws IOException {
+ public boolean append(LogFile.Record record) throws IOException {
if (blockDataStream == null) {
startBlock(); // Start the block if needed
}
@@ -86,7 +86,9 @@ public class LogFileFormatWriter implements Closeable {
// To close the block, we do a sync(), which not only closes the block
and opens a
// new one, it syncs the finalized block.
sync();
+ return true;
}
+ return false;
}
// Should be called before writing the first record.
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index 2a0f4c5dd1..7f5ba64084 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -78,11 +78,11 @@ public class LogFileWriter implements LogFile.Writer {
}
@Override
- public void append(String tableName, long commitId, Mutation mutation)
throws IOException {
+ public boolean append(String tableName, long commitId, Mutation mutation)
throws IOException {
if (isClosed()) {
throw new IOException("Writer has been closed");
}
- writer.append(
+ return writer.append(
new
LogFileRecord().setHBaseTableName(tableName).setCommitId(commitId).setMutation(mutation));
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 2bf723defd..ddd0cea0ec 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;
import org.apache.phoenix.replication.log.LogFileTestUtil;
import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
@@ -1873,4 +1874,49 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
group.close();
}
}
+
+ /**
+ * Tests that rotation after block-full syncs replays only the records from
the last partial
+ * block, not the entire inter-sync window.
+ */
+ @Test
+ public void testBlockFullSyncOnAppendReducesReplayOnRotation() throws
Exception {
+ final String tableName = "TBLBFR";
+ // Use a very small block size to trigger block-full sync quickly
+ conf.setLong(LogFileWriterContext.LOGFILE_BLOCK_SIZE, 200L);
+ // Use a short round duration so we can trigger rotation via time
+ conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 1);
+ recreateLogGroup();
+
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter oldWriter = activeLog.getWriter();
+
+ // Append 20 records without an explicit sync — block-full syncs will fire
along the way,
+ // clearing currentBatch at each block boundary
+ long id;
+ for (id = 1; id <= 20; id++) {
+ Mutation put = LogFileTestUtil.newPut("row_" + id, id, 2);
+ logGroup.append(tableName, id, put);
+ }
+
+ // Wait for the rotation tick to stage a new pendingWriter
+ waitForRotationTick(1);
+
+ // Append one more record and sync — apply() drains pendingWriter
triggering replay,
+ // sync() ensures the disruptor has fully processed replay + new append
+ id++;
+ logGroup.append(tableName, id, LogFileTestUtil.newPut("row_21", 21, 2));
+ logGroup.sync();
+
+ LogFileWriter newWriter = activeLog.getWriter();
+ assertNotEquals("Writer should have been rotated", oldWriter, newWriter);
+
+ // Without block-full clearing, the new writer would receive a replay of
all 20 records
+ // plus the new append (21 appends total). With block-full clearing, it
receives only the
+ // partial-block tail plus the new append — strictly fewer than 21.
+ int newWriterAppendCount =
Mockito.mockingDetails(newWriter).getInvocations().stream()
+ .filter(inv -> inv.getMethod().getName().equals("append")).mapToInt(inv
-> 1).sum();
+ assertTrue("Replay should be bounded by last partial block, not all
records. Got: "
+ + newWriterAppendCount, newWriterAppendCount < id);
+ }
}