This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new 4a08b1d10d PHOENIX-7862 addendum Fix the race which caused double rotation
4a08b1d10d is described below

commit 4a08b1d10d3bfeb46618c3b9fba3f687ffc5c0cd
Author: tkhurana <[email protected]>
AuthorDate: Tue Jun 2 17:50:01 2026 -0700

    PHOENIX-7862 addendum Fix the race which caused double rotation
---
 .../apache/phoenix/replication/ReplicationLog.java | 61 +++++++++++-----------
 .../replication/ReplicationLogGroupTest.java       | 36 +++++--------
 2 files changed, 45 insertions(+), 52 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 410bd17ddb..321c9fbd2f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -79,10 +79,15 @@ public class ReplicationLog {
   protected final AtomicLong rotationFailures = new AtomicLong(0);
   // Staged writer created by the background LogRotationTask, drained by 
checkAndReplaceWriter().
   private final AtomicReference<LogFileWriter> pendingWriter = new 
AtomicReference<>();
-  // Latch set by apply() before requesting an on-demand rotation; counted 
down by LogRotationTask
-  // in a finally block so apply() can wait (with timeout) for a fresh writer 
to be staged.
+  // Latch set by apply() on the retry path before calling requestRotation(); 
counted down by
+  // LogRotationTask in a finally block so apply() can wait (with timeout) for 
a fresh writer.
   private volatile CountDownLatch rotationStagedLatch;
   private final AtomicBoolean closed = new AtomicBoolean(false);
+  // Single gate for rotation submission. Set by requestRotation()'s CAS 
before queuing a task,
+  // cleared in LogRotationTask's finally. Both scheduled ticks and on-demand 
callers go through
+  // requestRotation(), so a request that arrives while a rotation is queued 
or running is a
+  // no-op — preventing the duplicate-writer bug where an in-flight scheduled 
rotation and a
+  // concurrent size-triggered request both stage writers and the second 
closes the first.
   @VisibleForTesting
   final AtomicBoolean rotationRequested = new AtomicBoolean(false);
   private final ExecutorService closeExecutor;
@@ -166,7 +171,9 @@ public class ReplicationLog {
     rotationExecutor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
       .setNameFormat("ReplicationLogRotation-" + logGroup.getHAGroupName() + 
"-%d").setDaemon(true)
       .build());
-    rotationExecutor.scheduleAtFixedRate(new LogRotationTask(false), 
initialDelay, rotationTimeMs,
+    // Scheduled ticks route through requestRotation() so they share the same 
CAS gate as on-demand
+    // size-triggered rotations — only one rotation can be queued or running 
at a time.
+    rotationExecutor.scheduleAtFixedRate(this::requestRotation, initialDelay, 
rotationTimeMs,
       TimeUnit.MILLISECONDS);
     LOG.info("Started rotation executor with initial delay {}ms and interval 
{}ms", initialDelay,
       rotationTimeMs);
@@ -236,25 +243,29 @@ public class ReplicationLog {
   }
 
   /**
-   * Submits an on-demand {@link LogRotationTask} to the executor. The 
compareAndSet avoids
-   * submitting duplicate tasks — if the flag is already set, a task is 
already queued.
+   * Submits a {@link LogRotationTask} to the executor. The CAS gate ensures 
only one rotation can
+   * be queued or in flight at a time — if the flag is already set, a task is 
already pending or
+   * running. Both scheduled ticks and on-demand size-triggered callers go 
through this method.
    */
   private void requestRotation() {
     if (rotationRequested.compareAndSet(false, true)) {
       try {
-        rotationExecutor.execute(new LogRotationTask(true));
+        rotationExecutor.execute(new LogRotationTask());
       } catch (java.util.concurrent.RejectedExecutionException e) {
-        LOG.info("Rotation executor shut down, skipping on-demand rotation", 
e);
+        LOG.info("Rotation executor shut down, skipping rotation", e);
         rotationRequested.set(false);
       }
     }
   }
 
   /**
-   * Requests an on-demand rotation if the current writer exceeds the size 
threshold.
+   * Requests a rotation if the current writer exceeds the size threshold. 
Skips when a writer is
+   * already staged but not yet drained — the consumer hasn't swapped in the 
new writer yet, so
+   * {@code currentWriter} still reports the old (over-threshold) length and a 
fresh request would
+   * stage a redundant writer that the next task immediately closes.
    */
-  private void requestRotationIfNeeded() throws IOException {
-    if (isClosed() || rotationRequested.get()) {
+  private void requestRotationIfOversized() throws IOException {
+    if (isClosed() || rotationRequested.get() || pendingWriter.get() != null) {
       return;
     }
     if (shouldRotateForSize()) {
@@ -352,7 +363,7 @@ public class ReplicationLog {
       // unsynced records are replayed into it and could exceed the threshold 
immediately,
       // triggering another rotation and forming a loop. Checking only when 
the batch is empty
       // ensures size-based rotations don't cascade.
-      requestRotationIfNeeded();
+      requestRotationIfOversized();
     }
   }
 
@@ -365,7 +376,7 @@ public class ReplicationLog {
     currentBatch.clear();
     // See note in append(Record): size check runs only when currentBatch is 
empty so a replay
     // into a freshly-staged writer cannot trigger a cascading rotation.
-    requestRotationIfNeeded();
+    requestRotationIfOversized();
   }
 
   /**
@@ -406,7 +417,7 @@ public class ReplicationLog {
 
   @VisibleForTesting
   protected void forceRotation() {
-    new LogRotationTask(false).run();
+    new LogRotationTask().run();
     checkAndReplaceWriter(false);
   }
 
@@ -416,21 +427,14 @@ public class ReplicationLog {
 
   /**
    * Creates a new writer on a background thread and stages it in {@code 
pendingWriter} for the
-   * consumer thread to drain inside {@link #apply}. Invoked both by the 
scheduled rotation executor
-   * at round boundaries and on-demand via {@link #requestRotation()}.
+   * consumer thread to drain inside {@link #apply}. Submitted via {@link 
#requestRotation()} from
+   * both the scheduled tick and on-demand size-triggered callers.
    * <p>
-   * Only the on-demand path owns {@code rotationRequested} — it sets the flag 
before submitting the
-   * task and clears it in {@code finally}. Scheduled rotations must not touch 
the flag, otherwise
-   * they could clear a flag set by a concurrent on-demand request whose task 
hasn't run yet,
-   * allowing duplicate on-demand submissions to be queued.
+   * {@code rotationRequested} stays set for the entire duration of run() 
(cleared in finally), so
+   * any request that arrives while this task is creating/staging a writer is 
rejected by
+   * {@link #requestRotation()}'s CAS and not duplicated.
    */
   protected class LogRotationTask implements Runnable {
-    private final boolean onDemand;
-
-    LogRotationTask(boolean onDemand) {
-      this.onDemand = onDemand;
-    }
-
     @Override
     public void run() {
       if (closed.get()) {
@@ -461,11 +465,8 @@ public class ReplicationLog {
       } finally {
         // Time both success and failure paths so slow rotations are visible 
even when they fail.
         logGroup.getMetrics().updateRotationTime(System.nanoTime() - startNs);
-        if (onDemand) {
-          // Clear the flag last so requestRotation()'s CAS rejects duplicate 
on-demand
-          // submissions while this task is still creating/staging a writer.
-          rotationRequested.set(false);
-        }
+        // Clear last so requestRotation()'s CAS suppresses duplicates 
throughout this run.
+        rotationRequested.set(false);
         CountDownLatch latch = rotationStagedLatch;
         if (latch != null) {
           latch.countDown();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index c204cd0a27..349fdb8620 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -392,8 +392,8 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
   /**
    * Tests size-based log rotation. Verifies that the log file is rotated when 
it exceeds the
    * configured size threshold and that operations continue correctly with the 
new log file. After
-   * the first sync, requestRotationIfNeeded submits an on-demand 
LogRotationTask which creates the
-   * new writer immediately on the executor thread.
+   * the first sync, requestRotationIfOversized submits an on-demand 
LogRotationTask which creates
+   * the new writer immediately on the executor thread.
    */
   @Test
   public void testSizeBasedRotation() throws Exception {
@@ -409,7 +409,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     for (int i = 0; i < 100; i++) {
       logGroup.append(tableName, commitId++, put);
     }
-    // Sync: data goes to old writer. requestRotationIfNeeded submits 
on-demand rotation task.
+    // Sync: data goes to old writer. requestRotationIfOversized submits 
on-demand rotation task.
     logGroup.sync();
 
     // Wait for the on-demand rotation task to create a new writer on the 
background thread
@@ -1914,14 +1914,14 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
   /**
    * Tests that the size check is not invoked from inside apply(). Pre-fix, 
every successful action
-   * inside apply() ran requestRotationIfNeeded(); when a rotation swapped in 
a new writer mid-batch
-   * and replay made it exceed the threshold, the size check after every 
append would request
-   * another rotation, drain it on the next apply(), replay again, and loop 
indefinitely.
+   * inside apply() ran requestRotationIfOversized(); when a rotation swapped 
in a new writer
+   * mid-batch and replay made it exceed the threshold, the size check after 
every append would
+   * request another rotation, drain it on the next apply(), replay again, and 
loop indefinitely.
    * <p>
    * The new writer is mocked so its getLength() always reports above the 
rotation threshold. On the
    * buggy code this triggers a rotation request after every successful 
append, which cascades into
-   * one writer per append. With the fix, requestRotationIfNeeded() runs only 
after currentBatch is
-   * cleared, so the chain is bounded.
+   * one writer per append. With the fix, requestRotationIfOversized() runs 
only after currentBatch
+   * is cleared, so the chain is bounded.
    */
   @Test
   public void testSizeRotationDoesNotLoopOnReplay() throws Exception {
@@ -1960,7 +1960,7 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     waitForRotationTick(roundDurationSeconds);
 
     // Continue appending. On the buggy code, the first such append drains the 
staged writer,
-    // replays the batch, then requestRotationIfNeeded() inside apply() fires 
(length is over
+    // replays the batch, then requestRotationIfOversized() inside apply() 
fires (length is over
     // threshold) → requests another rotation. Subsequent appends drain that 
one too, replay,
     // request another, and so on — one new writer per append.
     for (int i = 0; i < numAppendsAfterTick; i++) {
@@ -1979,24 +1979,16 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
   }
 
   /**
-   * Verifies the contract of {@code LogRotationTask}'s {@code onDemand} flag: 
a scheduled task
-   * (onDemand=false) must not clear {@code rotationRequested}, and an 
on-demand task
-   * (onDemand=true) must clear it. This prevents a scheduled tick from 
stomping the flag set by a
-   * concurrent on-demand request whose task hasn't run yet, which would allow 
duplicate on-demand
-   * submissions to be queued.
+   * Both scheduled ticks and on-demand callers go through {@code 
requestRotation()}, so a task
+   * always owns {@code rotationRequested} and clears it in finally.
    */
   @Test
-  public void testScheduledRotationDoesNotClearOnDemandFlag() {
+  public void testRotationTaskClearsRequestedFlag() {
     ReplicationLog activeLog = logGroup.getActiveLog();
 
     activeLog.rotationRequested.set(true);
-    activeLog.new LogRotationTask(false).run();
-    assertTrue("Scheduled rotation must not clear rotationRequested",
-      activeLog.rotationRequested.get());
-
-    activeLog.new LogRotationTask(true).run();
-    assertFalse("On-demand rotation must clear rotationRequested",
-      activeLog.rotationRequested.get());
+    activeLog.new LogRotationTask().run();
+    assertFalse("LogRotationTask must clear rotationRequested", 
activeLog.rotationRequested.get());
   }
 
   /**

Reply via email to