This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 2fcf0cc985 PHOENIX-7793 Replication Log writer improvements (#2433)
2fcf0cc985 is described below
commit 2fcf0cc9857845306e300c1a73c5330c106df9e5
Author: tkhurana <[email protected]>
AuthorDate: Thu Apr 30 10:56:14 2026 -0700
PHOENIX-7793 Replication Log writer improvements (#2433)
---
.../apache/phoenix/replication/ReplicationLog.java | 360 ++++++-----
.../phoenix/replication/ReplicationLogGroup.java | 1 +
.../replication/log/LogFileFormatWriter.java | 5 +-
.../phoenix/replication/log/LogFileWriter.java | 6 +-
.../metrics/MetricsReplicationLogGroupSource.java | 27 -
.../MetricsReplicationLogGroupSourceImpl.java | 30 +-
.../metrics/ReplicationLogMetricValues.java | 21 +-
.../replication/ReplicationLogBaseTest.java | 41 +-
.../ReplicationLogDiscoveryForwarderTest.java | 22 +-
.../replication/ReplicationLogGroupTest.java | 655 ++++++++++++++++-----
.../phoenix/replication/ReplicationLogTest.java | 66 +++
11 files changed, 784 insertions(+), 450 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index d23008b9fb..b699981c24 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -22,11 +22,13 @@ import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,6 +41,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -48,6 +51,13 @@ import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFac
* based on time or size, retries on file operations and the lifecycle of the
underlying
* LogFileWriter.
* </p>
+ * <p>
+ * All rotation (scheduled round-boundary ticks, on-demand size, and error
recovery) is handled by
+ * {@link LogRotationTask} which creates a new writer on a background thread
and stages it in
+ * {@code pendingWriter}. The disruptor consumer thread drains the staged
writer inside
+ * {@link #apply} before each attempt, swaps the pointer, replays any unsynced
appends, and submits
+ * the old writer to {@code closeExecutor} for async close.
+ * </p>
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2", "MS_EXPOSE_REP" },
justification = "Intentional")
@@ -59,22 +69,24 @@ public class ReplicationLog {
protected final long rotationSizeBytes;
protected final int maxRotationRetries;
protected final Compression.Algorithm compression;
- protected final ReentrantLock lock = new ReentrantLock();
- protected final int maxAttempts; // Configurable max attempts for sync
- protected final long retryDelayMs; // Configurable delay between attempts
- // Underlying file writer
+ protected final int maxAttempts;
+ protected final long retryDelayMs;
+ // Underlying file writer — only mutated by the disruptor consumer thread.
protected volatile LogFileWriter currentWriter;
- protected final AtomicLong lastRotationTime = new AtomicLong();
protected final AtomicLong writerGeneration = new AtomicLong();
protected final AtomicLong rotationFailures = new AtomicLong(0);
+ // Staged writer created by the background LogRotationTask, drained by
checkAndReplaceWriter().
+ private final AtomicReference<LogFileWriter> pendingWriter = new
AtomicReference<>();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicBoolean rotationRequested = new AtomicBoolean(false);
+ private final ExecutorService closeExecutor;
protected ScheduledExecutorService rotationExecutor;
- protected volatile boolean closed = false;
// Manages the creation of the actual log file in the shard directory
protected ReplicationShardDirectoryManager replicationShardDirectoryManager;
// List of in-flight appends which are successful but haven't been synced yet
private final List<Record> currentBatch = new ArrayList<>();
- // Current version of the writer being used for file operations. It is
needed for detecting
- // when the writer changes because of rotation while we are in the middle of
a write operation.
+ // Tracks the generation of the writer that currentBatch was appended to.
+ // Used to detect writer swaps and trigger replay of unsynced appends.
private long generation;
public ReplicationLog(ReplicationLogGroup logGroup,
@@ -85,8 +97,7 @@ public class ReplicationLog {
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_SYNC_RETRIES) + 1;
this.retryDelayMs =
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_RETRY_DELAY_MS_KEY,
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
- this.rotationTimeMs =
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
- ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+ this.rotationTimeMs = shardManager.getReplicationRoundDurationSeconds() *
1000L;
long rotationSize =
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
double rotationSizePercent =
@@ -104,29 +115,18 @@ public class ReplicationLog {
try {
compression =
Compression.getCompressionAlgorithmByName(compressionName);
} catch (IllegalArgumentException e) {
- LOG.warn("Unknown compression type " + compressionName + ", using
NONE", e);
+ LOG.warn("Unknown compression type {}, using NONE", compressionName,
e);
}
}
this.compression = compression;
this.replicationShardDirectoryManager = shardManager;
- }
-
- /** The reason for requesting a log rotation. */
- protected enum RotationReason {
- /** Rotation requested due to time threshold being exceeded. */
- TIME,
- /** Rotation requested due to size threshold being exceeded. */
- SIZE,
- /** Rotation requested due to an error condition. */
- ERROR
+ this.closeExecutor = Executors.newCachedThreadPool(new
ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Close-ReplicationLog-Writer-" +
logGroup.getHAGroupName() + "-%d").build());
}
/** Initialize the writer. */
public void init() throws IOException {
- // Start time based rotation.
- lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
startRotationExecutor();
- // Create the initial writer
currentWriter = createNewWriter();
generation = currentWriter.getGeneration();
}
@@ -146,18 +146,25 @@ public class ReplicationLog {
}
protected void startRotationExecutor() {
- long rotationCheckInterval = getRotationCheckInterval(rotationTimeMs);
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long currentRoundStart =
replicationShardDirectoryManager.getNearestRoundStartTimestamp(now);
+ long initialDelay = computeInitialDelay(now, currentRoundStart,
rotationTimeMs);
+ startRotationExecutor(initialDelay);
+ }
+
+ protected void startRotationExecutor(long initialDelay) {
rotationExecutor = Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryBuilder()
.setNameFormat("ReplicationLogRotation-" + logGroup.getHAGroupName() +
"-%d").setDaemon(true)
.build());
- rotationExecutor.scheduleAtFixedRate(new LogRotationTask(),
rotationCheckInterval,
- rotationCheckInterval, TimeUnit.MILLISECONDS);
- LOG.debug("Started rotation executor with interval {}ms",
rotationCheckInterval);
+ rotationExecutor.scheduleAtFixedRate(new LogRotationTask(), initialDelay,
rotationTimeMs,
+ TimeUnit.MILLISECONDS);
+ LOG.info("Started rotation executor with initial delay {}ms and interval
{}ms", initialDelay,
+ rotationTimeMs);
}
- protected long getRotationCheckInterval(long rotationTimeMs) {
- long interval = Math.max(10 * 1000L, Math.min(60 * 1000L, rotationTimeMs /
10));
- return interval;
+ @VisibleForTesting
+ static long computeInitialDelay(long now, long currentRoundStart, long
rotationTimeMs) {
+ return currentRoundStart + rotationTimeMs - now;
}
protected void stopRotationExecutor() {
@@ -174,114 +181,79 @@ public class ReplicationLog {
}
}
- /** Gets the current writer, rotating it if necessary based on size
thresholds. */
- protected LogFileWriter getWriter() throws IOException {
- lock.lock();
- try {
- if (shouldRotate()) {
- rotateLog(RotationReason.SIZE);
- }
- return currentWriter;
- } finally {
- lock.unlock();
- }
- }
-
/**
- * Checks if the current log file needs to be rotated based on time or size.
Must be called under
- * lock.
+ * Checks if the current log file needs to be rotated based on size.
* @return true if rotation is needed, false otherwise.
* @throws IOException If an error occurs checking the file size.
*/
- protected boolean shouldRotate() throws IOException {
+ protected boolean shouldRotateForSize() throws IOException {
if (currentWriter == null) {
LOG.warn("Current writer is null, forcing rotation.");
return true;
}
- // Check time threshold
- long now = EnvironmentEdgeManager.currentTimeMillis();
- long last = lastRotationTime.get();
- if (now - last >= rotationTimeMs) {
- LOG.debug("Rotating log file due to time threshold ({} ms elapsed,
threshold {} ms)",
- now - last, rotationTimeMs);
- return true;
- }
-
- // Check size threshold (using actual file size for accuracy)
long currentSize = currentWriter.getLength();
if (currentSize >= rotationSizeBytes) {
- LOG.debug("Rotating log file due to size threshold ({} bytes, threshold
{} bytes)",
- currentSize, rotationSizeBytes);
+ LOG.info("Rotating log file {} due to size threshold ({} bytes,
threshold {} bytes)",
+ currentWriter, currentSize, rotationSizeBytes);
return true;
}
-
return false;
}
+ @VisibleForTesting
+ protected LogFileWriter getWriter() {
+ checkAndReplaceWriter(false);
+ return currentWriter;
+ }
+
/**
- * Closes the current log writer and opens a new one, updating rotation
metrics.
- * <p>
- * This method handles the rotation of log files, which can be triggered by:
- * <ul>
- * <li>Time threshold exceeded (TIME)</li>
- * <li>Size threshold exceeded (SIZE)</li>
- * <li>Error condition requiring rotation (ERROR)</li>
- * </ul>
- * <p>
- * The method implements retry logic for handling rotation failures. If
rotation fails, it retries
- * up to maxRotationRetries times. If the number of failures exceeds
maxRotationRetries, an
- * exception is thrown. Otherwise, it logs a warning and continues with the
current writer.
- * <p>
- * The method is thread-safe and uses a lock to ensure atomic rotation
operations.
- * @param reason The reason for requesting log rotation
- * @return The new LogFileWriter instance if rotation succeeded, or the
current writer if rotation
- * failed
- * @throws IOException if rotation fails after exceeding maxRotationRetries
+ * Drains any staged pendingWriter, swapping it in as currentWriter.
+ * @param asyncClose if true, old writer is submitted to closeExecutor for
async close; if false,
+ * old writer is closed synchronously on the calling
thread.
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
"UL_UNRELEASED_LOCK",
- justification = "False positive")
- protected LogFileWriter rotateLog(RotationReason reason) throws IOException {
- lock.lock();
- try {
- // Try to get the new writer first. If it fails we continue using the
current writer.
- // Increment the writer generation
- LogFileWriter newWriter = createNewWriter();
- LOG.debug("Created new writer: {}", newWriter);
- // Close the current writer
- closeWriter(currentWriter);
+ protected void checkAndReplaceWriter(boolean asyncClose) {
+ LogFileWriter newWriter = pendingWriter.getAndSet(null);
+ if (newWriter != null) {
+ LogFileWriter oldWriter = currentWriter;
currentWriter = newWriter;
- lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
- rotationFailures.set(0);
- logGroup.getMetrics().incrementRotationCount();
- switch (reason) {
- case TIME:
- logGroup.getMetrics().incrementTimeBasedRotationCount();
- break;
- case SIZE:
- logGroup.getMetrics().incrementSizeBasedRotationCount();
- break;
- case ERROR:
- logGroup.getMetrics().incrementErrorBasedRotationCount();
- break;
+ LOG.info("Swapped writer from {} to {}, asyncClose={}", oldWriter,
newWriter, asyncClose);
+ if (asyncClose) {
+ submitClose(oldWriter);
+ } else {
+ closeWriter(oldWriter);
}
- } catch (IOException e) {
- // If we fail to rotate the log, we increment the failure counter. If we
have exceeded
- // the maximum number of retries, we close the log and throw the
exception. Otherwise
- // we log a warning and continue.
- logGroup.getMetrics().incrementRotationFailureCount();
- long numFailures = rotationFailures.getAndIncrement();
- if (numFailures >= maxRotationRetries) {
- LOG.warn("Failed to rotate log (attempt {}/{}), closing log",
numFailures,
- maxRotationRetries, e);
- closeOnError();
- throw e;
+ }
+ }
+
+ /**
+ * Submits an on-demand {@link LogRotationTask} to the executor. The
compareAndSet avoids
+ * submitting duplicate tasks — if the flag is already set, a task is
already queued.
+ */
+ private void requestRotation() {
+ if (rotationRequested.compareAndSet(false, true)) {
+ try {
+ rotationExecutor.execute(new LogRotationTask());
+ } catch (java.util.concurrent.RejectedExecutionException e) {
+ LOG.info("Rotation executor shut down, skipping on-demand rotation",
e);
+ rotationRequested.set(false);
}
- LOG.warn("Failed to rotate log (attempt {}/{}), retrying...",
numFailures, maxRotationRetries,
- e);
- } finally {
- lock.unlock();
}
- return currentWriter;
+ }
+
+ /**
+ * Requests an on-demand rotation if the current writer exceeds the size
threshold.
+ */
+ private void requestRotationIfNeeded() throws IOException {
+ if (shouldRotateForSize()) {
+ requestRotation();
+ }
+ }
+
+ private void submitClose(LogFileWriter writer) {
+ if (writer == null) {
+ return;
+ }
+ closeExecutor.execute(() -> closeWriter(writer));
}
/** Closes the given writer, logging any errors that occur during close. */
@@ -289,12 +261,11 @@ public class ReplicationLog {
if (writer == null) {
return;
}
- LOG.debug("Closing writer: {}", writer);
+ LOG.info("Closing writer: {}", writer);
try {
writer.close();
} catch (IOException e) {
- // For now, just log and continue
- LOG.error("Error closing log writer: " + writer, e);
+ LOG.error("Error closing log writer: {}", writer, e);
}
}
@@ -303,49 +274,55 @@ public class ReplicationLog {
* @return true if closed, false otherwise
*/
public boolean isClosed() {
- return closed;
+ return closed.get();
}
private interface Action {
void action(LogFileWriter writer) throws IOException;
}
+ private void replayCurrentBatch() throws IOException {
+ if (currentBatch.isEmpty()) {
+ return;
+ }
+ LOG.info("Replaying {} unsynced records into new writer {}",
currentBatch.size(),
+ currentWriter);
+ for (Record r : currentBatch) {
+ currentWriter.append(r.tableName, r.commitId, r.mutation);
+ }
+ }
+
private void apply(Action action) throws IOException {
- LogFileWriter writer = getWriter();
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+ checkAndReplaceWriter(true);
if (isClosed()) {
throw new IOException("Closed");
}
try {
- if (writer.getGeneration() > generation) {
- generation = writer.getGeneration();
- // If the writer has been rotated, we need to replay the current
batch of
- // in-flight appends into the new writer.
- if (!currentBatch.isEmpty()) {
- LOG.trace("Writer has been rotated, replaying in-flight batch");
- for (Record r : currentBatch) {
- writer.append(r.tableName, r.commitId, r.mutation);
- }
- }
+ if (currentWriter.getGeneration() > generation) {
+ replayCurrentBatch();
+ generation = currentWriter.getGeneration();
}
- action.action(writer);
+ action.action(currentWriter);
+ requestRotationIfNeeded();
break;
} catch (IOException e) {
- // IO exception, force a rotation.
- LOG.debug("Attempt " + attempt + "/" + maxAttempts + " failed", e);
+ LOG.debug("Attempt {}/{} failed", attempt, maxAttempts, e);
if (attempt == maxAttempts) {
- // TODO: Add log
closeOnError();
throw e;
}
- // Add delay before retrying to prevent tight loops
+ // First failure retries on the same writer (transient). Second failure
+ // requests a new writer to recover from non-transient stream errors.
+ if (attempt > 1) {
+ requestRotation();
+ }
try {
Thread.sleep(retryDelayMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted during retry delay");
}
- writer = rotateLog(RotationReason.ERROR);
}
}
}
@@ -361,8 +338,7 @@ public class ReplicationLog {
}
protected void sync() throws IOException {
- apply(writer -> writer.sync());
- // Sync completed, clear the list of in-flight appends.
+ apply(LogFileWriter::sync);
currentBatch.clear();
}
@@ -381,79 +357,81 @@ public class ReplicationLog {
* attempted on a log that has encountered a critical error.
*/
protected void closeOnError() {
- lock.lock();
- try {
- if (closed) {
- return;
- }
- closed = true;
- } finally {
- lock.unlock();
+ if (!closed.compareAndSet(false, true)) {
+ return;
}
- // Stop the time based rotation check.
stopRotationExecutor();
- // We expect a final sync will not work. Just close the inner writer.
+ closeExecutor.shutdownNow();
+ LogFileWriter staged = pendingWriter.getAndSet(null);
+ if (staged != null) {
+ closeWriter(staged);
+ }
closeWriter(currentWriter);
}
/** Closes the log. */
public void close() {
- lock.lock();
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+ stopRotationExecutor();
+ closeExecutor.shutdown();
try {
- if (closed) {
- return;
+ if (!closeExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ closeExecutor.shutdownNow();
}
- closed = true;
- } finally {
- lock.unlock();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ closeExecutor.shutdownNow();
+ }
+ LogFileWriter staged = pendingWriter.getAndSet(null);
+ if (staged != null) {
+ closeWriter(staged);
}
- // Stop the time based rotation check.
- stopRotationExecutor();
- // We must for the disruptor before closing the current writer.
closeWriter(currentWriter);
}
+ @VisibleForTesting
+ protected void forceRotation() {
+ new LogRotationTask().run();
+ checkAndReplaceWriter(false);
+ }
+
protected FileSystem getFileSystem(URI uri) throws IOException {
return FileSystem.get(uri, logGroup.getConfiguration());
}
- /** Implements time based rotation independent of in-line checking. */
+ /**
+ * Creates a new writer on a background thread and stages it in {@code
pendingWriter} for the
+ * consumer thread to drain inside {@link #apply}. Invoked both by the
scheduled rotation executor
+ * at round boundaries and on-demand via {@link #requestRotation()}.
+ */
protected class LogRotationTask implements Runnable {
@Override
public void run() {
- if (closed) {
+ if (closed.get()) {
return;
}
- // Use tryLock with a timeout to avoid blocking indefinitely if another
thread holds
- // the lock for an unexpectedly long time (e.g., during a problematic
rotation).
- boolean acquired = false;
+ rotationRequested.compareAndSet(true, false);
+
try {
- // Wait a short time for the lock
- acquired = lock.tryLock(1, TimeUnit.SECONDS);
- if (acquired) {
- // Check only the time condition here, size is handled by getWriter
- long now = EnvironmentEdgeManager.currentTimeMillis();
- long last = lastRotationTime.get();
- if (!closed && now - last >= rotationTimeMs) {
- LOG.debug("Time based rotation needed ({} ms elapsed, threshold {}
ms).", now - last,
- rotationTimeMs);
- try {
- rotateLog(RotationReason.TIME); // rotateLog updates
lastRotationTime
- } catch (IOException e) {
- LOG.error("Failed to rotate log, currentWriter is {}",
currentWriter, e);
- // More robust error handling goes here once the
store-and-forward
- // fallback is implemented. For now we just log the error and
continue.
- }
- }
- } else {
- LOG.warn("LogRotationTask could not acquire lock, skipping check
this time.");
+ LogFileWriter newWriter = createNewWriter();
+ LogFileWriter undrained = pendingWriter.getAndSet(newWriter);
+ if (undrained != null) {
+ closeWriter(undrained);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // Preserve interrupt status
- LOG.warn("LogRotationTask interrupted while trying to acquire lock.");
- } finally {
- if (acquired) {
- lock.unlock();
+ rotationFailures.set(0);
+ logGroup.getMetrics().incrementRotationCount();
+ } catch (IOException e) {
+ logGroup.getMetrics().incrementRotationFailureCount();
+ long numFailures = rotationFailures.incrementAndGet();
+ if (numFailures >= maxRotationRetries) {
+ LOG.error("Too many rotation failures ({}/{}), closing log",
numFailures,
+ maxRotationRetries, e);
+ closeOnError();
+ } else {
+ LOG.info("Failed to create new writer for rotation (attempt {}/{}),
retrying...",
+ numFailures, maxRotationRetries, e);
}
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index 806df5f155..c69292a9a1 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -412,6 +412,7 @@ public class ReplicationLogGroup {
* @throws IOException if initialization fails
*/
protected void init() throws IOException {
+ LOG.info("Initializing ReplicationLogGroup {}", haGroupName);
Optional<HAGroupStoreRecord> haRecord =
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
if (!haRecord.isPresent()) {
String message =
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index be94364a80..2555f17225 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -211,9 +211,8 @@ public class LogFileFormatWriter implements Closeable {
@Override
public String toString() {
- return "LogFileFormatWriter [writerContext=" + context + ",
currentBlockUncompressedBytes="
- + currentBlockBytes + ", recordCount=" + recordCount + ", blockCount=" +
blockCount
- + ", blocksStartOffset=" + blocksStartOffset + "]";
+ return "LogFileFormatWriter [writerContext=" + context + ", recordCount="
+ recordCount
+ + ", blockCount=" + blockCount + ", blocksStartOffset=" +
blocksStartOffset + "]";
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index 0c883a4469..923fc0f7bf 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -35,7 +35,7 @@ public class LogFileWriter implements LogFile.Writer {
private LogFileWriterContext context;
private LogFileFormatWriter writer;
- private boolean closed = false;
+ private volatile boolean closed = false;
/**
* A monotonically increasing sequence number that identifies this writer
instance, used to detect
* log file rotations and ensure proper handling of in-flight operations.
Higher layers will get a
@@ -124,8 +124,8 @@ public class LogFileWriter implements LogFile.Writer {
@Override
public String toString() {
- return "LogFileWriter [writerContext=" + context + ", formatWriter=" +
writer + ", closed="
- + closed + ", generation=" + generation + "]";
+ return "LogFileWriter [formatWriter=" + writer + ", closed=" + closed + ",
generation="
+ + generation + "]";
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index 798c048136..b3cc095048 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -27,15 +27,6 @@ public interface MetricsReplicationLogGroupSource extends BaseSource {
String METRICS_DESCRIPTION = "Metrics about Replication Log Operations for
an HA Group";
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
- String TIME_BASED_ROTATION_COUNT = "timeBasedRotationCount";
- String TIME_BASED_ROTATION_COUNT_DESC = "Number of time-based log rotations";
-
- String SIZE_BASED_ROTATION_COUNT = "sizeBasedRotationCount";
- String SIZE_BASED_ROTATION_COUNT_DESC = "Number of size-based log rotations";
-
- String ERROR_BASED_ROTATION_COUNT = "errorBasedRotationCount";
- String ERROR_BASED_ROTATION_COUNT_DESC = "Number of times rotateLog was
called due to errors";
-
String ROTATION_COUNT = "rotationCount";
String ROTATION_COUNT_DESC = "Total number of times rotateLog was called";
@@ -54,24 +45,6 @@ public interface MetricsReplicationLogGroupSource extends BaseSource {
String RING_BUFFER_TIME = "ringBufferTime";
String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer";
- /**
- * Increments the counter for time-based log rotations. This counter tracks
the number of times
- * the log was rotated due to time threshold.
- */
- void incrementTimeBasedRotationCount();
-
- /**
- * Increments the counter for size-based log rotations. This counter tracks
the number of times
- * the log was rotated due to size threshold.
- */
- void incrementSizeBasedRotationCount();
-
- /**
- * Increments the counter for error-based log rotations. This counter tracks
the number of times
- * the log was rotated due to errors.
- */
- void incrementErrorBasedRotationCount();
-
/**
* Increments the counter for total log rotations. This counter tracks the
total number of times
* the log was rotated, regardless of reason.
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index 7b3bb5789c..718f6ac63d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -26,9 +26,6 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
implements MetricsReplicationLogGroupSource {
- private final MutableFastCounter timeBasedRotationCount;
- private final MutableFastCounter sizeBasedRotationCount;
- private final MutableFastCounter errorBasedRotationCount;
private final MutableFastCounter rotationCount;
private final MutableFastCounter rotationFailuresCount;
private final MutableHistogram appendTime;
@@ -44,12 +41,6 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
String metricsContext, String metricsJmxContext, String haGroupName) {
super(metricsName, metricsDescription, metricsContext,
metricsJmxContext + ",haGroup=" + haGroupName);
- timeBasedRotationCount =
getMetricsRegistry().newCounter(TIME_BASED_ROTATION_COUNT,
- TIME_BASED_ROTATION_COUNT_DESC, 0L);
- sizeBasedRotationCount =
getMetricsRegistry().newCounter(SIZE_BASED_ROTATION_COUNT,
- SIZE_BASED_ROTATION_COUNT_DESC, 0L);
- errorBasedRotationCount =
getMetricsRegistry().newCounter(ERROR_BASED_ROTATION_COUNT,
- ERROR_BASED_ROTATION_COUNT_DESC, 0L);
rotationCount = getMetricsRegistry().newCounter(ROTATION_COUNT,
ROTATION_COUNT_DESC, 0L);
rotationFailuresCount =
getMetricsRegistry().newCounter(ROTATION_FAILURES,
ROTATION_FAILURES_DESC, 0L);
@@ -64,21 +55,6 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
DefaultMetricsSystem.instance().unregisterSource(metricsJmxContext);
}
- @Override
- public void incrementTimeBasedRotationCount() {
- timeBasedRotationCount.incr();
- }
-
- @Override
- public void incrementSizeBasedRotationCount() {
- sizeBasedRotationCount.incr();
- }
-
- @Override
- public void incrementErrorBasedRotationCount() {
- errorBasedRotationCount.incr();
- }
-
@Override
public void incrementRotationCount() {
rotationCount.incr();
@@ -111,10 +87,8 @@ public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
@Override
public ReplicationLogMetricValues getCurrentMetricValues() {
- return new ReplicationLogMetricValues(timeBasedRotationCount.value(),
- sizeBasedRotationCount.value(), errorBasedRotationCount.value(),
rotationCount.value(),
- rotationFailuresCount.value(), appendTime.getMax(), syncTime.getMax(),
rotationTime.getMax(),
- ringBufferTime.getMax());
+ return new ReplicationLogMetricValues(rotationCount.value(),
rotationFailuresCount.value(),
+ appendTime.getMax(), syncTime.getMax(), rotationTime.getMax(),
ringBufferTime.getMax());
}
@Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
index 23ee864747..ef64552ece 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
@@ -20,9 +20,6 @@ package org.apache.phoenix.replication.metrics;
/** Class to hold the values of all metrics tracked by the ReplicationLog
metrics source. */
public class ReplicationLogMetricValues {
- private final long timeBasedRotationCount;
- private final long sizeBasedRotationCount;
- private final long errorBasedRotationCount;
private final long rotationCount;
private final long rotationFailuresCount;
private final long appendTime;
@@ -30,12 +27,8 @@ public class ReplicationLogMetricValues {
private final long rotationTime;
private final long ringBufferTime;
- public ReplicationLogMetricValues(long timeBasedRotationCount, long
sizeBasedRotationCount,
- long errorBasedRotationCount, long rotationCount, long
rotationFailuresCount, long appendTime,
+ public ReplicationLogMetricValues(long rotationCount, long
rotationFailuresCount, long appendTime,
long syncTime, long rotationTime, long ringBufferTime) {
- this.timeBasedRotationCount = timeBasedRotationCount;
- this.sizeBasedRotationCount = sizeBasedRotationCount;
- this.errorBasedRotationCount = errorBasedRotationCount;
this.rotationCount = rotationCount;
this.rotationFailuresCount = rotationFailuresCount;
this.appendTime = appendTime;
@@ -44,18 +37,6 @@ public class ReplicationLogMetricValues {
this.ringBufferTime = ringBufferTime;
}
- public long getTimeBasedRotationCount() {
- return timeBasedRotationCount;
- }
-
- public long getSizeBasedRotationCount() {
- return sizeBasedRotationCount;
- }
-
- public long getErrorBasedRotationCount() {
- return errorBasedRotationCount;
- }
-
public long getRotationCount() {
return rotationCount;
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
index eca484d5f9..5ff9fc2d41 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
@@ -73,9 +73,8 @@ public class ReplicationLogBaseTest {
static final int TEST_RINGBUFFER_SIZE = 32;
static final int TEST_SYNC_TIMEOUT = 1000;
- static final int TEST_ROTATION_TIME = 5000;
static final int TEST_ROTATION_SIZE_BYTES = 10 * 1024;
- static final int TEST_REPLICATION_ROUND_DURATION_SECONDS = 20;
+ static final int TEST_REPLICATION_ROUND_DURATION_SECONDS = 60;
protected ReplicationLogBaseTest() {
this(HAGroupState.ACTIVE_IN_SYNC);
@@ -85,8 +84,11 @@ public class ReplicationLogBaseTest {
this.initialState = initialState;
}
+ protected void overrideConf(Configuration conf) {
+ }
+
@Before
- public void setUpBase() throws IOException {
+ public void setUpBase() throws Exception {
MockitoAnnotations.initMocks(this);
haGroupName = name.getMethodName();
conf = HBaseConfiguration.create();
@@ -98,8 +100,6 @@ public class ReplicationLogBaseTest {
conf.setInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
TEST_RINGBUFFER_SIZE);
// Set a short sync timeout for testing
conf.setLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY,
TEST_SYNC_TIMEOUT);
- // Set rotation time to 10 seconds
- conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
TEST_ROTATION_TIME);
// Small size threshold for testing
conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
TEST_ROTATION_SIZE_BYTES);
@@ -107,6 +107,7 @@ public class ReplicationLogBaseTest {
conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
TEST_REPLICATION_ROUND_DURATION_SECONDS);
conf.setDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY, 0.0);
+ overrideConf(conf);
// initialize the group store record
storeRecord = initHAGroupStoreRecord();
@@ -123,6 +124,24 @@ public class ReplicationLogBaseTest {
}
}
+ /**
+ * Closes the current logGroup and creates a fresh one using the current
{@code conf}. Use this
+ * when a test needs to override configuration (e.g. round duration,
retries, size threshold)
+ * after {@link #setUpBase()} has already created the default logGroup.
+ */
+ protected void recreateLogGroup() throws Exception {
+ if (logGroup != null) {
+ logGroup.close();
+ }
+ logGroup = new TestableLogGroup(conf, serverName, haGroupName,
haGroupStoreManager);
+ logGroup.init();
+ }
+
+ protected static void waitForRotationTick(int roundDurationSeconds) throws
InterruptedException {
+ Thread.sleep((long) (roundDurationSeconds * 1000 * 1.25));
+ LOG.info("Waking up after waiting for rotation tick");
+ }
+
private HAGroupStoreRecord initHAGroupStoreRecord() {
return new HAGroupStoreRecord(null, haGroupName, initialState, 0,
HighAvailabilityPolicy.FAILOVER.toString(), "peerZKUrl", "clusterUrl",
"peerClusterUrl",
@@ -149,7 +168,9 @@ public class ReplicationLogBaseTest {
}
/**
- * Testable version of ReplicationLog that allows spying on the log
+ * Testable version of ReplicationLog that allows spying on the log.
Overrides
+ * startRotationExecutor to always use a full round as initial delay so that
the rotation task
+ * never fires unexpectedly when a test happens to start near a round
boundary.
*/
static class TestableLog extends ReplicationLog {
@@ -158,9 +179,17 @@ public class ReplicationLogBaseTest {
super(logGroup, shardManager);
}
+ @Override
+ protected void startRotationExecutor() {
+ // Use a full round as the initial delay so the rotation task never
fires early when the
+ // test happens to start close to a round boundary (e.g. initialDelay of
852ms on a 60s round)
+ super.startRotationExecutor(rotationTimeMs);
+ }
+
@Override
protected LogFileWriter createNewWriter() throws IOException {
LogFileWriter writer = super.createNewWriter();
+ LOG.info("createNewWriter called, generation={}",
writer.getGeneration());
return spy(writer);
}
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
index 765594beb2..770956b70d 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.replication;
import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC;
+import static
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -33,11 +34,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
import org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode;
import org.apache.phoenix.replication.log.LogFileTestUtil;
-import org.junit.After;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -55,16 +57,17 @@ public class ReplicationLogDiscoveryForwarderTest extends
ReplicationLogBaseTest
super(HAGroupState.ACTIVE_NOT_IN_SYNC);
}
+ @Override
+ protected void overrideConf(Configuration conf) {
+ conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 20);
+ }
+
@Before
public void setUp() throws IOException {
ReplicationMode mode = logGroup.getMode();
Assert.assertTrue(mode.equals(STORE_AND_FORWARD));
}
- @After
- public void tearDown() throws IOException {
- }
-
@Test
public void testLogForwardingAndTransitionBackToSyncMode() throws Exception {
final String tableName = "TESTTBL";
@@ -143,7 +146,6 @@ public class ReplicationLogDiscoveryForwarderTest extends
ReplicationLogBaseTest
@Test
public void testSyncModeUpdateWaitTime() throws Exception {
final long[] waitTime = { 8L };
- int roundDurationSeconds =
logGroup.getLocalShardManager().getReplicationRoundDurationSeconds();
doAnswer(new Answer<Object>() {
@Override
@@ -165,9 +167,11 @@ public class ReplicationLogDiscoveryForwarderTest extends
ReplicationLogBaseTest
return ret;
}
}).when(haGroupStoreManager).setHAGroupStatusToSync(haGroupName);
- Thread.sleep(roundDurationSeconds * 3 * 1000);
- LOG.info("Coming out of sleep");
- // we should have switched back to the SYNC mode
+
+ long deadline = EnvironmentEdgeManager.currentTimeMillis() + 120_000;
+ while (logGroup.getMode() != SYNC &&
EnvironmentEdgeManager.currentTimeMillis() < deadline) {
+ Thread.sleep(500);
+ }
assertEquals(SYNC, logGroup.getMode());
}
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 39109364c0..3bd389a0d5 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.replication;
import static java.lang.Thread.sleep;
import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
+import static
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -40,16 +41,13 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.phoenix.replication.ReplicationLog.RotationReason;
import org.apache.phoenix.replication.log.LogFile;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;
@@ -109,8 +107,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
}
/**
- * Tests the behavior when a sync operation fails. Verifies that the system
properly handles sync
- * failures by rolling to a new writer and retrying the operation.
+ * Tests the behavior when a sync operation fails transiently. Verifies that
the system retries
+ * with the same writer and succeeds on the next attempt.
*/
@Test
public void testSyncFailureAndRetry() throws Exception {
@@ -118,27 +116,25 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
final long commitId = 1L;
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ ReplicationLog activeLog = logGroup.getActiveLog();
// Get the inner writer
- LogFileWriter writerBeforeRoll = logGroup.getActiveLog().getWriter();
- assertNotNull("Initial writer should not be null", writerBeforeRoll);
+ LogFileWriter writer = activeLog.getWriter();
+ assertNotNull("Initial writer should not be null", writer);
- // Configure writerBeforeRoll to fail on the first sync call
- doThrow(new IOException("Simulated sync
failure")).when(writerBeforeRoll).sync();
+ // Keep returning the same writer so a rotation tick can't swap in a
different one
+ doAnswer(invocation -> writer).when(activeLog).createNewWriter();
+
+ // Configure writer to fail on the first sync call, then succeed
+ doThrow(new IOException("Simulated sync
failure")).doCallRealMethod().when(writer).sync();
// Append data
logGroup.append(tableName, commitId, put);
logGroup.sync();
- // Get the inner writer we rolled to.
- LogFileWriter writerAfterRoll = logGroup.getActiveLog().getWriter();
- assertNotNull("Initial writer should not be null", writerBeforeRoll);
-
- // Verify the sequence: append, sync (fail), rotate, append (retry), sync
(succeed)
- InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
- inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put));
- inOrder.verify(writerBeforeRoll, times(1)).sync(); // Failed
- inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put)); // Replay
- inOrder.verify(writerAfterRoll, times(1)).sync(); // Succeeded
+ // Verify the sequence: append, sync (fail), sync (succeed on retry with
same writer)
+ InOrder inOrder = Mockito.inOrder(writer);
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId),
eq(put));
+ inOrder.verify(writer, times(2)).sync(); // First fails, second succeeds
}
/**
@@ -346,13 +342,18 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
/**
* Tests time-based log rotation. Verifies that the log file is rotated
after the configured
- * rotation time period and that operations continue correctly with the new
log file.
+ * rotation time period and that operations continue correctly with the new
log file. The writer
+ * swap happens pre-action in apply(), so the second append+sync go directly
to the new writer.
*/
@Test
public void testTimeBasedRotation() throws Exception {
final String tableName = "TBLTBR";
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
final long commitId = 1L;
+ final int roundDurationSeconds = 5;
+
+ conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
roundDurationSeconds);
+ recreateLogGroup();
// Get the initial writer
LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
@@ -362,42 +363,31 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.append(tableName, commitId, put);
logGroup.sync();
- // Wait for rotation time to elapse
- sleep((long) (TEST_ROTATION_TIME * 1.25));
+ // Wait for rotation time to elapse so LogRotationTask stages a
pendingWriter
+ waitForRotationTick(roundDurationSeconds);
- // Append more data to trigger rotation check
+ // This append's apply() drains pendingWriter before the append, so it
goes to new writer
logGroup.append(tableName, commitId + 1, put);
logGroup.sync();
- // Get the new writer after rotation
+ // Verify we have a new writer
LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
assertNotNull("New writer should not be null", writerAfterRotation);
assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
// Verify the sequence of operations
InOrder inOrder = Mockito.inOrder(writerBeforeRotation,
writerAfterRotation);
- inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq(commitId), eq(put)); // First
-
// append
-
// to
-
// initial
-
// writer
+ inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq(commitId), eq(put));
inOrder.verify(writerBeforeRotation, times(1)).sync();
- inOrder.verify(writerAfterRotation, times(0)).append(eq(tableName),
eq(commitId), eq(put)); // First
-
// append
-
// is
-
// not
-
// replayed
- inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + 1), eq(put)); // Second
-
// append
-
// to
-
// new
-
// writer
+ inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + 1), eq(put));
inOrder.verify(writerAfterRotation, times(1)).sync();
}
/**
* Tests size-based log rotation. Verifies that the log file is rotated when
it exceeds the
- * configured size threshold and that operations continue correctly with the
new log file.
+ * configured size threshold and that operations continue correctly with the
new log file. After
+ * the first sync, requestRotationIfNeeded submits an on-demand
LogRotationTask which creates the
+ * new writer immediately on the executor thread.
*/
@Test
public void testSizeBasedRotation() throws Exception {
@@ -405,34 +395,31 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 10);
long commitId = 1L;
- LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter writerBeforeRotation = activeLog.getWriter();
assertNotNull("Initial writer should not be null", writerBeforeRotation);
// Append enough data so that we exceed the size threshold.
for (int i = 0; i < 100; i++) {
logGroup.append(tableName, commitId++, put);
}
- logGroup.sync(); // Should trigger a sized based rotation
+ // Sync: data goes to old writer. requestRotationIfNeeded submits
on-demand rotation task.
+ logGroup.sync();
- // Get the new writer after the expected rotation.
- LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
- assertNotNull("New writer should not be null", writerAfterRotation);
- assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
+ // Wait for the on-demand rotation task to create a new writer on the
background thread
+ Thread.sleep(100);
- // Append one more mutation to verify we're using the new writer.
+ // Next append's apply() drains pending writer → goes to new writer
logGroup.append(tableName, commitId, put);
logGroup.sync();
- // Verify the sequence of operations
- InOrder inOrder = Mockito.inOrder(writerBeforeRotation,
writerAfterRotation);
- // Verify all appends before rotation went to the first writer.
- for (int i = 1; i < commitId; i++) {
- inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq((long) i), eq(put));
- }
- inOrder.verify(writerBeforeRotation, times(1)).sync();
- // Verify the final append went to the new writer.
- inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId), eq(put));
- inOrder.verify(writerAfterRotation, times(1)).sync();
+ LogFileWriter writerAfterRotation = activeLog.getWriter();
+ assertNotNull("New writer should not be null", writerAfterRotation);
+ assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
+
+ // Verify the final append went to the new writer
+ verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId),
eq(put));
+ verify(writerAfterRotation, times(1)).sync();
}
/**
@@ -480,13 +467,18 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
/**
* Tests the automatic rotation task. Verifies that the background rotation
task correctly rotates
- * log files based on the configured rotation time.
+ * log files based on the configured rotation time. The writer swap happens
pre-action in apply(),
+ * and the old writer is closed asynchronously via closeExecutor.
*/
@Test
public void testRotationTask() throws Exception {
final String tableName = "TBLRT";
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
long commitId = 1L;
+ final int roundDurationSeconds = 5;
+
+ conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
roundDurationSeconds);
+ recreateLogGroup();
LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
assertNotNull("Initial writer should not be null", writerBeforeRotation);
@@ -494,34 +486,31 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
// Append some data and wait for the rotation time to elapse plus a small
buffer.
logGroup.append(tableName, commitId, put);
logGroup.sync();
- sleep((long) (TEST_ROTATION_TIME * 1.25));
+ waitForRotationTick(roundDurationSeconds);
+
+ // The LogRotationTask has staged a pendingWriter. Next append's apply()
drains it.
+ logGroup.append(tableName, commitId + 1, put);
+ logGroup.sync();
// Get the new writer after the rotation.
LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
assertNotNull("New writer should not be null", writerAfterRotation);
assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
- // Verify first append and sync went to initial writer
+ // Verify first batch went to initial writer
verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(1L),
eq(put));
verify(writerBeforeRotation, times(1)).sync();
- // Verify the initial writer was closed
- verify(writerBeforeRotation, times(1)).close();
+ // Verify second batch went to new writer (swap happened before append)
+ verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId +
1), eq(put));
+ verify(writerAfterRotation, times(1)).sync();
+ // Verify the initial writer was closed asynchronously
+ verify(writerBeforeRotation, timeout(5000).times(1)).close();
}
/**
- * Tests behavior when log rotation fails temporarily but eventually
succeeds. Verifies that:
- * <ul>
- * <li>The system can handle temporary rotation failures</li>
- * <li>After failing twice, the third rotation attempt succeeds</li>
- * <li>Operations continue correctly with the new writer after successful
rotation</li>
- * <li>The metrics for rotation failures are properly tracked</li>
- * <li>Operations can continue with the current writer while rotation
attempts are failing</li>
- * </ul>
- * <p>
- * This test simulates a scenario where the first two rotation attempts fail
(e.g., due to
- * temporary HDFS issues) but the third attempt succeeds. This is a common
real-world scenario
- * where transient failures occur but the system eventually recovers. During
the failed rotation
- * attempts, the system should continue to operate normally with the current
writer.
+ * Tests behavior when log rotation fails temporarily but eventually
succeeds. The rotation task
+ * fails to create a new writer on the first attempt, but succeeds on the
second. During the
+ * failure, operations continue normally with the current writer.
*/
@Test
public void testFailedRotation() throws Exception {
@@ -531,7 +520,6 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
ReplicationLog activeLog = logGroup.getActiveLog();
- // Get the initial writer
LogFileWriter initialWriter = activeLog.getWriter();
assertNotNull("Initial writer should not be null", initialWriter);
@@ -548,38 +536,37 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.append(tableName, commitId, put);
logGroup.sync();
- // Rotate the log.
- LogFileWriter writerAfterFailedRotate =
activeLog.rotateLog(RotationReason.TIME);
- assertEquals("Should still be using the initial writer", initialWriter,
- writerAfterFailedRotate);
+ // Force rotation — first attempt will fail
+ activeLog.forceRotation();
- // While rotation is failing, verify we can continue to use the current
writer.
+ // Verify we can still use the current writer after the failed rotation
logGroup.append(tableName, commitId + 1, put);
logGroup.sync();
+ assertEquals("Should still be using the initial writer", initialWriter,
activeLog.getWriter());
- LogFileWriter writerAfterRotate = activeLog.rotateLog(RotationReason.TIME);
- assertNotEquals("Should be using a new writer", initialWriter,
writerAfterRotate);
+ // Force rotation again — this one should succeed
+ activeLog.forceRotation();
- // Try to append more data. This should work with the new writer after
successful rotation.
+ // Trigger swap by appending and syncing
logGroup.append(tableName, commitId + 2, put);
logGroup.sync();
+ LogFileWriter writerAfterRotate = activeLog.getWriter();
+ assertNotEquals("Should be using a new writer", initialWriter,
writerAfterRotate);
+
// Verify operations went to the writers in the correct order
InOrder inOrder = Mockito.inOrder(initialWriter, writerAfterRotate);
- // First append and sync on initial writer.
inOrder.verify(initialWriter).append(eq(tableName), eq(commitId), eq(put));
inOrder.verify(initialWriter).sync();
- // Second append and sync on initial writer after failed rotation.
inOrder.verify(initialWriter).append(eq(tableName), eq(commitId + 1),
eq(put));
inOrder.verify(initialWriter).sync();
- // Final append and sync on new writer after successful rotation.
inOrder.verify(writerAfterRotate).append(eq(tableName), eq(commitId + 2),
eq(put));
inOrder.verify(writerAfterRotate).sync();
}
/**
- * This test simulates a scenario where rotation consistently fails and
verifies that the system
- * properly propagates an exception after exhausting all retry attempts.
+ * Tests that too many consecutive rotation failures cause the log to close
via closeOnError().
+ * The LogRotationTask tracks failures across attempts and fail-stops after
maxRotationRetries.
*/
@Test
public void testTooManyRotationFailures() throws Exception {
@@ -589,7 +576,6 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
ReplicationLog activeLog = logGroup.getActiveLog();
- // Get the initial writer
LogFileWriter initialWriter = activeLog.getWriter();
assertNotNull("Initial writer should not be null", initialWriter);
@@ -601,25 +587,14 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.append(tableName, commitId, put);
logGroup.sync();
- // Try to rotate the log multiple times until we exceed the retry limit
+ // Force rotation repeatedly until it exceeds maxRotationRetries and calls
closeOnError
for (int i = 0; i <=
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
- try {
- activeLog.rotateLog(RotationReason.TIME);
- } catch (IOException e) {
- if (i < ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
- // Not the last attempt yet, continue
- continue;
- }
- // This was the last attempt, verify the exception
- assertTrue("Expected IOException", e instanceof IOException);
- assertTrue("Expected our mocked failure cause",
- e.getMessage().contains("Simulated failure"));
-
- }
+ activeLog.forceRotation();
}
- // Verify subsequent operations will fail because the log is closed and
then trigger
- // a mode switch to STORE_AND_FORWARD
+ assertTrue("Log should be closed after too many rotation failures",
activeLog.isClosed());
+
+ // Verify subsequent operations trigger a mode switch to STORE_AND_FORWARD
logGroup.append(tableName, commitId + 1, put);
logGroup.sync();
assertEquals(STORE_AND_FORWARD, logGroup.getMode());
@@ -682,19 +657,23 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
LogFileWriter initialWriter = activeLog.getWriter();
assertNotNull("Initial writer should not be null", initialWriter);
- // Configure initial writer to fail on sync
+ // Configure initial writer to always fail on sync
doThrow(new IOException("Simulated sync
failure")).when(initialWriter).sync();
- // createNewWriter should keep returning the bad writer
- doAnswer(invocation -> initialWriter).when(activeLog).createNewWriter();
+ // Make any new writers also fail on sync so a rotation can't rescue the
retry loop
+ doAnswer(invocation -> {
+ LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+ doThrow(new IOException("Simulated sync failure")).when(w).sync();
+ return w;
+ }).when(activeLog).createNewWriter();
// Append data
logGroup.append(tableName, commitId, put);
// Try to sync. Should fail after exhausting retries and then switch to
STORE_AND_FORWARD
logGroup.sync();
- // Each retry creates a new writer, so that is at least 1 create + 4
retries.
- verify(activeLog, atLeast(5)).createNewWriter();
+ // Retries reuse the current writer, so the always-failing sync must have been attempted at least twice on it
+ verify(initialWriter, atLeast(2)).sync();
assertEquals(STORE_AND_FORWARD, logGroup.getMode());
}
@@ -714,11 +693,15 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
LogFileWriter initialWriter = activeLog.getWriter();
assertNotNull("Initial writer should not be null", initialWriter);
- // Configure initial writer to fail on sync
+ // Configure initial writer to always fail on sync
doThrow(new IOException("Simulated sync
failure")).when(initialWriter).sync();
- // createNewWriter should keep returning the bad writer
- doAnswer(invocation -> initialWriter).when(activeLog).createNewWriter();
+ // Make any new writers also fail on sync so a rotation can't rescue the
retry loop
+ doAnswer(invocation -> {
+ LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+ doThrow(new IOException("Simulated sync failure")).when(w).sync();
+ return w;
+ }).when(activeLog).createNewWriter();
doThrow(new IOException("Simulated failure to update HAGroupStore state"))
.when(haGroupStoreManager).setHAGroupStatusToStoreAndForward(haGroupName);
@@ -732,13 +715,14 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
} catch (RuntimeException ex) {
assertTrue(ex.getMessage().contains("Simulated sync failure"));
}
- // wait for the even processor thread to clean up
+ // wait for the event processor thread to clean up
Thread.sleep(3);
}
/**
- * Tests log rotation behavior during batch operations. Verifies that the
system correctly handles
- * rotation when there are pending batch operations, ensuring no data loss.
+ * Tests rotation during an in-flight batch. When a pending writer is staged
by the rotation task
+ * while appends are in flight, the sync's apply() drains the pending
writer, replays the 5
+ * unsynced records into the new writer, then syncs the new writer.
*/
@Test
public void testRotationDuringBatch() throws Exception {
@@ -750,42 +734,38 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
assertNotNull("Initial writer should not be null", writerBeforeRotation);
- // Append several items to fill currentBatch but don't sync yet
+ // Append several items but don't sync yet
for (int i = 0; i < 5; i++) {
logGroup.append(tableName, commitId + i, put);
}
- // Force a rotation by waiting for rotation time to elapse
- sleep((long) (TEST_ROTATION_TIME * 1.25));
+ // Stage a pending writer via forced rotation
+ logGroup.getActiveLog().forceRotation();
- // Get the new writer after rotation
+ // Sync — apply() drains pendingWriter, replays 5 records into new writer,
then syncs
+ logGroup.sync();
+
+ // The swap happened before sync action
LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
assertNotNull("New writer should not be null", writerAfterRotation);
assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
- // Now trigger a sync which should replay the currentBatch to the new
writer
- logGroup.sync();
-
- // Verify the sequence of operations
InOrder inOrder = Mockito.inOrder(writerBeforeRotation,
writerAfterRotation);
-
- // Verify all appends before rotation went to the first writer
+ // 5 appends went to old writer (processed before rotation task fired)
for (int i = 0; i < 5; i++) {
inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq(commitId + i),
eq(put));
}
-
- // Verify the currentBatch was replayed to the new writer
+ // Swap happens before sync action: 5 records replayed into new writer
for (int i = 0; i < 5; i++) {
inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + i),
eq(put));
}
-
- // Verify sync happened on the new writer
+ // Sync goes to new writer
inOrder.verify(writerAfterRotation, times(1)).sync();
- // Verify the initial writer was closed
- verify(writerBeforeRotation, times(1)).close();
+ // Verify the initial writer was closed asynchronously
+ verify(writerBeforeRotation, timeout(5000).times(1)).close();
}
/**
@@ -811,7 +791,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.sync(); // Sync to commit the appends to the current writer.
// Force a rotation to close the current writer.
- activeLog.rotateLog(RotationReason.SIZE);
+ activeLog.forceRotation();
assertTrue("Log file should exist", localFs.exists(logPath));
@@ -869,7 +849,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
}
logGroup.sync(); // Sync to commit the appends to the current writer.
// Force a rotation to close the current writer.
- activeLog.rotateLog(RotationReason.SIZE);
+ activeLog.forceRotation();
}
// Verify all log files exist
@@ -905,14 +885,14 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
}
/**
- * Tests reading records after multiple rotations with intermittent syncs.
If we do not sync when
- * we roll a file, the in-flight batch is replayed into the new writer when
we do finally sync
- * (with the new writer). Verifies that records can be correctly read even
when syncs are not
- * performed before each rotation, ensuring data consistency.
+ * Tests reading records after multiple forced rotations. Each batch of appends is synced before
+ * the rotation is forced, so every record is committed to exactly one log file. This test verifies
+ * that the total number of records read back across all rotated files matches what was written,
+ * and that each record read back matches the record written at the same position.
*/
@Test
- public void testReadAfterMultipleRotationsWithReplay() throws Exception {
- final String tableName = "TBLRAMRIS";
+ public void testReadAfterMultipleRotationsWithVaryingBatchSizes() throws
Exception {
+ final String tableName = "TBLRAMRVBS";
final int NUM_RECORDS_PER_ROTATION = 100;
final int NUM_ROTATIONS = 10;
final int TOTAL_RECORDS = NUM_RECORDS_PER_ROTATION * NUM_ROTATIONS;
@@ -921,9 +901,7 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
ReplicationLog activeLog = logGroup.getActiveLog();
- // Write records across multiple rotations, only syncing 50% of the time.
for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
- // Get the path of the current log file.
Path logPath = activeLog.getWriter().getContext().getFilePath();
logPaths.add(logPath);
@@ -935,13 +913,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
logGroup.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
}
- // Only sync 50% of the time before rotation. To ensure we sync on the
last file
- // we are going to write, use 'rotation % 2 == 1' instead of 'rotation %
2 == 0'.
- if (rotation % 2 == 1) {
- logGroup.sync(); // Sync to commit the appends to the current writer.
- }
- // Force a rotation to close the current writer.
- activeLog.rotateLog(RotationReason.SIZE);
+ logGroup.sync();
+ activeLog.forceRotation();
}
// Verify all log files exist
@@ -949,10 +922,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
assertTrue("Log file should exist: " + logPath, localFs.exists(logPath));
}
- // Read and verify all records from each log file, tracking unique records
and duplicates.
- Set<LogFile.Record> uniqueRecords = new HashSet<>();
+ // Read and verify all records from each log file
List<LogFile.Record> allReadRecords = new ArrayList<>();
-
for (Path logPath : logPaths) {
LogFileReader reader = new LogFileReader();
LogFileReaderContext readerContext =
@@ -961,24 +932,22 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
LogFile.Record record;
while ((record = reader.next()) != null) {
allReadRecords.add(record);
- uniqueRecords.add(record);
}
reader.close();
}
- // Print statistics about duplicates for informational purposes.
- LOG.info("{} total records across all files", allReadRecords.size());
- LOG.info("{} unique records", uniqueRecords.size());
- LOG.info("{} duplicate records", allReadRecords.size() -
uniqueRecords.size());
+ assertEquals("Total number of records mismatch", TOTAL_RECORDS,
allReadRecords.size());
- // Verify we have all the expected unique records
- assertEquals("Number of unique records mismatch", TOTAL_RECORDS,
uniqueRecords.size());
+ for (int i = 0; i < TOTAL_RECORDS; i++) {
+ LogFileTestUtil.assertRecordEquals("Record mismatch at index " + i,
originalRecords.get(i),
+ allReadRecords.get(i));
+ }
}
/**
- * Tests behavior when a RuntimeException occurs during writer.getLength()
in shouldRotate().
- * Verifies that the system properly handles critical errors by closing the
log and preventing
- * further operations.
+ * Tests behavior when a RuntimeException occurs during writer.getLength() in
+ * shouldRotateForSize(). Verifies that the system properly handles critical
errors by closing the
+ * log and preventing further operations.
*/
@Test
public void testRuntimeExceptionDuringLengthCheck() throws Exception {
@@ -1095,6 +1064,64 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
verify(innerWriter, times(1)).close();
}
+  /**
+   * Tests that an undrained pendingWriter is closed and replaced by the rotation task. This
+   * simulates an idle system where no events flow to drain the staged writer before the next
+   * rotation tick fires. Verifies the undrained writer is actually closed by the second tick.
+   */
+  @Test
+  public void testUndrainedPendingWriterReplaced() throws Exception {
+    final String tableName = "TBLUPWR";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+    final int roundDurationSeconds = 5;
+
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, roundDurationSeconds);
+    recreateLogGroup();
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Capture writers created by rotation ticks. The stub runs on the background rotation
+    // thread while this thread reads the list, so the list must be synchronized for the
+    // reads below to observe the writes.
+    List<LogFileWriter> rotationWriters =
+      java.util.Collections.synchronizedList(new ArrayList<>());
+    doAnswer(invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      rotationWriters.add(w);
+      return w;
+    }).when(activeLog).createNewWriter();
+
+    // Append and sync to establish baseline
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
+
+    // Wait for rotation tick — this stages a pending writer (W2)
+    waitForRotationTick(roundDurationSeconds);
+
+    // The pending writer is staged but not drained (no events to trigger checkAndReplaceWriter).
+    // Wait for the next rotation tick — it should close the undrained writer and stage a new one.
+    waitForRotationTick(roundDurationSeconds);
+
+    // Now drain by appending (apply() calls checkAndReplaceWriter)
+    logGroup.append(tableName, commitId + 1, put);
+    logGroup.sync();
+
+    LogFileWriter writerAfterRotation = activeLog.getWriter();
+    assertTrue("Writer should have been rotated", writerAfterRotation != initialWriter);
+
+    // W2 was created by first tick, W3 by second tick
+    assertTrue("Expected at least 2 rotation writers", rotationWriters.size() >= 2);
+    LogFileWriter undrainedWriter = rotationWriters.get(0);
+    LogFileWriter finalWriter = rotationWriters.get(rotationWriters.size() - 1);
+
+    // Undrained W2 was closed synchronously by the second rotation tick
+    verify(undrainedWriter, times(1)).close();
+
+    // Final writer is the one we're using
+    assertEquals("Active writer should be the last rotation writer", finalWriter,
+      writerAfterRotation);
+  }
+
/**
* Tests that multiple sync requests are consolidated into a single sync
operation on the inner
* writer when they occur in quick succession. Verifies that the Disruptor
batching and
@@ -1293,6 +1320,308 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
group2.close();
}
+ /**
+ * Tests that unsynced appends are replayed into the new writer when a swap
happens mid-batch.
+ * Append 3 records (no sync), stage a pending writer, append a 4th record
which triggers the swap
+ * + replay, then sync. Old writer gets 3 appends, new writer gets 3
replayed + 4th append + sync.
+ */
+ @Test
+ public void testReplayOnMidBatchSwap() throws Exception {
+ final String tableName = "TBLRMBS";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ long commitId = 1L;
+
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter writerBeforeRotation = activeLog.getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+ // Append 3 records without syncing — they accumulate in currentBatch
+ for (int i = 0; i < 3; i++) {
+ logGroup.append(tableName, commitId + i, put);
+ }
+
+ // Stage a pending writer via forced rotation
+ // (only staged here: the swap itself is triggered by the next append below)
+ activeLog.forceRotation();
+
+ // 4th append triggers checkAndReplaceWriter + generation mismatch →
replays 3 records
+ logGroup.append(tableName, commitId + 3, put);
+ logGroup.sync();
+
+ LogFileWriter writerAfterRotation = activeLog.getWriter();
+ assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
+
+ // A single InOrder over both writers also enforces cross-writer order: every old-writer
+ // append must precede the replayed appends on the new writer.
+ InOrder inOrder = Mockito.inOrder(writerBeforeRotation,
writerAfterRotation);
+ // 3 appends went to old writer
+ for (int i = 0; i < 3; i++) {
+ inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq(commitId + i),
+ eq(put));
+ }
+ // 3 records replayed into new writer
+ for (int i = 0; i < 3; i++) {
+ inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + i),
+ eq(put));
+ }
+ // 4th append goes to new writer
+ inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + 3), eq(put));
+ inOrder.verify(writerAfterRotation, times(1)).sync();
+
+ // Old writer closed async
+ // (hence the timeout: close() may not have run yet when sync() returns)
+ verify(writerBeforeRotation, timeout(5000).times(1)).close();
+ }
+
+  /**
+   * Tests the lease-recovery scenario: when the current writer's stream is broken (simulated by
+   * failing sync), a staged pending writer is picked up before the action, replays unsynced
+   * appends, and succeeds without ever calling sync on the broken writer.
+   */
+  @Test
+  public void testRetryPicksUpStagedWriter() throws Exception {
+    final String tableName = "TBLRPSW";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Append a record — goes to initialWriter
+    logGroup.append(tableName, commitId, put);
+
+    // Configure initial writer's sync to fail (simulating broken stream after lease recovery)
+    doThrow(new IOException("Simulated broken stream")).when(initialWriter).sync();
+
+    // Stage a pending writer via forced rotation
+    activeLog.forceRotation();
+
+    // Sync: checkAndReplaceWriter drains pending writer before the sync action,
+    // replays 1 record into new writer, then syncs new writer. Old writer's sync is
+    // never called because the swap happens before the action.
+    logGroup.sync();
+
+    LogFileWriter newWriter = activeLog.getWriter();
+    assertTrue("Should be using new writer", newWriter != initialWriter);
+
+    // Old writer: received the append only. Assert the broken sync was never invoked, which is
+    // the property this test exists to prove.
+    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+    verify(initialWriter, times(0)).sync();
+
+    // New writer: received replayed append + successful sync
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+    verify(newWriter, times(1)).sync();
+  }
+
+  /**
+   * Tests the idle-then-lease-recovery scenario: after a sync clears currentBatch, the system goes
+   * idle. A rotation tick stages a pending writer (simulated here with forceRotation()). The
+   * reader performs HDFS lease recovery, breaking the old writer's stream. When events resume,
+   * apply() drains the healthy staged writer before the action — the broken writer is never
+   * touched. No replay needed (empty batch).
+   */
+  @Test
+  public void testIdleLeaseRecoveryDrainsStagedWriter() throws Exception {
+    final String tableName = "TBLILRDSW";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Append + sync to establish baseline and clear currentBatch
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
+
+    // Stage W2 in pendingWriter via forced rotation
+    activeLog.forceRotation();
+
+    // Simulate HDFS lease recovery breaking the old writer's stream. The stubs are installed
+    // only now, so the baseline append + sync above succeeded on the old writer.
+    doThrow(new IOException("Simulated broken stream after lease recovery")).when(initialWriter)
+      .append(anyString(), anyLong(), any(Mutation.class));
+    doThrow(new IOException("Simulated broken stream after lease recovery")).when(initialWriter)
+      .sync();
+
+    // Events resume — apply() drains W2 before the action, so broken writer is never touched
+    logGroup.append(tableName, commitId + 1, put);
+    logGroup.sync();
+
+    LogFileWriter newWriter = activeLog.getWriter();
+    assertTrue("Should be using new writer after idle + lease recovery",
+      newWriter != initialWriter);
+
+    // New writer received the new append + sync (no replay — currentBatch was empty)
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1), eq(put));
+    verify(newWriter, times(1)).sync();
+
+    // Old writer: only the pre-idle append + sync, nothing after the break. The post-recovery
+    // append must never have been attempted on it, not even as a failed first attempt.
+    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+    verify(initialWriter, times(0)).append(eq(tableName), eq(commitId + 1), eq(put));
+    verify(initialWriter, times(1)).sync();
+  }
+
+ /**
+ * Tests that a failed replay is retried on the next attempt. The new
writer's first append fails
+ * during replay, so the generation stays stale. On retry, the generation
mismatch is detected
+ * again and replay succeeds.
+ */
+ @Test
+ public void testReplayFailureRetries() throws Exception {
+ final String tableName = "TBLRFR";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ long commitId = 1L;
+
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter initialWriter = activeLog.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Intercept createNewWriter to capture the new writer and make its first
append fail
+ // The flag is shared by every writer this stub creates, so exactly one append fails in
+ // total, not one per writer. when(w) below also requires createNewWriter() to return a
+ // Mockito mock or spy.
+ final AtomicBoolean failFirstAppend = new AtomicBoolean(true);
+ doAnswer(invocation -> {
+ LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+ doAnswer(appendInvocation -> {
+ if (failFirstAppend.getAndSet(false)) {
+ throw new IOException("Simulated transient HDFS error during
replay");
+ }
+ return appendInvocation.callRealMethod();
+ }).when(w).append(anyString(), anyLong(), any(Mutation.class));
+ return w;
+ }).when(activeLog).createNewWriter();
+
+ // Append 2 records without syncing — they accumulate in currentBatch
+ logGroup.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId + 1, put);
+
+ // Stage a pending writer via forced rotation (whose first append will
fail)
+ activeLog.forceRotation();
+
+ // 3rd append triggers swap + replay of [r1, r2].
+ // Attempt 1: replay fails on first record → IOException → generation
stays stale
+ // Attempt 2: generation mismatch still → replay retries → succeeds → r3
appended
+ logGroup.append(tableName, commitId + 2, put);
+ logGroup.sync();
+
+ LogFileWriter newWriter = activeLog.getWriter();
+ assertTrue("Should be using new writer", newWriter != initialWriter);
+
+ // New writer: attempt 1 replay failed on r1 (1 call). Attempt 2 replayed
r1+r2 (2 calls)
+ // then appended r3. Total: r1 called 2x, r2 called 1x, r3 called 1x.
+ verify(newWriter, times(2)).append(eq(tableName), eq(commitId), eq(put));
+ verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1),
eq(put));
+ verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 2),
eq(put));
+ verify(newWriter, times(1)).sync();
+ }
+
+ /**
+ * Tests error-recovery rotation: when the current writer's stream is
broken, the second failure
+ * in apply() triggers requestRotation() which submits an on-demand
LogRotationTask. During the
+ * retry sleep, the background thread creates a new writer. The next attempt
drains it and
+ * succeeds.
+ */
+ @Test
+ public void testErrorRecoveryRequestsNewWriter() throws Exception {
+ final String tableName = "TBLERNW";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ final long commitId = 1L;
+
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter initialWriter = activeLog.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Configure initial writer's append to always fail (simulating broken
HDFS stream)
+ doThrow(new IOException("Simulated broken
stream")).when(initialWriter).append(anyString(),
+ anyLong(), any(Mutation.class));
+
+ // Append — attempt 1 fails, attempt 2 fails + requestRotation(), during
sleep the
+ // background thread creates a new writer, attempt 3 drains it and succeeds
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
+
+ LogFileWriter newWriter = activeLog.getWriter();
+ assertTrue("Should be using a new writer after error recovery", newWriter
!= initialWriter);
+
+ // Old writer received failed attempts
+ // atLeast(2) rather than an exact count: presumably the number of failed attempts depends
+ // on when the background rotation task finishes relative to the retry sleep.
+ verify(initialWriter, atLeast(2)).append(eq(tableName), eq(commitId),
eq(put));
+ // New writer received the successful append
+ verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+ verify(newWriter, times(1)).sync();
+ }
+
+ /**
+ * Tests that an on-demand size rotation mid-interval does not suppress the
next scheduled tick.
+ * After size rotation creates a writer early, the scheduled tick still
fires and creates another.
+ */
+ @Test
+ public void testOnDemandRotationDoesNotSuppressScheduledTick() throws
Exception {
+ final String tableName = "TBLODRNST";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 10);
+ long commitId = 1L;
+ final int roundDurationSeconds = 5;
+
+ conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
roundDurationSeconds);
+ recreateLogGroup();
+
+ ReplicationLog activeLog = logGroup.getActiveLog();
+ LogFileWriter initialWriter = activeLog.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Append enough data to trigger size rotation → requestRotation() fires
+ // NOTE(review): assumes 100 appends of this put exceed the size-rotation threshold set up
+ // by the base test class — TODO confirm against that configuration.
+ for (int i = 0; i < 100; i++) {
+ logGroup.append(tableName, commitId++, put);
+ }
+ logGroup.sync();
+
+ // Wait for the on-demand rotation task to create a new writer on the
background thread
+ // NOTE(review): a fixed 100 ms sleep races the background rotation task; on a slow host the
+ // new writer may not be staged yet and the size-rotation assertion below fails. Consider
+ // waiting on an observable signal (e.g. a latch released from a createNewWriter() stub).
+ Thread.sleep(100);
+
+ // Drain W2 via next append
+ logGroup.append(tableName, commitId++, put);
+ logGroup.sync();
+
+ LogFileWriter writerAfterSizeRotation = activeLog.getWriter();
+ assertTrue("Writer should have been rotated for size",
+ writerAfterSizeRotation != initialWriter);
+
+ // Wait for the scheduled rotation tick
+ waitForRotationTick(roundDurationSeconds);
+
+ // Drain W3
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
+
+ LogFileWriter writerAfterScheduledTick = activeLog.getWriter();
+ assertTrue("Scheduled tick should have created a new writer (not
suppressed)",
+ writerAfterScheduledTick != writerAfterSizeRotation);
+ }
+
+ /**
+ * Tests that the rotation executor fires at the round boundary by verifying
that after waiting
+ * for slightly more than a round, a rotation has occurred.
+ * <p>NOTE(review): this only checks that some rotation happened within about one round; it
+ * does not check when the tick fired relative to the boundary. The delay arithmetic itself is
+ * covered by ReplicationLogTest.
+ */
+ @Test
+ public void testRotationScheduleAlignsWithRoundBoundary() throws Exception {
+ final String tableName = "TBLRSARB";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ final long commitId = 1L;
+ final int roundDurationSeconds = 5;
+
+ conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
roundDurationSeconds);
+ recreateLogGroup();
+
+ LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+ // Append and sync so we have data
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
+
+ // Wait for slightly more than one round duration — the rotation task
should have fired
+ waitForRotationTick(roundDurationSeconds);
+
+ // Trigger drain
+ // (the staged writer only becomes active once an append runs through apply())
+ logGroup.append(tableName, commitId + 1, put);
+ logGroup.sync();
+
+ LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
+ assertTrue("Rotation should have happened at round boundary",
+ writerAfterRotation != writerBeforeRotation);
+ }
+
// @Test
public void testAppendTimeoutWhileSyncPending() throws Exception {
final String tableName = "TESTTBL";
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
new file mode 100644
index 0000000000..89bf708f05
--- /dev/null
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class ReplicationLogTest {
+
+  /**
+   * Checks {@link ReplicationLog#computeInitialDelay} at fixed points of a 60 s round: exactly on
+   * a boundary, mid-round, one millisecond before the next boundary and one millisecond after the
+   * current one. In every case the expected delay is the time left until the next round boundary.
+   */
+  @Test
+  public void testComputeInitialDelay() {
+    long rotationTimeMs = 60_000L;
+
+    // Exactly on a round boundary: full round until next tick
+    long now = 120_000L;
+    long roundStart = 120_000L;
+    assertEquals(rotationTimeMs,
+      ReplicationLog.computeInitialDelay(now, roundStart, rotationTimeMs));
+
+    // Middle of the round: half a round remaining
+    now = 150_000L;
+    roundStart = 120_000L;
+    assertEquals(30_000L, ReplicationLog.computeInitialDelay(now, roundStart, rotationTimeMs));
+
+    // Near the end of a round: small but positive delay
+    now = 179_999L;
+    roundStart = 120_000L;
+    assertEquals(1L, ReplicationLog.computeInitialDelay(now, roundStart, rotationTimeMs));
+
+    // Just after a round boundary
+    now = 120_001L;
+    roundStart = 120_000L;
+    assertEquals(59_999L, ReplicationLog.computeInitialDelay(now, roundStart, rotationTimeMs));
+  }
+
+  /**
+   * Sweeps every millisecond offset of one round and checks that the delay always lies in
+   * (0, rotationTimeMs]. Stepping by whole seconds would skip the offsets just before the next
+   * boundary, which is exactly where the delay is smallest.
+   */
+  @Test
+  public void testComputeInitialDelayIsAlwaysPositive() {
+    long rotationTimeMs = 60_000L;
+    // 2024-01-01T00:00:00Z; a multiple of rotationTimeMs, so offset 0 falls on a round boundary.
+    long dayStart = 1704067200000L;
+
+    for (long offsetMs = 0; offsetMs < rotationTimeMs; offsetMs++) {
+      long now = dayStart + offsetMs;
+      long roundStart = (now / rotationTimeMs) * rotationTimeMs;
+      long delay = ReplicationLog.computeInitialDelay(now, roundStart, rotationTimeMs);
+      assertTrue("Delay must be > 0 for offset " + offsetMs, delay > 0);
+      assertTrue("Delay must be <= rotationTimeMs for offset " + offsetMs,
+        delay <= rotationTimeMs);
+    }
+  }
+}