This is an automated email from the ASF dual-hosted git repository. tkhurana pushed a commit to branch PHOENIX-7562-feature-new in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 14898c893522c6b9dff15b3737b7d4321017c2ff Author: tkhurana <[email protected]> AuthorDate: Tue Jun 9 10:47:28 2026 -0700 PHOENIX-7885 testReadAfterWrite and testReadAfterMultipleRotations flake on async writer close Tests that read a rotated log file from disk could intermittently fail with InvalidLogTrailerException ("Unsupported version. Got major=5 minor=114") because forceRotation() did not wait for the prior writer to finish closing. The race: LogRotationTask publishes a SWAP event after staging the new pendingWriter. The disruptor consumer thread picks up the SWAP and calls checkAndReplaceWriter(true) — asyncClose=true. The consumer's swap usually wins the pendingWriter.getAndSet(null) race against forceRotation's subsequent checkAndReplaceWriter(false), so the old writer's close runs on closeExecutor. Tests that opened the file before that close completed read garbage at the trailer offset. Fixes: - testReadAfterWrite: replace forceRotation() with activeLog.close(true). Single rotation followed by reads — synchronous close gives the fence. - testReadAfterMultipleRotations, testReadAfterMultipleRotationsWithVaryingBatchSizes: add logGroup.close() after the rotation loop, before reads. close() shuts down closeExecutor and awaits termination, draining all pending async closes from the loop's rotations. - ReplicationLog.forceRotation: drop the redundant checkAndReplaceWriter(false) call. The SWAP event already drains pendingWriter on the consumer thread; the synchronous call was racy and effectively a no-op when the consumer won, which is most of the time. Verified by running the full ReplicationLogGroupTest class 10x (480 test executions, 0 failures). --- .../java/org/apache/phoenix/replication/ReplicationLog.java | 1 - .../apache/phoenix/replication/ReplicationLogGroupTest.java | 13 +++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java index 321c9fbd2f..40337c190f 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java @@ -418,7 +418,6 @@ public class ReplicationLog { @VisibleForTesting protected void forceRotation() { new LogRotationTask().run(); - checkAndReplaceWriter(false); } protected FileSystem getFileSystem(URI uri) throws IOException { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java index 2f81671835..dcd4cd55d3 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java @@ -802,8 +802,9 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest { } logGroup.sync(); // Sync to commit the appends to the current writer. - // Force a rotation to close the current writer. - activeLog.forceRotation(); + // Close the log synchronously so the writer's trailer is durably written before we open the + // file for reading. + activeLog.close(true); assertTrue("Log file should exist", localFs.exists(logPath)); @@ -864,6 +865,10 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest { activeLog.forceRotation(); } + // Close the log group synchronously so all pending async closes from the rotations above + // finish (trailers durably written) before we open the files for reading. + logGroup.close(); + // Verify all log files exist for (Path logPath : logPaths) { assertTrue("Log file should exist: " + logPath, localFs.exists(logPath)); @@ -929,6 +934,10 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest { activeLog.forceRotation(); } + // Close the log group synchronously so all pending async closes from the rotations above + // finish (trailers durably written) before we open the files for reading. + logGroup.close(); + // Verify all log files exist for (Path logPath : logPaths) { assertTrue("Log file should exist: " + logPath, localFs.exists(logPath));
