This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 9cd146ae2f PHOENIX-7864 Wake idle consumer on rotation via swap event
(#2487)
9cd146ae2f is described below
commit 9cd146ae2fd07c9552048a2439a916026bfffc3d
Author: tkhurana <[email protected]>
AuthorDate: Thu May 21 16:41:26 2026 -0700
PHOENIX-7864 Wake idle consumer on rotation via swap event (#2487)
When the producer is idle, a scheduled rotation stages a new writer in
pendingWriter, but the consumer never reaches checkAndReplaceWriter()
because that method is only invoked while handling DATA/SYNC events. The
reader's round-buffer interval can then expire and trigger HDFS lease
recovery on the old writer, which is still open.
Add EVENT_TYPE_SWAP, which LogRotationTask publishes (without blocking)
after staging a new writer. The event handler performs the writer swap on
SWAP, and endOfBatch handling is now shared by all event types so that a
SWAP event ending a batch does not strand pending sync futures.
---
.../apache/phoenix/replication/ReplicationLog.java | 7 ++
.../phoenix/replication/ReplicationLogGroup.java | 44 +++++++--
.../phoenix/replication/ReplicationModeImpl.java | 3 -
.../replication/ReplicationLogGroupTest.java | 104 +++++++++++++++++++--
4 files changed, 139 insertions(+), 19 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 16e62f5bb8..44d4972e75 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -439,9 +439,11 @@ public class ReplicationLog {
if (closed.get()) {
return;
}
+ boolean staged = false;
try {
LogFileWriter newWriter = createNewWriter();
LogFileWriter undrained = pendingWriter.getAndSet(newWriter);
+ staged = true;
if (undrained != null) {
closeWriter(undrained);
}
@@ -469,6 +471,11 @@ public class ReplicationLog {
latch.countDown();
rotationStagedLatch = null;
}
+ if (staged) {
+ // Wake an idle consumer so it drains pendingWriter before the
reader's round buffer
+ // expires. Non-blocking — see ReplicationLogGroup#publishSwapEvent.
+ logGroup.publishSwapEvent();
+ }
}
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index b3dc9f7f00..e7e5a69a3b 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.replication;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
import static
org.apache.phoenix.replication.ReplicationLogGroup.LogEvent.EVENT_TYPE_DATA;
+import static
org.apache.phoenix.replication.ReplicationLogGroup.LogEvent.EVENT_TYPE_SWAP;
import static
org.apache.phoenix.replication.ReplicationLogGroup.LogEvent.EVENT_TYPE_SYNC;
import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.INIT;
import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
@@ -29,6 +30,7 @@ import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
@@ -314,6 +316,7 @@ public class ReplicationLogGroup {
public static final byte EVENT_TYPE_DATA = 0;
public static final byte EVENT_TYPE_SYNC = 1;
+ public static final byte EVENT_TYPE_SWAP = 2;
public void setValues(int type, Record record, CompletableFuture<Void>
syncFuture) {
this.type = type;
@@ -638,6 +641,29 @@ public class ReplicationLogGroup {
}
}
+ /**
+ * Publish a non-blocking swap marker so an idle consumer wakes up and runs
+ * {@link ReplicationLog#checkAndReplaceWriter(boolean)} promptly. Called by
+ * {@link ReplicationLog.LogRotationTask} after staging a new pending writer.
+ * <p>
+ * Uses {@code tryNext()} so a full ring buffer never blocks the rotation thread — when the buffer
+ * is full the consumer is actively draining and will hit {@code checkAndReplaceWriter} on its
+ * own.
+ * <p>
+ * The marker carries no record and no sync future; the handler only drains the staged writer.
+ */
+ protected void publishSwapEvent() {
+ try {
+ long sequence = ringBuffer.tryNext();
+ try {
+ LogEvent event = ringBuffer.get(sequence);
+ // Slots are reused: overwrite type and clear record/future, since a SWAP has nothing
+ // to append and no caller waiting on it.
+ event.setValues(EVENT_TYPE_SWAP, null, null);
+ } finally {
+ // A claimed sequence must always be published; leaving it unpublished would stall the
+ // consumer waiting on this slot.
+ ringBuffer.publish(sequence);
+ }
+ } catch (InsufficientCapacityException e) {
+ // Best-effort by design: with a full buffer the consumer is busy and will swap on its own.
+ LOG.debug("HAGroup {} ring buffer full, skipping swap event publish",
this);
+ }
+ }
+
/**
* Check if this ReplicationLogGroup is closed.
* @return true if closed, false otherwise
@@ -1128,19 +1154,21 @@ public class ReplicationLogGroup {
switch (event.type) {
case EVENT_TYPE_DATA:
currentModeImpl.append(event.record);
- if (endOfBatch) {
- processPendingSyncs(sequence);
- }
- return;
+ break;
case EVENT_TYPE_SYNC:
pendingSyncFutures.add(event.syncFuture);
- if (endOfBatch) {
- processPendingSyncs(sequence);
- }
- return;
+ break;
+ case EVENT_TYPE_SWAP:
+ // Wake-up marker from LogRotationTask. Drain the staged writer so
the old writer is
+ // closed before the reader's round buffer expires. No append/sync
to perform.
+ currentModeImpl.getReplicationLog().checkAndReplaceWriter(true);
+ break;
default:
throw new UnsupportedOperationException("Unknown event type: " +
event.type);
}
+ if (endOfBatch) {
+ processPendingSyncs(sequence);
+ }
} catch (IOException e) {
try {
LOG.info("Failed to process event at sequence {} on mode {}",
sequence, currentModeImpl,
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
index 114e519b42..4c5b9e5fbe 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
@@ -23,8 +23,6 @@ import
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-
/**
* Base class for different replication modes.
* <p>
@@ -69,7 +67,6 @@ public abstract class ReplicationModeImpl {
}
/** Returns the underlying log abstraction */
- @VisibleForTesting
ReplicationLog getReplicationLog() {
return log;
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 55bce05d74..c204cd0a27 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -1267,6 +1267,12 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
LogFileWriter writer = activeLog.getWriter();
assertNotNull("Writer should not be null", writer);
+ // Configure all stubs before publishing any events. Stubbing a Mockito
spy while the
+ // consumer thread is concurrently invoking methods on the same spy is
racy — Mockito's
+ // invocation/stub state is not thread-safe and the partially-applied stub
can be matched
+ // against an unrelated method on the consumer thread.
+ doThrow(new IOException("Simulate append
failure")).when(writer).append(tableName, commitId5,
+ put5);
// Rotated writers must also fail on the 5th append so the retry doesn't
rescue the loop.
doAnswer(invocation -> {
LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
@@ -1279,11 +1285,6 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.append(tableName, commitId2, put2);
logGroup.append(tableName, commitId3, put3);
logGroup.append(tableName, commitId4, put4);
-
- // configure initial writer to throw IOException on the 5th append
- doThrow(new IOException("Simulate append
failure")).when(writer).append(tableName, commitId5,
- put5);
-
logGroup.append(tableName, commitId5, put5);
logGroup.sync();
@@ -1393,12 +1394,14 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
LogFileWriter initialWriter = activeLog.getWriter();
assertNotNull("Initial writer should not be null", initialWriter);
+ // Configure initial writer's sync to fail (simulating broken stream after
lease recovery).
+ // Done before the append publishes anything so the consumer thread cannot
race with this
+ // stub installation on the same Mockito spy.
+ doThrow(new IOException("Simulated broken
stream")).when(initialWriter).sync();
+
// Append a record — goes to initialWriter
logGroup.append(tableName, commitId, put);
- // Configure initial writer's sync to fail (simulating broken stream after
lease recovery)
- doThrow(new IOException("Simulated broken
stream")).when(initialWriter).sync();
-
// Stage a pending writer via forced rotation
activeLog.forceRotation();
@@ -1995,4 +1998,89 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
assertFalse("On-demand rotation must clear rotationRequested",
activeLog.rotationRequested.get());
}
+
+ /**
+ * Tests that a rotation tick wakes an idle consumer via the synthetic swap event so the staged
+ * pendingWriter is drained without waiting for the next real append/sync. Prior to this fix the
+ * undrained writer remained installed until the next data event, which on an idle system could
+ * exceed the reader's round buffer and trigger lease recovery.
+ */
+ @Test
+ public void testIdleConsumerSwapsOnRotation() throws Exception {
+ final String tableName = "TBLICSOR";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ final long commitId = 1L;
+ final int roundDurationSeconds = 2;
+
+ // Short round duration so the rotation tick fires quickly in the test.
+ conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
roundDurationSeconds);
+ recreateLogGroup();
+
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter initialWriter = activeLog.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Establish baseline: append + sync to clear currentBatch
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
+
+ // Wait past one rotation tick. With the synthetic swap event, the consumer wakes and drains
+ // pendingWriter — no further append/sync needed.
+ waitForRotationTick(roundDurationSeconds);
+
+ // The active writer should now be the one staged by the rotation tick.
+ // NOTE(review): the swap runs on the consumer thread when it handles the SWAP event that
+ // LogRotationTask publishes, i.e. asynchronously to this thread. Unless waitForRotationTick
+ // leaves enough margin for that hand-off, this single read can race; consider polling
+ // getWriter() with a timeout, and assertNotSame(...) instead of assertTrue(a != b).
+ LogFileWriter writerAfterIdleRotation = activeLog.getWriter();
+ assertTrue("Writer should have swapped after idle rotation tick without
further events",
+ writerAfterIdleRotation != initialWriter);
+
+ // The initial writer should have been closed asynchronously by the swap.
+ verify(initialWriter, timeout(5000).times(1)).close();
+ }
+
+ /**
+ * Verifies that {@code publishSwapEvent} silently skips when the ring buffer is full — the
+ * consumer is actively draining and will hit {@code checkAndReplaceWriter} on its next event.
+ */
+ @Test
+ public void testPublishSwapEventOnFullRingBufferIsNoop() throws Exception {
+ final String tableName = "TBLPSEFRB";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ LogFileWriter innerWriter = logGroup.getActiveLog().getWriter();
+ assertNotNull("Inner writer should not be null", innerWriter);
+
+ // Hold the consumer so the ring buffer cannot drain. The stub is installed before any
+ // event is published, so the consumer thread never races with stub installation.
+ final CountDownLatch holdConsumer = new CountDownLatch(1);
+ final CountDownLatch ringFull = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ holdConsumer.await();
+ return invocation.callRealMethod();
+ }).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
+
+ Thread filler = new Thread(() -> {
+ try {
+ long id = 0;
+ // Fill all ring buffer slots. The consumer is blocked on the first event so its
+ // gating sequence never advances, and these N publishes saturate the buffer.
+ for (int i = 0; i < TEST_RINGBUFFER_SIZE; i++) {
+ logGroup.append(tableName, id++, put);
+ }
+ ringFull.countDown();
+ // The next append will block until the consumer drains.
+ logGroup.append(tableName, id, put);
+ } catch (Exception ignored) {
+ // NOTE(review): swallowing here means an append failing before ringFull.countDown()
+ // leaves the main thread blocked forever in ringFull.await() below.
+ }
+ });
+ filler.setDaemon(true);
+ filler.start();
+
+ // Wait until the filler has saturated the ring buffer.
+ // NOTE(review): unbounded wait — consider assertTrue(ringFull.await(timeout, unit)).
+ ringFull.await();
+
+ // Should not throw / hang even though the ring buffer is full.
+ // NOTE(review): this only shows the call returns; it does not confirm that tryNext() actually
+ // hit InsufficientCapacityException (e.g. by asserting zero remaining capacity first).
+ logGroup.publishSwapEvent();
+
+ // Release the consumer so the filler thread can finish and tearDown can close cleanly.
+ holdConsumer.countDown();
+ filler.join(5000);
+ }
}