This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new af48a51a57 PHOENIX-7862 Fix size-based rotation loop (#2485)
af48a51a57 is described below

commit af48a51a573ee5f3a373a16849334b952d92e07c
Author: tkhurana <[email protected]>
AuthorDate: Wed May 20 17:43:29 2026 -0700

    PHOENIX-7862 Fix size-based rotation loop (#2485)
---
 .../apache/phoenix/replication/ReplicationLog.java | 39 ++++++++--
 .../replication/ReplicationLogGroupTest.java       | 88 ++++++++++++++++++++++
 2 files changed, 120 insertions(+), 7 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index f94069d088..16e62f5bb8 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -82,7 +82,8 @@ public class ReplicationLog {
   // in a finally block so apply() can wait (with timeout) for a fresh writer 
to be staged.
   private volatile CountDownLatch rotationStagedLatch;
   private final AtomicBoolean closed = new AtomicBoolean(false);
-  private final AtomicBoolean rotationRequested = new AtomicBoolean(false);
+  @VisibleForTesting
+  final AtomicBoolean rotationRequested = new AtomicBoolean(false);
   private final ExecutorService closeExecutor;
   protected ScheduledExecutorService rotationExecutor;
   // Manages the creation of the actual log file in the shard directory
@@ -168,7 +169,7 @@ public class ReplicationLog {
     rotationExecutor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
       .setNameFormat("ReplicationLogRotation-" + logGroup.getHAGroupName() + 
"-%d").setDaemon(true)
       .build());
-    rotationExecutor.scheduleAtFixedRate(new LogRotationTask(), initialDelay, 
rotationTimeMs,
+    rotationExecutor.scheduleAtFixedRate(new LogRotationTask(false), 
initialDelay, rotationTimeMs,
       TimeUnit.MILLISECONDS);
     LOG.info("Started rotation executor with initial delay {}ms and interval 
{}ms", initialDelay,
       rotationTimeMs);
@@ -244,7 +245,7 @@ public class ReplicationLog {
   private void requestRotation() {
     if (rotationRequested.compareAndSet(false, true)) {
       try {
-        rotationExecutor.execute(new LogRotationTask());
+        rotationExecutor.execute(new LogRotationTask(true));
       } catch (java.util.concurrent.RejectedExecutionException e) {
         LOG.info("Rotation executor shut down, skipping on-demand rotation", 
e);
         rotationRequested.set(false);
@@ -256,6 +257,9 @@ public class ReplicationLog {
    * Requests an on-demand rotation if the current writer exceeds the size 
threshold.
    */
   private void requestRotationIfNeeded() throws IOException {
+    if (isClosed() || rotationRequested.get()) {
+      return;
+    }
     if (shouldRotateForSize()) {
       requestRotation();
     }
@@ -316,7 +320,6 @@ public class ReplicationLog {
           generation = currentWriter.getGeneration();
         }
         action.action(currentWriter);
-        requestRotationIfNeeded();
         break;
       } catch (IOException e) {
         LOG.debug("Attempt {}/{} failed", attempt, maxAttempts, e);
@@ -348,6 +351,11 @@ public class ReplicationLog {
     if (blockSynced[0]) {
       // The block-full sync included this record — all records up to here are 
durable
       currentBatch.clear();
+      // Check size only after currentBatch is empty: if a rotation stages a 
new writer mid-batch,
+      // unsynced records are replayed into it and could exceed the threshold 
immediately,
+      // triggering another rotation and forming a loop. Checking only when 
the batch is empty
+      // ensures size-based rotations don't cascade.
+      requestRotationIfNeeded();
     }
   }
 
@@ -358,6 +366,9 @@ public class ReplicationLog {
   protected void sync() throws IOException {
     apply(LogFileWriter::sync);
     currentBatch.clear();
+    // See note in append(Record): size check runs only when currentBatch is 
empty so a replay
+    // into a freshly-staged writer cannot trigger a cascading rotation.
+    requestRotationIfNeeded();
   }
 
   /**
@@ -398,7 +409,7 @@ public class ReplicationLog {
 
   @VisibleForTesting
   protected void forceRotation() {
-    new LogRotationTask().run();
+    new LogRotationTask(false).run();
     checkAndReplaceWriter(false);
   }
 
@@ -410,15 +421,24 @@ public class ReplicationLog {
    * Creates a new writer on a background thread and stages it in {@code 
pendingWriter} for the
    * consumer thread to drain inside {@link #apply}. Invoked both by the 
scheduled rotation executor
    * at round boundaries and on-demand via {@link #requestRotation()}.
+   * <p>
+   * Only the on-demand path owns {@code rotationRequested} — it sets the flag 
before submitting the
+   * task and clears it in {@code finally}. Scheduled rotations must not touch 
the flag, otherwise
+   * they could clear a flag set by a concurrent on-demand request whose task 
hasn't run yet,
+   * allowing duplicate on-demand submissions to be queued.
    */
   protected class LogRotationTask implements Runnable {
+    private final boolean onDemand;
+
+    LogRotationTask(boolean onDemand) {
+      this.onDemand = onDemand;
+    }
+
     @Override
     public void run() {
       if (closed.get()) {
         return;
       }
-      rotationRequested.compareAndSet(true, false);
-
       try {
         LogFileWriter newWriter = createNewWriter();
         LogFileWriter undrained = pendingWriter.getAndSet(newWriter);
@@ -439,6 +459,11 @@ public class ReplicationLog {
             numFailures, maxRotationRetries, t);
         }
       } finally {
+        if (onDemand) {
+          // Clear the flag last so requestRotation()'s CAS rejects duplicate 
on-demand
+          // submissions while this task is still creating/staging a writer.
+          rotationRequested.set(false);
+        }
         CountDownLatch latch = rotationStagedLatch;
         if (latch != null) {
           latch.countDown();
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 3fd7702e6a..55bce05d74 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -46,6 +46,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord;
@@ -1907,4 +1908,91 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     assertTrue("Replay should be bounded by last partial block, not all 
records. Got: "
       + newWriterAppendCount, newWriterAppendCount < id);
   }
+
+  /**
+   * Tests that the size check is not invoked from inside apply(). Pre-fix, 
every successful action
+   * inside apply() ran requestRotationIfNeeded(); when a rotation swapped in 
a new writer mid-batch
+   * and replay made it exceed the threshold, the size check after every 
append would request
+   * another rotation, drain it on the next apply(), replay again, and loop 
indefinitely.
+   * <p>
+   * The new writer is mocked so its getLength() always reports above the 
rotation threshold. On the
+   * buggy code this triggers a rotation request after every successful 
append, which cascades into
+   * one writer per append. With the fix, requestRotationIfNeeded() runs only 
after currentBatch is
+   * cleared, so the chain is bounded.
+   */
+  @Test
+  public void testSizeRotationDoesNotLoopOnReplay() throws Exception {
+    final String tableName = "TBLSRDLOR";
+    long commitId = 1L;
+    // Short round so the first tick fires quickly; the final bound tolerates extra ticks.
+    final int roundDurationSeconds = 1;
+    final int numAppendsAfterTick = 20;
+
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 
roundDurationSeconds);
+    recreateLogGroup();
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Count writers created so we can detect a rotation chain. Each writer 
created after the
+    // initial one is configured to report a length above the rotation 
threshold so the size
+    // check (wherever it runs) sees an over-threshold writer.
+    AtomicInteger writersCreated = new AtomicInteger(1); // initial writer 
already created
+    final long oversizedLength = TEST_ROTATION_SIZE_BYTES + 1;
+    doAnswer(invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      doReturn(oversizedLength).when(w).getLength();
+      writersCreated.incrementAndGet();
+      return w;
+    }).when(activeLog).createNewWriter();
+
+    // Append unsynced records so currentBatch is non-empty when the rotation 
tick fires.
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    for (int i = 0; i < 5; i++) {
+      logGroup.append(tableName, commitId++, put);
+    }
+
+    // Wait for the time-based rotation tick to stage a fresh pendingWriter.
+    waitForRotationTick(roundDurationSeconds);
+
+    // Continue appending. On the buggy code, the first such append drains the 
staged writer,
+    // replays the batch, then requestRotationIfNeeded() inside apply() fires 
(length is over
+    // threshold) → requests another rotation. Subsequent appends drain that 
one too, replay,
+    // request another, and so on — one new writer per append.
+    for (int i = 0; i < numAppendsAfterTick; i++) {
+      logGroup.append(tableName, commitId++, put);
+    }
+    logGroup.sync();
+
+    // Without the fix the count grows roughly linearly with 
numAppendsAfterTick (one new
+    // writer per append). With the fix, it is bounded regardless of how many 
appends are
+    // performed: initial + scheduled ticks (test runtime ~2s with 1s round = 
1-2 ticks) +
+    // at most one size-triggered rotation per sync point. Use a generous 
bound that is well
+    // below the buggy behavior (which produced ~24) but tolerates extra 
scheduled ticks.
+    int writers = writersCreated.get();
+    assertTrue("Size-based rotation must not loop. Writers created: " + 
writers,
+      writers <= numAppendsAfterTick / 2);
+  }
+
+  /**
+   * Verifies the contract of {@code LogRotationTask}'s {@code onDemand} flag: 
a scheduled task
+   * (onDemand=false) must not clear {@code rotationRequested}, and an 
on-demand task
+   * (onDemand=true) must clear it. This prevents a scheduled tick from 
stomping the flag set by a
+   * concurrent on-demand request whose task hasn't run yet, which would allow 
duplicate on-demand
+   * submissions to be queued.
+   */
+  @Test
+  public void testScheduledRotationDoesNotClearOnDemandFlag() {
+    ReplicationLog activeLog = logGroup.getActiveLog();
+
+    activeLog.rotationRequested.set(true);
+    activeLog.new LogRotationTask(false).run();
+    assertTrue("Scheduled rotation must not clear rotationRequested",
+      activeLog.rotationRequested.get());
+
+    activeLog.new LogRotationTask(true).run();
+    assertFalse("On-demand rotation must clear rotationRequested",
+      activeLog.rotationRequested.get());
+  }
 }

Reply via email to