This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
new 4a08b1d10d PHOENIX-7862 addendum Fix the race which caused double rotation
4a08b1d10d is described below
commit 4a08b1d10d3bfeb46618c3b9fba3f687ffc5c0cd
Author: tkhurana <[email protected]>
AuthorDate: Tue Jun 2 17:50:01 2026 -0700
PHOENIX-7862 addendum Fix the race which caused double rotation
---
.../apache/phoenix/replication/ReplicationLog.java | 61 +++++++++++-----------
.../replication/ReplicationLogGroupTest.java | 36 +++++--------
2 files changed, 45 insertions(+), 52 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 410bd17ddb..321c9fbd2f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -79,10 +79,15 @@ public class ReplicationLog {
protected final AtomicLong rotationFailures = new AtomicLong(0);
// Staged writer created by the background LogRotationTask, drained by checkAndReplaceWriter().
private final AtomicReference<LogFileWriter> pendingWriter = new
AtomicReference<>();
- // Latch set by apply() before requesting an on-demand rotation; counted down by LogRotationTask
- // in a finally block so apply() can wait (with timeout) for a fresh writer to be staged.
+ // Latch set by apply() on the retry path before calling requestRotation(); counted down by
+ // LogRotationTask in a finally block so apply() can wait (with timeout) for a fresh writer.
private volatile CountDownLatch rotationStagedLatch;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ // Single gate for rotation submission. Set by requestRotation()'s CAS before queuing a task,
+ // cleared in LogRotationTask's finally. Both scheduled ticks and on-demand callers go through
+ // requestRotation(), so a request that arrives while a rotation is queued or running is a
+ // no-op — preventing the duplicate-writer bug where an in-flight scheduled rotation and a
+ // concurrent size-triggered request both stage writers and the second closes the first.
@VisibleForTesting
final AtomicBoolean rotationRequested = new AtomicBoolean(false);
private final ExecutorService closeExecutor;
@@ -166,7 +171,9 @@ public class ReplicationLog {
rotationExecutor = Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryBuilder()
.setNameFormat("ReplicationLogRotation-" + logGroup.getHAGroupName() +
"-%d").setDaemon(true)
.build());
- rotationExecutor.scheduleAtFixedRate(new LogRotationTask(false),
initialDelay, rotationTimeMs,
+ // Scheduled ticks route through requestRotation() so they share the same CAS gate as on-demand
+ // size-triggered rotations — only one rotation can be queued or running at a time.
+ rotationExecutor.scheduleAtFixedRate(this::requestRotation, initialDelay,
rotationTimeMs,
TimeUnit.MILLISECONDS);
LOG.info("Started rotation executor with initial delay {}ms and interval
{}ms", initialDelay,
rotationTimeMs);
@@ -236,25 +243,29 @@ public class ReplicationLog {
}
/**
- * Submits an on-demand {@link LogRotationTask} to the executor. The compareAndSet avoids
- * submitting duplicate tasks — if the flag is already set, a task is already queued.
+ * Submits a {@link LogRotationTask} to the executor. The CAS gate ensures only one rotation can
+ * be queued or in flight at a time — if the flag is already set, a task is already pending or
+ * running. Both scheduled ticks and on-demand size-triggered callers go through this method.
*/
private void requestRotation() {
if (rotationRequested.compareAndSet(false, true)) {
try {
- rotationExecutor.execute(new LogRotationTask(true));
+ rotationExecutor.execute(new LogRotationTask());
} catch (java.util.concurrent.RejectedExecutionException e) {
- LOG.info("Rotation executor shut down, skipping on-demand rotation",
e);
+ LOG.info("Rotation executor shut down, skipping rotation", e);
rotationRequested.set(false);
}
}
}
/**
- * Requests an on-demand rotation if the current writer exceeds the size threshold.
+ * Requests a rotation if the current writer exceeds the size threshold. Skips when a writer is
+ * already staged but not yet drained — the consumer hasn't swapped in the new writer yet, so
+ * {@code currentWriter} still reports the old (over-threshold) length and a fresh request would
+ * stage a redundant writer that the next task immediately closes.
*/
- private void requestRotationIfNeeded() throws IOException {
- if (isClosed() || rotationRequested.get()) {
+ private void requestRotationIfOversized() throws IOException {
+ if (isClosed() || rotationRequested.get() || pendingWriter.get() != null) {
return;
}
if (shouldRotateForSize()) {
@@ -352,7 +363,7 @@ public class ReplicationLog {
// unsynced records are replayed into it and could exceed the threshold immediately,
// triggering another rotation and forming a loop. Checking only when the batch is empty
// ensures size-based rotations don't cascade.
- requestRotationIfNeeded();
+ requestRotationIfOversized();
}
}
@@ -365,7 +376,7 @@ public class ReplicationLog {
currentBatch.clear();
// See note in append(Record): size check runs only when currentBatch is empty so a replay
// into a freshly-staged writer cannot trigger a cascading rotation.
- requestRotationIfNeeded();
+ requestRotationIfOversized();
}
/**
@@ -406,7 +417,7 @@ public class ReplicationLog {
@VisibleForTesting
protected void forceRotation() {
- new LogRotationTask(false).run();
+ new LogRotationTask().run();
checkAndReplaceWriter(false);
}
@@ -416,21 +427,14 @@ public class ReplicationLog {
/**
* Creates a new writer on a background thread and stages it in {@code pendingWriter} for the
- * consumer thread to drain inside {@link #apply}. Invoked both by the scheduled rotation executor
- * at round boundaries and on-demand via {@link #requestRotation()}.
+ * consumer thread to drain inside {@link #apply}. Submitted via {@link #requestRotation()} from
+ * both the scheduled tick and on-demand size-triggered callers.
* <p>
- * Only the on-demand path owns {@code rotationRequested} — it sets the flag before submitting the
- * task and clears it in {@code finally}. Scheduled rotations must not touch the flag, otherwise
- * they could clear a flag set by a concurrent on-demand request whose task hasn't run yet,
- * allowing duplicate on-demand submissions to be queued.
+ * {@code rotationRequested} stays set for the entire duration of run() (cleared in finally), so
+ * any request that arrives while this task is creating/staging a writer is rejected by
+ * {@link #requestRotation()}'s CAS and not duplicated.
*/
protected class LogRotationTask implements Runnable {
- private final boolean onDemand;
-
- LogRotationTask(boolean onDemand) {
- this.onDemand = onDemand;
- }
-
@Override
public void run() {
if (closed.get()) {
@@ -461,11 +465,8 @@ public class ReplicationLog {
} finally {
// Time both success and failure paths so slow rotations are visible even when they fail.
logGroup.getMetrics().updateRotationTime(System.nanoTime() - startNs);
- if (onDemand) {
- // Clear the flag last so requestRotation()'s CAS rejects duplicate on-demand
- // submissions while this task is still creating/staging a writer.
- rotationRequested.set(false);
- }
+ // Clear last so requestRotation()'s CAS suppresses duplicates throughout this run.
+ rotationRequested.set(false);
CountDownLatch latch = rotationStagedLatch;
if (latch != null) {
latch.countDown();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index c204cd0a27..349fdb8620 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -392,8 +392,8 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
/**
* Tests size-based log rotation. Verifies that the log file is rotated when it exceeds the
* configured size threshold and that operations continue correctly with the new log file. After
- * the first sync, requestRotationIfNeeded submits an on-demand LogRotationTask which creates the
- * new writer immediately on the executor thread.
+ * the first sync, requestRotationIfOversized submits an on-demand LogRotationTask which creates
+ * the new writer immediately on the executor thread.
*/
@Test
public void testSizeBasedRotation() throws Exception {
@@ -409,7 +409,7 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
for (int i = 0; i < 100; i++) {
logGroup.append(tableName, commitId++, put);
}
- // Sync: data goes to old writer. requestRotationIfNeeded submits on-demand rotation task.
+ // Sync: data goes to old writer. requestRotationIfOversized submits on-demand rotation task.
logGroup.sync();
// Wait for the on-demand rotation task to create a new writer on the background thread
@@ -1914,14 +1914,14 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
/**
* Tests that the size check is not invoked from inside apply(). Pre-fix, every successful action
- * inside apply() ran requestRotationIfNeeded(); when a rotation swapped in a new writer mid-batch
- * and replay made it exceed the threshold, the size check after every append would request
- * another rotation, drain it on the next apply(), replay again, and loop indefinitely.
+ * inside apply() ran requestRotationIfOversized(); when a rotation swapped in a new writer
+ * mid-batch and replay made it exceed the threshold, the size check after every append would
+ * request another rotation, drain it on the next apply(), replay again, and loop indefinitely.
* <p>
* The new writer is mocked so its getLength() always reports above the rotation threshold. On the
* buggy code this triggers a rotation request after every successful append, which cascades into
- * one writer per append. With the fix, requestRotationIfNeeded() runs only after currentBatch is
- * cleared, so the chain is bounded.
+ * one writer per append. With the fix, requestRotationIfOversized() runs only after currentBatch
+ * is cleared, so the chain is bounded.
*/
@Test
public void testSizeRotationDoesNotLoopOnReplay() throws Exception {
@@ -1960,7 +1960,7 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
waitForRotationTick(roundDurationSeconds);
// Continue appending. On the buggy code, the first such append drains the staged writer,
- // replays the batch, then requestRotationIfNeeded() inside apply() fires (length is over
+ // replays the batch, then requestRotationIfOversized() inside apply() fires (length is over
// threshold) → requests another rotation. Subsequent appends drain that one too, replay,
// request another, and so on — one new writer per append.
for (int i = 0; i < numAppendsAfterTick; i++) {
@@ -1979,24 +1979,16 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
}
/**
- * Verifies the contract of {@code LogRotationTask}'s {@code onDemand} flag: a scheduled task
- * (onDemand=false) must not clear {@code rotationRequested}, and an on-demand task
- * (onDemand=true) must clear it. This prevents a scheduled tick from stomping the flag set by a
- * concurrent on-demand request whose task hasn't run yet, which would allow duplicate on-demand
- * submissions to be queued.
+ * Both scheduled ticks and on-demand callers go through {@code requestRotation()}, so a task
+ * always owns {@code rotationRequested} and clears it in finally.
*/
@Test
- public void testScheduledRotationDoesNotClearOnDemandFlag() {
+ public void testRotationTaskClearsRequestedFlag() {
ReplicationLog activeLog = logGroup.getActiveLog();
activeLog.rotationRequested.set(true);
- activeLog.new LogRotationTask(false).run();
- assertTrue("Scheduled rotation must not clear rotationRequested",
- activeLog.rotationRequested.get());
-
- activeLog.new LogRotationTask(true).run();
- assertFalse("On-demand rotation must clear rotationRequested",
- activeLog.rotationRequested.get());
+ activeLog.new LogRotationTask().run();
+ assertFalse("LogRotationTask must clear rotationRequested",
activeLog.rotationRequested.get());
}
/**