This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new af48a51a57 PHOENIX-7862 Fix size-based rotation loop (#2485)
af48a51a57 is described below
commit af48a51a573ee5f3a373a16849334b952d92e07c
Author: tkhurana <[email protected]>
AuthorDate: Wed May 20 17:43:29 2026 -0700
PHOENIX-7862 Fix size-based rotation loop (#2485)
---
.../apache/phoenix/replication/ReplicationLog.java | 39 ++++++++--
.../replication/ReplicationLogGroupTest.java | 88 ++++++++++++++++++++++
2 files changed, 120 insertions(+), 7 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index f94069d088..16e62f5bb8 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -82,7 +82,8 @@ public class ReplicationLog {
// in a finally block so apply() can wait (with timeout) for a fresh writer
to be staged.
private volatile CountDownLatch rotationStagedLatch;
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final AtomicBoolean rotationRequested = new AtomicBoolean(false);
+ @VisibleForTesting
+ final AtomicBoolean rotationRequested = new AtomicBoolean(false);
private final ExecutorService closeExecutor;
protected ScheduledExecutorService rotationExecutor;
// Manages the creation of the actual log file in the shard directory
@@ -168,7 +169,7 @@ public class ReplicationLog {
rotationExecutor = Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryBuilder()
.setNameFormat("ReplicationLogRotation-" + logGroup.getHAGroupName() +
"-%d").setDaemon(true)
.build());
- rotationExecutor.scheduleAtFixedRate(new LogRotationTask(), initialDelay,
rotationTimeMs,
+ rotationExecutor.scheduleAtFixedRate(new LogRotationTask(false),
initialDelay, rotationTimeMs,
TimeUnit.MILLISECONDS);
LOG.info("Started rotation executor with initial delay {}ms and interval
{}ms", initialDelay,
rotationTimeMs);
@@ -244,7 +245,7 @@ public class ReplicationLog {
private void requestRotation() {
if (rotationRequested.compareAndSet(false, true)) {
try {
- rotationExecutor.execute(new LogRotationTask());
+ rotationExecutor.execute(new LogRotationTask(true));
} catch (java.util.concurrent.RejectedExecutionException e) {
LOG.info("Rotation executor shut down, skipping on-demand rotation",
e);
rotationRequested.set(false);
@@ -256,6 +257,9 @@ public class ReplicationLog {
* Requests an on-demand rotation if the current writer exceeds the size
threshold.
*/
private void requestRotationIfNeeded() throws IOException {
+ if (isClosed() || rotationRequested.get()) {
+ return;
+ }
if (shouldRotateForSize()) {
requestRotation();
}
@@ -316,7 +320,6 @@ public class ReplicationLog {
generation = currentWriter.getGeneration();
}
action.action(currentWriter);
- requestRotationIfNeeded();
break;
} catch (IOException e) {
LOG.debug("Attempt {}/{} failed", attempt, maxAttempts, e);
@@ -348,6 +351,11 @@ public class ReplicationLog {
if (blockSynced[0]) {
// The block-full sync included this record — all records up to here are
durable
currentBatch.clear();
+ // Check size only after currentBatch is empty: if a rotation stages a
new writer mid-batch,
+ // unsynced records are replayed into it and could exceed the threshold
immediately,
+ // triggering another rotation and forming a loop. Checking only when
the batch is empty
+ // ensures size-based rotations don't cascade.
+ requestRotationIfNeeded();
}
}
@@ -358,6 +366,9 @@ public class ReplicationLog {
protected void sync() throws IOException {
apply(LogFileWriter::sync);
currentBatch.clear();
+ // See note in append(Record): size check runs only when currentBatch is
empty so a replay
+ // into a freshly-staged writer cannot trigger a cascading rotation.
+ requestRotationIfNeeded();
}
/**
@@ -398,7 +409,7 @@ public class ReplicationLog {
@VisibleForTesting
protected void forceRotation() {
- new LogRotationTask().run();
+ new LogRotationTask(false).run();
checkAndReplaceWriter(false);
}
@@ -410,15 +421,24 @@ public class ReplicationLog {
* Creates a new writer on a background thread and stages it in {@code
pendingWriter} for the
* consumer thread to drain inside {@link #apply}. Invoked both by the
scheduled rotation executor
* at round boundaries and on-demand via {@link #requestRotation()}.
+ * <p>
+ * Only the on-demand path owns {@code rotationRequested} — it sets the flag
before submitting the
+ * task and clears it in {@code finally}. Scheduled rotations must not touch
the flag, otherwise
+ * they could clear a flag set by a concurrent on-demand request whose task
hasn't run yet,
+ * allowing duplicate on-demand submissions to be queued.
*/
protected class LogRotationTask implements Runnable {
+ private final boolean onDemand;
+
+ LogRotationTask(boolean onDemand) {
+ this.onDemand = onDemand;
+ }
+
@Override
public void run() {
if (closed.get()) {
return;
}
- rotationRequested.compareAndSet(true, false);
-
try {
LogFileWriter newWriter = createNewWriter();
LogFileWriter undrained = pendingWriter.getAndSet(newWriter);
@@ -439,6 +459,11 @@ public class ReplicationLog {
numFailures, maxRotationRetries, t);
}
} finally {
+ if (onDemand) {
+ // Clear the flag last so requestRotation()'s CAS rejects duplicate
on-demand
+ // submissions while this task is still creating/staging a writer.
+ rotationRequested.set(false);
+ }
CountDownLatch latch = rotationStagedLatch;
if (latch != null) {
latch.countDown();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 3fd7702e6a..55bce05d74 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -46,6 +46,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.jdbc.HAGroupStoreRecord;
@@ -1907,4 +1908,91 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
assertTrue("Replay should be bounded by last partial block, not all
records. Got: "
+ newWriterAppendCount, newWriterAppendCount < id);
}
+
+ /**
+ * Tests that the size check is not invoked from inside apply(). Pre-fix,
every successful action
+ * inside apply() ran requestRotationIfNeeded(); when a rotation swapped in
a new writer mid-batch
+ * and replay made it exceed the threshold, the size check after every
append would request
+ * another rotation, drain it on the next apply(), replay again, and loop
indefinitely.
+ * <p>
+ * The new writer is mocked so its getLength() always reports above the
rotation threshold. On the
+ * buggy code this triggers a rotation request after every successful
append, which cascades into
+ * one writer per append. With the fix, requestRotationIfNeeded() runs only
after currentBatch is
+ * cleared, so the chain is bounded.
+ */
+ @Test
+ public void testSizeRotationDoesNotLoopOnReplay() throws Exception {
+ final String tableName = "TBLSRDLOR";
+ long commitId = 1L;
+ // One-second rounds so the first scheduled tick fires quickly; later ticks may
also fire before the test ends.
+ final int roundDurationSeconds = 1;
+ final int numAppendsAfterTick = 20;
+
+ conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
roundDurationSeconds);
+ recreateLogGroup();
+
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter initialWriter = activeLog.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Count writers created so we can detect a rotation chain. Each writer
created after the
+ // initial one is configured to report a length above the rotation
threshold so the size
+ // check (wherever it runs) sees an over-threshold writer.
+ AtomicInteger writersCreated = new AtomicInteger(1); // initial writer
already created
+ final long oversizedLength = TEST_ROTATION_SIZE_BYTES + 1;
+ doAnswer(invocation -> {
+ LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+ doReturn(oversizedLength).when(w).getLength();
+ writersCreated.incrementAndGet();
+ return w;
+ }).when(activeLog).createNewWriter();
+
+ // Append unsynced records so currentBatch is non-empty when the rotation
tick fires.
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ for (int i = 0; i < 5; i++) {
+ logGroup.append(tableName, commitId++, put);
+ }
+
+ // Wait for the time-based rotation tick to stage a fresh pendingWriter.
+ waitForRotationTick(roundDurationSeconds);
+
+ // Continue appending. On the buggy code, the first such append drains the
staged writer,
+ // replays the batch, then requestRotationIfNeeded() inside apply() fires
(length is over
+ // threshold) → requests another rotation. Subsequent appends drain that
one too, replay,
+ // request another, and so on — one new writer per append.
+ for (int i = 0; i < numAppendsAfterTick; i++) {
+ logGroup.append(tableName, commitId++, put);
+ }
+ logGroup.sync();
+
+ // Without the fix the count grows roughly linearly with
numAppendsAfterTick (one new
+ // writer per append). With the fix, it is bounded regardless of how many
appends are
+ // performed: initial + scheduled ticks (test runtime ~2s with 1s round =
1-2 ticks) +
+ // at most one size-triggered rotation per sync point. Use a generous
bound that is well
+ // below the buggy behavior (which produced ~24) but tolerates extra
scheduled ticks.
+ int writers = writersCreated.get();
+ assertTrue("Size-based rotation must not loop. Writers created: " +
writers,
+ writers <= numAppendsAfterTick / 2);
+ }
+
+ /**
+ * Verifies the contract of {@code LogRotationTask}'s {@code onDemand} flag:
a scheduled task
+ * (onDemand=false) must not clear {@code rotationRequested}, and an
on-demand task
+ * (onDemand=true) must clear it. This prevents a scheduled tick from
stomping the flag set by a
+ * concurrent on-demand request whose task hasn't run yet, which would allow
duplicate on-demand
+ * submissions to be queued.
+ */
+ @Test
+ public void testScheduledRotationDoesNotClearOnDemandFlag() {
+ ReplicationLog activeLog = logGroup.getActiveLog();
+
+ activeLog.rotationRequested.set(true);
+ activeLog.new LogRotationTask(false).run();
+ assertTrue("Scheduled rotation must not clear rotationRequested",
+ activeLog.rotationRequested.get());
+
+ activeLog.new LogRotationTask(true).run();
+ assertFalse("On-demand rotation must clear rotationRequested",
+ activeLog.rotationRequested.get());
+ }
}