This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new 2fcf0cc985 PHOENIX-7793 Replication Log writer improvements (#2433)
2fcf0cc985 is described below

commit 2fcf0cc9857845306e300c1a73c5330c106df9e5
Author: tkhurana <[email protected]>
AuthorDate: Thu Apr 30 10:56:14 2026 -0700

    PHOENIX-7793 Replication Log writer improvements (#2433)
---
 .../apache/phoenix/replication/ReplicationLog.java | 360 ++++++-----
 .../phoenix/replication/ReplicationLogGroup.java   |   1 +
 .../replication/log/LogFileFormatWriter.java       |   5 +-
 .../phoenix/replication/log/LogFileWriter.java     |   6 +-
 .../metrics/MetricsReplicationLogGroupSource.java  |  27 -
 .../MetricsReplicationLogGroupSourceImpl.java      |  30 +-
 .../metrics/ReplicationLogMetricValues.java        |  21 +-
 .../replication/ReplicationLogBaseTest.java        |  41 +-
 .../ReplicationLogDiscoveryForwarderTest.java      |  22 +-
 .../replication/ReplicationLogGroupTest.java       | 655 ++++++++++++++++-----
 .../phoenix/replication/ReplicationLogTest.java    |  66 +++
 11 files changed, 784 insertions(+), 450 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index d23008b9fb..b699981c24 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -22,11 +22,13 @@ import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +41,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -48,6 +51,13 @@ import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFac
  * based on time or size, retries on file operations and the lifecycle of the 
underlying
  * LogFileWriter.
  * </p>
+ * <p>
+ * All rotation (scheduled round-boundary ticks, on-demand size, and error 
recovery) is handled by
+ * {@link LogRotationTask} which creates a new writer on a background thread 
and stages it in
+ * {@code pendingWriter}. The disruptor consumer thread drains the staged 
writer inside
+ * {@link #apply} before each attempt, swaps the pointer, replays any unsynced 
appends, and submits
+ * the old writer to {@code closeExecutor} for async close.
+ * </p>
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
     value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2", "MS_EXPOSE_REP" }, 
justification = "Intentional")
@@ -59,22 +69,24 @@ public class ReplicationLog {
   protected final long rotationSizeBytes;
   protected final int maxRotationRetries;
   protected final Compression.Algorithm compression;
-  protected final ReentrantLock lock = new ReentrantLock();
-  protected final int maxAttempts; // Configurable max attempts for sync
-  protected final long retryDelayMs; // Configurable delay between attempts
-  // Underlying file writer
+  protected final int maxAttempts;
+  protected final long retryDelayMs;
+  // Underlying file writer — only mutated by the disruptor consumer thread.
   protected volatile LogFileWriter currentWriter;
-  protected final AtomicLong lastRotationTime = new AtomicLong();
   protected final AtomicLong writerGeneration = new AtomicLong();
   protected final AtomicLong rotationFailures = new AtomicLong(0);
+  // Staged writer created by the background LogRotationTask, drained by 
checkAndReplaceWriter().
+  private final AtomicReference<LogFileWriter> pendingWriter = new 
AtomicReference<>();
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+  private final AtomicBoolean rotationRequested = new AtomicBoolean(false);
+  private final ExecutorService closeExecutor;
   protected ScheduledExecutorService rotationExecutor;
-  protected volatile boolean closed = false;
   // Manages the creation of the actual log file in the shard directory
   protected ReplicationShardDirectoryManager replicationShardDirectoryManager;
   // List of in-flight appends which are successful but haven't been synced yet
   private final List<Record> currentBatch = new ArrayList<>();
-  // Current version of the writer being used for file operations. It is 
needed for detecting
-  // when the writer changes because of rotation while we are in the middle of 
a write operation.
+  // Tracks the generation of the writer that currentBatch was appended to.
+  // Used to detect writer swaps and trigger replay of unsynced appends.
   private long generation;
 
   public ReplicationLog(ReplicationLogGroup logGroup,
@@ -85,8 +97,7 @@ public class ReplicationLog {
       ReplicationLogGroup.DEFAULT_REPLICATION_LOG_SYNC_RETRIES) + 1;
     this.retryDelayMs = 
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_RETRY_DELAY_MS_KEY,
       ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
-    this.rotationTimeMs = 
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
-      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+    this.rotationTimeMs = shardManager.getReplicationRoundDurationSeconds() * 
1000L;
     long rotationSize = 
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
       ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
     double rotationSizePercent =
@@ -104,29 +115,18 @@ public class ReplicationLog {
       try {
         compression = 
Compression.getCompressionAlgorithmByName(compressionName);
       } catch (IllegalArgumentException e) {
-        LOG.warn("Unknown compression type " + compressionName + ", using 
NONE", e);
+        LOG.warn("Unknown compression type {}, using NONE", compressionName, 
e);
       }
     }
     this.compression = compression;
     this.replicationShardDirectoryManager = shardManager;
-  }
-
-  /** The reason for requesting a log rotation. */
-  protected enum RotationReason {
-    /** Rotation requested due to time threshold being exceeded. */
-    TIME,
-    /** Rotation requested due to size threshold being exceeded. */
-    SIZE,
-    /** Rotation requested due to an error condition. */
-    ERROR
+    this.closeExecutor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setDaemon(true)
+      .setNameFormat("Close-ReplicationLog-Writer-" + 
logGroup.getHAGroupName() + "-%d").build());
   }
 
   /** Initialize the writer. */
   public void init() throws IOException {
-    // Start time based rotation.
-    lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
     startRotationExecutor();
-    // Create the initial writer
     currentWriter = createNewWriter();
     generation = currentWriter.getGeneration();
   }
@@ -146,18 +146,25 @@ public class ReplicationLog {
   }
 
   protected void startRotationExecutor() {
-    long rotationCheckInterval = getRotationCheckInterval(rotationTimeMs);
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    long currentRoundStart = 
replicationShardDirectoryManager.getNearestRoundStartTimestamp(now);
+    long initialDelay = computeInitialDelay(now, currentRoundStart, 
rotationTimeMs);
+    startRotationExecutor(initialDelay);
+  }
+
+  protected void startRotationExecutor(long initialDelay) {
     rotationExecutor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
       .setNameFormat("ReplicationLogRotation-" + logGroup.getHAGroupName() + 
"-%d").setDaemon(true)
       .build());
-    rotationExecutor.scheduleAtFixedRate(new LogRotationTask(), 
rotationCheckInterval,
-      rotationCheckInterval, TimeUnit.MILLISECONDS);
-    LOG.debug("Started rotation executor with interval {}ms", 
rotationCheckInterval);
+    rotationExecutor.scheduleAtFixedRate(new LogRotationTask(), initialDelay, 
rotationTimeMs,
+      TimeUnit.MILLISECONDS);
+    LOG.info("Started rotation executor with initial delay {}ms and interval 
{}ms", initialDelay,
+      rotationTimeMs);
   }
 
-  protected long getRotationCheckInterval(long rotationTimeMs) {
-    long interval = Math.max(10 * 1000L, Math.min(60 * 1000L, rotationTimeMs / 
10));
-    return interval;
+  @VisibleForTesting
+  static long computeInitialDelay(long now, long currentRoundStart, long 
rotationTimeMs) {
+    return currentRoundStart + rotationTimeMs - now;
   }
 
   protected void stopRotationExecutor() {
@@ -174,114 +181,79 @@ public class ReplicationLog {
     }
   }
 
-  /** Gets the current writer, rotating it if necessary based on size 
thresholds. */
-  protected LogFileWriter getWriter() throws IOException {
-    lock.lock();
-    try {
-      if (shouldRotate()) {
-        rotateLog(RotationReason.SIZE);
-      }
-      return currentWriter;
-    } finally {
-      lock.unlock();
-    }
-  }
-
   /**
-   * Checks if the current log file needs to be rotated based on time or size. 
Must be called under
-   * lock.
+   * Checks if the current log file needs to be rotated based on size.
    * @return true if rotation is needed, false otherwise.
    * @throws IOException If an error occurs checking the file size.
    */
-  protected boolean shouldRotate() throws IOException {
+  protected boolean shouldRotateForSize() throws IOException {
     if (currentWriter == null) {
       LOG.warn("Current writer is null, forcing rotation.");
       return true;
     }
-    // Check time threshold
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    long last = lastRotationTime.get();
-    if (now - last >= rotationTimeMs) {
-      LOG.debug("Rotating log file due to time threshold ({} ms elapsed, 
threshold {} ms)",
-        now - last, rotationTimeMs);
-      return true;
-    }
-
-    // Check size threshold (using actual file size for accuracy)
     long currentSize = currentWriter.getLength();
     if (currentSize >= rotationSizeBytes) {
-      LOG.debug("Rotating log file due to size threshold ({} bytes, threshold 
{} bytes)",
-        currentSize, rotationSizeBytes);
+      LOG.info("Rotating log file {} due to size threshold ({} bytes, 
threshold {} bytes)",
+        currentWriter, currentSize, rotationSizeBytes);
       return true;
     }
-
     return false;
   }
 
+  @VisibleForTesting
+  protected LogFileWriter getWriter() {
+    checkAndReplaceWriter(false);
+    return currentWriter;
+  }
+
   /**
-   * Closes the current log writer and opens a new one, updating rotation 
metrics.
-   * <p>
-   * This method handles the rotation of log files, which can be triggered by:
-   * <ul>
-   * <li>Time threshold exceeded (TIME)</li>
-   * <li>Size threshold exceeded (SIZE)</li>
-   * <li>Error condition requiring rotation (ERROR)</li>
-   * </ul>
-   * <p>
-   * The method implements retry logic for handling rotation failures. If 
rotation fails, it retries
-   * up to maxRotationRetries times. If the number of failures exceeds 
maxRotationRetries, an
-   * exception is thrown. Otherwise, it logs a warning and continues with the 
current writer.
-   * <p>
-   * The method is thread-safe and uses a lock to ensure atomic rotation 
operations.
-   * @param reason The reason for requesting log rotation
-   * @return The new LogFileWriter instance if rotation succeeded, or the 
current writer if rotation
-   *         failed
-   * @throws IOException if rotation fails after exceeding maxRotationRetries
+   * Drains any staged pendingWriter, swapping it in as currentWriter.
+   * @param asyncClose if true, old writer is submitted to closeExecutor for 
async close; if false,
+   *                   old writer is closed synchronously on the calling 
thread.
    */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"UL_UNRELEASED_LOCK",
-      justification = "False positive")
-  protected LogFileWriter rotateLog(RotationReason reason) throws IOException {
-    lock.lock();
-    try {
-      // Try to get the new writer first. If it fails we continue using the 
current writer.
-      // Increment the writer generation
-      LogFileWriter newWriter = createNewWriter();
-      LOG.debug("Created new writer: {}", newWriter);
-      // Close the current writer
-      closeWriter(currentWriter);
+  protected void checkAndReplaceWriter(boolean asyncClose) {
+    LogFileWriter newWriter = pendingWriter.getAndSet(null);
+    if (newWriter != null) {
+      LogFileWriter oldWriter = currentWriter;
       currentWriter = newWriter;
-      lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
-      rotationFailures.set(0);
-      logGroup.getMetrics().incrementRotationCount();
-      switch (reason) {
-        case TIME:
-          logGroup.getMetrics().incrementTimeBasedRotationCount();
-          break;
-        case SIZE:
-          logGroup.getMetrics().incrementSizeBasedRotationCount();
-          break;
-        case ERROR:
-          logGroup.getMetrics().incrementErrorBasedRotationCount();
-          break;
+      LOG.info("Swapped writer from {} to {}, asyncClose={}", oldWriter, 
newWriter, asyncClose);
+      if (asyncClose) {
+        submitClose(oldWriter);
+      } else {
+        closeWriter(oldWriter);
       }
-    } catch (IOException e) {
-      // If we fail to rotate the log, we increment the failure counter. If we 
have exceeded
-      // the maximum number of retries, we close the log and throw the 
exception. Otherwise
-      // we log a warning and continue.
-      logGroup.getMetrics().incrementRotationFailureCount();
-      long numFailures = rotationFailures.getAndIncrement();
-      if (numFailures >= maxRotationRetries) {
-        LOG.warn("Failed to rotate log (attempt {}/{}), closing log", 
numFailures,
-          maxRotationRetries, e);
-        closeOnError();
-        throw e;
+    }
+  }
+
+  /**
+   * Submits an on-demand {@link LogRotationTask} to the executor. The 
compareAndSet avoids
+   * submitting duplicate tasks — if the flag is already set, a task is 
already queued.
+   */
+  private void requestRotation() {
+    if (rotationRequested.compareAndSet(false, true)) {
+      try {
+        rotationExecutor.execute(new LogRotationTask());
+      } catch (java.util.concurrent.RejectedExecutionException e) {
+        LOG.info("Rotation executor shut down, skipping on-demand rotation", 
e);
+        rotationRequested.set(false);
       }
-      LOG.warn("Failed to rotate log (attempt {}/{}), retrying...", 
numFailures, maxRotationRetries,
-        e);
-    } finally {
-      lock.unlock();
     }
-    return currentWriter;
+  }
+
+  /**
+   * Requests an on-demand rotation if the current writer exceeds the size 
threshold.
+   */
+  private void requestRotationIfNeeded() throws IOException {
+    if (shouldRotateForSize()) {
+      requestRotation();
+    }
+  }
+
+  private void submitClose(LogFileWriter writer) {
+    if (writer == null) {
+      return;
+    }
+    closeExecutor.execute(() -> closeWriter(writer));
   }
 
   /** Closes the given writer, logging any errors that occur during close. */
@@ -289,12 +261,11 @@ public class ReplicationLog {
     if (writer == null) {
       return;
     }
-    LOG.debug("Closing writer: {}", writer);
+    LOG.info("Closing writer: {}", writer);
     try {
       writer.close();
     } catch (IOException e) {
-      // For now, just log and continue
-      LOG.error("Error closing log writer: " + writer, e);
+      LOG.error("Error closing log writer: {}", writer, e);
     }
   }
 
@@ -303,49 +274,55 @@ public class ReplicationLog {
    * @return true if closed, false otherwise
    */
   public boolean isClosed() {
-    return closed;
+    return closed.get();
   }
 
   private interface Action {
     void action(LogFileWriter writer) throws IOException;
   }
 
+  private void replayCurrentBatch() throws IOException {
+    if (currentBatch.isEmpty()) {
+      return;
+    }
+    LOG.info("Replaying {} unsynced records into new writer {}", 
currentBatch.size(),
+      currentWriter);
+    for (Record r : currentBatch) {
+      currentWriter.append(r.tableName, r.commitId, r.mutation);
+    }
+  }
+
   private void apply(Action action) throws IOException {
-    LogFileWriter writer = getWriter();
     for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+      checkAndReplaceWriter(true);
       if (isClosed()) {
         throw new IOException("Closed");
       }
       try {
-        if (writer.getGeneration() > generation) {
-          generation = writer.getGeneration();
-          // If the writer has been rotated, we need to replay the current 
batch of
-          // in-flight appends into the new writer.
-          if (!currentBatch.isEmpty()) {
-            LOG.trace("Writer has been rotated, replaying in-flight batch");
-            for (Record r : currentBatch) {
-              writer.append(r.tableName, r.commitId, r.mutation);
-            }
-          }
+        if (currentWriter.getGeneration() > generation) {
+          replayCurrentBatch();
+          generation = currentWriter.getGeneration();
         }
-        action.action(writer);
+        action.action(currentWriter);
+        requestRotationIfNeeded();
         break;
       } catch (IOException e) {
-        // IO exception, force a rotation.
-        LOG.debug("Attempt " + attempt + "/" + maxAttempts + " failed", e);
+        LOG.debug("Attempt {}/{} failed", attempt, maxAttempts, e);
         if (attempt == maxAttempts) {
-          // TODO: Add log
           closeOnError();
           throw e;
         }
-        // Add delay before retrying to prevent tight loops
+        // First failure retries on the same writer (transient). Second failure
+        // requests a new writer to recover from non-transient stream errors.
+        if (attempt > 1) {
+          requestRotation();
+        }
         try {
           Thread.sleep(retryDelayMs);
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           throw new InterruptedIOException("Interrupted during retry delay");
         }
-        writer = rotateLog(RotationReason.ERROR);
       }
     }
   }
@@ -361,8 +338,7 @@ public class ReplicationLog {
   }
 
   protected void sync() throws IOException {
-    apply(writer -> writer.sync());
-    // Sync completed, clear the list of in-flight appends.
+    apply(LogFileWriter::sync);
     currentBatch.clear();
   }
 
@@ -381,79 +357,81 @@ public class ReplicationLog {
    * attempted on a log that has encountered a critical error.
    */
   protected void closeOnError() {
-    lock.lock();
-    try {
-      if (closed) {
-        return;
-      }
-      closed = true;
-    } finally {
-      lock.unlock();
+    if (!closed.compareAndSet(false, true)) {
+      return;
     }
-    // Stop the time based rotation check.
     stopRotationExecutor();
-    // We expect a final sync will not work. Just close the inner writer.
+    closeExecutor.shutdownNow();
+    LogFileWriter staged = pendingWriter.getAndSet(null);
+    if (staged != null) {
+      closeWriter(staged);
+    }
     closeWriter(currentWriter);
   }
 
   /** Closes the log. */
   public void close() {
-    lock.lock();
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+    stopRotationExecutor();
+    closeExecutor.shutdown();
     try {
-      if (closed) {
-        return;
+      if (!closeExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        closeExecutor.shutdownNow();
       }
-      closed = true;
-    } finally {
-      lock.unlock();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      closeExecutor.shutdownNow();
+    }
+    LogFileWriter staged = pendingWriter.getAndSet(null);
+    if (staged != null) {
+      closeWriter(staged);
     }
-    // Stop the time based rotation check.
-    stopRotationExecutor();
-    // We must for the disruptor before closing the current writer.
     closeWriter(currentWriter);
   }
 
+  @VisibleForTesting
+  protected void forceRotation() {
+    new LogRotationTask().run();
+    checkAndReplaceWriter(false);
+  }
+
   protected FileSystem getFileSystem(URI uri) throws IOException {
     return FileSystem.get(uri, logGroup.getConfiguration());
   }
 
-  /** Implements time based rotation independent of in-line checking. */
+  /**
+   * Creates a new writer on a background thread and stages it in {@code 
pendingWriter} for the
+   * consumer thread to drain inside {@link #apply}. Invoked both by the 
scheduled rotation executor
+   * at round boundaries and on-demand via {@link #requestRotation()}.
+   */
   protected class LogRotationTask implements Runnable {
     @Override
     public void run() {
-      if (closed) {
+      if (closed.get()) {
         return;
       }
-      // Use tryLock with a timeout to avoid blocking indefinitely if another 
thread holds
-      // the lock for an unexpectedly long time (e.g., during a problematic 
rotation).
-      boolean acquired = false;
+      rotationRequested.compareAndSet(true, false);
+
       try {
-        // Wait a short time for the lock
-        acquired = lock.tryLock(1, TimeUnit.SECONDS);
-        if (acquired) {
-          // Check only the time condition here, size is handled by getWriter
-          long now = EnvironmentEdgeManager.currentTimeMillis();
-          long last = lastRotationTime.get();
-          if (!closed && now - last >= rotationTimeMs) {
-            LOG.debug("Time based rotation needed ({} ms elapsed, threshold {} 
ms).", now - last,
-              rotationTimeMs);
-            try {
-              rotateLog(RotationReason.TIME); // rotateLog updates 
lastRotationTime
-            } catch (IOException e) {
-              LOG.error("Failed to rotate log, currentWriter is {}", 
currentWriter, e);
-              // More robust error handling goes here once the 
store-and-forward
-              // fallback is implemented. For now we just log the error and 
continue.
-            }
-          }
-        } else {
-          LOG.warn("LogRotationTask could not acquire lock, skipping check 
this time.");
+        LogFileWriter newWriter = createNewWriter();
+        LogFileWriter undrained = pendingWriter.getAndSet(newWriter);
+        if (undrained != null) {
+          closeWriter(undrained);
         }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt(); // Preserve interrupt status
-        LOG.warn("LogRotationTask interrupted while trying to acquire lock.");
-      } finally {
-        if (acquired) {
-          lock.unlock();
+        rotationFailures.set(0);
+        logGroup.getMetrics().incrementRotationCount();
+      } catch (IOException e) {
+        logGroup.getMetrics().incrementRotationFailureCount();
+        long numFailures = rotationFailures.incrementAndGet();
+        if (numFailures >= maxRotationRetries) {
+          LOG.error("Too many rotation failures ({}/{}), closing log", 
numFailures,
+            maxRotationRetries, e);
+          closeOnError();
+        } else {
+          LOG.info("Failed to create new writer for rotation (attempt {}/{}), 
retrying...",
+            numFailures, maxRotationRetries, e);
         }
       }
     }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index 806df5f155..c69292a9a1 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -412,6 +412,7 @@ public class ReplicationLogGroup {
    * @throws IOException if initialization fails
    */
   protected void init() throws IOException {
+    LOG.info("Initializing ReplicationLogGroup {}", haGroupName);
     Optional<HAGroupStoreRecord> haRecord = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
     if (!haRecord.isPresent()) {
       String message =
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index be94364a80..2555f17225 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -211,9 +211,8 @@ public class LogFileFormatWriter implements Closeable {
 
   @Override
   public String toString() {
-    return "LogFileFormatWriter [writerContext=" + context + ", 
currentBlockUncompressedBytes="
-      + currentBlockBytes + ", recordCount=" + recordCount + ", blockCount=" + 
blockCount
-      + ", blocksStartOffset=" + blocksStartOffset + "]";
+    return "LogFileFormatWriter [writerContext=" + context + ", recordCount=" 
+ recordCount
+      + ", blockCount=" + blockCount + ", blocksStartOffset=" + 
blocksStartOffset + "]";
   }
 
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index 0c883a4469..923fc0f7bf 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -35,7 +35,7 @@ public class LogFileWriter implements LogFile.Writer {
 
   private LogFileWriterContext context;
   private LogFileFormatWriter writer;
-  private boolean closed = false;
+  private volatile boolean closed = false;
   /**
    * A monotonically increasing sequence number that identifies this writer 
instance, used to detect
    * log file rotations and ensure proper handling of in-flight operations. 
Higher layers will get a
@@ -124,8 +124,8 @@ public class LogFileWriter implements LogFile.Writer {
 
   @Override
   public String toString() {
-    return "LogFileWriter [writerContext=" + context + ", formatWriter=" + 
writer + ", closed="
-      + closed + ", generation=" + generation + "]";
+    return "LogFileWriter [formatWriter=" + writer + ", closed=" + closed + ", 
generation="
+      + generation + "]";
   }
 
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index 798c048136..b3cc095048 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -27,15 +27,6 @@ public interface MetricsReplicationLogGroupSource extends 
BaseSource {
   String METRICS_DESCRIPTION = "Metrics about Replication Log Operations for 
an HA Group";
   String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
 
-  String TIME_BASED_ROTATION_COUNT = "timeBasedRotationCount";
-  String TIME_BASED_ROTATION_COUNT_DESC = "Number of time-based log rotations";
-
-  String SIZE_BASED_ROTATION_COUNT = "sizeBasedRotationCount";
-  String SIZE_BASED_ROTATION_COUNT_DESC = "Number of size-based log rotations";
-
-  String ERROR_BASED_ROTATION_COUNT = "errorBasedRotationCount";
-  String ERROR_BASED_ROTATION_COUNT_DESC = "Number of times rotateLog was 
called due to errors";
-
   String ROTATION_COUNT = "rotationCount";
   String ROTATION_COUNT_DESC = "Total number of times rotateLog was called";
 
@@ -54,24 +45,6 @@ public interface MetricsReplicationLogGroupSource extends 
BaseSource {
   String RING_BUFFER_TIME = "ringBufferTime";
   String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer";
 
-  /**
-   * Increments the counter for time-based log rotations. This counter tracks 
the number of times
-   * the log was rotated due to time threshold.
-   */
-  void incrementTimeBasedRotationCount();
-
-  /**
-   * Increments the counter for size-based log rotations. This counter tracks 
the number of times
-   * the log was rotated due to size threshold.
-   */
-  void incrementSizeBasedRotationCount();
-
-  /**
-   * Increments the counter for error-based log rotations. This counter tracks 
the number of times
-   * the log was rotated due to errors.
-   */
-  void incrementErrorBasedRotationCount();
-
   /**
    * Increments the counter for total log rotations. This counter tracks the 
total number of times
    * the log was rotated, regardless of reason.
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index 7b3bb5789c..718f6ac63d 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -26,9 +26,6 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
 public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
   implements MetricsReplicationLogGroupSource {
 
-  private final MutableFastCounter timeBasedRotationCount;
-  private final MutableFastCounter sizeBasedRotationCount;
-  private final MutableFastCounter errorBasedRotationCount;
   private final MutableFastCounter rotationCount;
   private final MutableFastCounter rotationFailuresCount;
   private final MutableHistogram appendTime;
@@ -44,12 +41,6 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
     String metricsContext, String metricsJmxContext, String haGroupName) {
     super(metricsName, metricsDescription, metricsContext,
       metricsJmxContext + ",haGroup=" + haGroupName);
-    timeBasedRotationCount = 
getMetricsRegistry().newCounter(TIME_BASED_ROTATION_COUNT,
-      TIME_BASED_ROTATION_COUNT_DESC, 0L);
-    sizeBasedRotationCount = 
getMetricsRegistry().newCounter(SIZE_BASED_ROTATION_COUNT,
-      SIZE_BASED_ROTATION_COUNT_DESC, 0L);
-    errorBasedRotationCount = 
getMetricsRegistry().newCounter(ERROR_BASED_ROTATION_COUNT,
-      ERROR_BASED_ROTATION_COUNT_DESC, 0L);
     rotationCount = getMetricsRegistry().newCounter(ROTATION_COUNT, 
ROTATION_COUNT_DESC, 0L);
     rotationFailuresCount =
       getMetricsRegistry().newCounter(ROTATION_FAILURES, 
ROTATION_FAILURES_DESC, 0L);
@@ -64,21 +55,6 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
     DefaultMetricsSystem.instance().unregisterSource(metricsJmxContext);
   }
 
-  @Override
-  public void incrementTimeBasedRotationCount() {
-    timeBasedRotationCount.incr();
-  }
-
-  @Override
-  public void incrementSizeBasedRotationCount() {
-    sizeBasedRotationCount.incr();
-  }
-
-  @Override
-  public void incrementErrorBasedRotationCount() {
-    errorBasedRotationCount.incr();
-  }
-
   @Override
   public void incrementRotationCount() {
     rotationCount.incr();
@@ -111,10 +87,8 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
 
   @Override
   public ReplicationLogMetricValues getCurrentMetricValues() {
-    return new ReplicationLogMetricValues(timeBasedRotationCount.value(),
-      sizeBasedRotationCount.value(), errorBasedRotationCount.value(), 
rotationCount.value(),
-      rotationFailuresCount.value(), appendTime.getMax(), syncTime.getMax(), 
rotationTime.getMax(),
-      ringBufferTime.getMax());
+    return new ReplicationLogMetricValues(rotationCount.value(), 
rotationFailuresCount.value(),
+      appendTime.getMax(), syncTime.getMax(), rotationTime.getMax(), 
ringBufferTime.getMax());
   }
 
   @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
index 23ee864747..ef64552ece 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
@@ -20,9 +20,6 @@ package org.apache.phoenix.replication.metrics;
 /** Class to hold the values of all metrics tracked by the ReplicationLog 
metrics source. */
 public class ReplicationLogMetricValues {
 
-  private final long timeBasedRotationCount;
-  private final long sizeBasedRotationCount;
-  private final long errorBasedRotationCount;
   private final long rotationCount;
   private final long rotationFailuresCount;
   private final long appendTime;
@@ -30,12 +27,8 @@ public class ReplicationLogMetricValues {
   private final long rotationTime;
   private final long ringBufferTime;
 
-  public ReplicationLogMetricValues(long timeBasedRotationCount, long 
sizeBasedRotationCount,
-    long errorBasedRotationCount, long rotationCount, long 
rotationFailuresCount, long appendTime,
+  public ReplicationLogMetricValues(long rotationCount, long 
rotationFailuresCount, long appendTime,
     long syncTime, long rotationTime, long ringBufferTime) {
-    this.timeBasedRotationCount = timeBasedRotationCount;
-    this.sizeBasedRotationCount = sizeBasedRotationCount;
-    this.errorBasedRotationCount = errorBasedRotationCount;
     this.rotationCount = rotationCount;
     this.rotationFailuresCount = rotationFailuresCount;
     this.appendTime = appendTime;
@@ -44,18 +37,6 @@ public class ReplicationLogMetricValues {
     this.ringBufferTime = ringBufferTime;
   }
 
-  public long getTimeBasedRotationCount() {
-    return timeBasedRotationCount;
-  }
-
-  public long getSizeBasedRotationCount() {
-    return sizeBasedRotationCount;
-  }
-
-  public long getErrorBasedRotationCount() {
-    return errorBasedRotationCount;
-  }
-
   public long getRotationCount() {
     return rotationCount;
   }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
index eca484d5f9..5ff9fc2d41 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
@@ -73,9 +73,8 @@ public class ReplicationLogBaseTest {
 
   static final int TEST_RINGBUFFER_SIZE = 32;
   static final int TEST_SYNC_TIMEOUT = 1000;
-  static final int TEST_ROTATION_TIME = 5000;
   static final int TEST_ROTATION_SIZE_BYTES = 10 * 1024;
-  static final int TEST_REPLICATION_ROUND_DURATION_SECONDS = 20;
+  static final int TEST_REPLICATION_ROUND_DURATION_SECONDS = 60;
 
   protected ReplicationLogBaseTest() {
     this(HAGroupState.ACTIVE_IN_SYNC);
@@ -85,8 +84,11 @@ public class ReplicationLogBaseTest {
     this.initialState = initialState;
   }
 
+  protected void overrideConf(Configuration conf) {
+  }
+
   @Before
-  public void setUpBase() throws IOException {
+  public void setUpBase() throws Exception {
     MockitoAnnotations.initMocks(this);
     haGroupName = name.getMethodName();
     conf = HBaseConfiguration.create();
@@ -98,8 +100,6 @@ public class ReplicationLogBaseTest {
     conf.setInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY, 
TEST_RINGBUFFER_SIZE);
     // Set a short sync timeout for testing
     conf.setLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY, 
TEST_SYNC_TIMEOUT);
-    // Set rotation time to 10 seconds
-    conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY, 
TEST_ROTATION_TIME);
     // Small size threshold for testing
     conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
       TEST_ROTATION_SIZE_BYTES);
@@ -107,6 +107,7 @@ public class ReplicationLogBaseTest {
     conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
       TEST_REPLICATION_ROUND_DURATION_SECONDS);
     conf.setDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY, 0.0);
+    overrideConf(conf);
 
     // initialize the group store record
     storeRecord = initHAGroupStoreRecord();
@@ -123,6 +124,24 @@ public class ReplicationLogBaseTest {
     }
   }
 
+  /**
+   * Closes the current logGroup and creates a fresh one using the current 
{@code conf}. Use this
+   * when a test needs to override configuration (e.g. round duration, 
retries, size threshold)
+   * after {@link #setUpBase()} has already created the default logGroup.
+   */
+  protected void recreateLogGroup() throws Exception {
+    if (logGroup != null) {
+      logGroup.close();
+    }
+    logGroup = new TestableLogGroup(conf, serverName, haGroupName, 
haGroupStoreManager);
+    logGroup.init();
+  }
+
+  protected static void waitForRotationTick(int roundDurationSeconds) throws 
InterruptedException {
+    Thread.sleep((long) (roundDurationSeconds * 1000 * 1.25));
+    LOG.info("Waking up after waiting for rotation tick");
+  }
+
   private HAGroupStoreRecord initHAGroupStoreRecord() {
     return new HAGroupStoreRecord(null, haGroupName, initialState, 0,
       HighAvailabilityPolicy.FAILOVER.toString(), "peerZKUrl", "clusterUrl", 
"peerClusterUrl",
@@ -149,7 +168,9 @@ public class ReplicationLogBaseTest {
   }
 
   /**
-   * Testable version of ReplicationLog that allows spying on the log
+   * Testable version of ReplicationLog that allows spying on the log. 
Overrides
+   * startRotationExecutor to always use a full round as initial delay so that 
the rotation task
+   * never fires unexpectedly when a test happens to start near a round 
boundary.
    */
   static class TestableLog extends ReplicationLog {
 
@@ -158,9 +179,17 @@ public class ReplicationLogBaseTest {
       super(logGroup, shardManager);
     }
 
+    @Override
+    protected void startRotationExecutor() {
+      // Use a full round as the initial delay so the rotation task never 
fires early when the
+      // test happens to start close to a round boundary (e.g. initialDelay of 
852ms on a 60s round)
+      super.startRotationExecutor(rotationTimeMs);
+    }
+
     @Override
     protected LogFileWriter createNewWriter() throws IOException {
       LogFileWriter writer = super.createNewWriter();
+      LOG.info("createNewWriter called, generation={}", 
writer.getGeneration());
       return spy(writer);
     }
   }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
index 765594beb2..770956b70d 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.replication;
 
 import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
 import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC;
+import static 
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -33,11 +34,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
 import org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode;
 import org.apache.phoenix.replication.log.LogFileTestUtil;
-import org.junit.After;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,16 +57,17 @@ public class ReplicationLogDiscoveryForwarderTest extends 
ReplicationLogBaseTest
     super(HAGroupState.ACTIVE_NOT_IN_SYNC);
   }
 
+  @Override
+  protected void overrideConf(Configuration conf) {
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 20);
+  }
+
   @Before
   public void setUp() throws IOException {
     ReplicationMode mode = logGroup.getMode();
     Assert.assertTrue(mode.equals(STORE_AND_FORWARD));
   }
 
-  @After
-  public void tearDown() throws IOException {
-  }
-
   @Test
   public void testLogForwardingAndTransitionBackToSyncMode() throws Exception {
     final String tableName = "TESTTBL";
@@ -143,7 +146,6 @@ public class ReplicationLogDiscoveryForwarderTest extends 
ReplicationLogBaseTest
   @Test
   public void testSyncModeUpdateWaitTime() throws Exception {
     final long[] waitTime = { 8L };
-    int roundDurationSeconds = 
logGroup.getLocalShardManager().getReplicationRoundDurationSeconds();
 
     doAnswer(new Answer<Object>() {
       @Override
@@ -165,9 +167,11 @@ public class ReplicationLogDiscoveryForwarderTest extends 
ReplicationLogBaseTest
         return ret;
       }
     }).when(haGroupStoreManager).setHAGroupStatusToSync(haGroupName);
-    Thread.sleep(roundDurationSeconds * 3 * 1000);
-    LOG.info("Coming out of sleep");
-    // we should have switched back to the SYNC mode
+
+    long deadline = EnvironmentEdgeManager.currentTimeMillis() + 120_000;
+    while (logGroup.getMode() != SYNC && 
EnvironmentEdgeManager.currentTimeMillis() < deadline) {
+      Thread.sleep(500);
+    }
     assertEquals(SYNC, logGroup.getMode());
   }
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 39109364c0..3bd389a0d5 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.replication;
 
 import static java.lang.Thread.sleep;
 import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
+import static 
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -40,16 +41,13 @@ import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.phoenix.replication.ReplicationLog.RotationReason;
 import org.apache.phoenix.replication.log.LogFile;
 import org.apache.phoenix.replication.log.LogFileReader;
 import org.apache.phoenix.replication.log.LogFileReaderContext;
@@ -109,8 +107,8 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
   }
 
   /**
-   * Tests the behavior when a sync operation fails. Verifies that the system 
properly handles sync
-   * failures by rolling to a new writer and retrying the operation.
+   * Tests the behavior when a sync operation fails transiently. Verifies that 
the system retries
+   * with the same writer and succeeds on the next attempt.
    */
   @Test
   public void testSyncFailureAndRetry() throws Exception {
@@ -118,27 +116,25 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     final long commitId = 1L;
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
+    ReplicationLog activeLog = logGroup.getActiveLog();
     // Get the inner writer
-    LogFileWriter writerBeforeRoll = logGroup.getActiveLog().getWriter();
-    assertNotNull("Initial writer should not be null", writerBeforeRoll);
+    LogFileWriter writer = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", writer);
 
-    // Configure writerBeforeRoll to fail on the first sync call
-    doThrow(new IOException("Simulated sync 
failure")).when(writerBeforeRoll).sync();
+    // Keep returning the same writer so a rotation tick can't swap in a 
different one
+    doAnswer(invocation -> writer).when(activeLog).createNewWriter();
+
+    // Configure writer to fail on the first sync call, then succeed
+    doThrow(new IOException("Simulated sync 
failure")).doCallRealMethod().when(writer).sync();
 
     // Append data
     logGroup.append(tableName, commitId, put);
     logGroup.sync();
 
-    // Get the inner writer we rolled to.
-    LogFileWriter writerAfterRoll = logGroup.getActiveLog().getWriter();
-    assertNotNull("Initial writer should not be null", writerBeforeRoll);
-
-    // Verify the sequence: append, sync (fail), rotate, append (retry), sync 
(succeed)
-    InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
-    inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
-    inOrder.verify(writerBeforeRoll, times(1)).sync(); // Failed
-    inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put)); // Replay
-    inOrder.verify(writerAfterRoll, times(1)).sync(); // Succeeded
+    // Verify the sequence: append, sync (fail), sync (succeed on retry with 
same writer)
+    InOrder inOrder = Mockito.inOrder(writer);
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
+    inOrder.verify(writer, times(2)).sync(); // First fails, second succeeds
   }
 
   /**
@@ -346,13 +342,18 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
   /**
    * Tests time-based log rotation. Verifies that the log file is rotated 
after the configured
-   * rotation time period and that operations continue correctly with the new 
log file.
+   * rotation time period and that operations continue correctly with the new 
log file. The writer
+   * swap happens pre-action in apply(), so the second append+sync go directly 
to the new writer.
    */
   @Test
   public void testTimeBasedRotation() throws Exception {
     final String tableName = "TBLTBR";
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
     final long commitId = 1L;
+    final int roundDurationSeconds = 5;
+
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 
roundDurationSeconds);
+    recreateLogGroup();
 
     // Get the initial writer
     LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
@@ -362,42 +363,31 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     logGroup.append(tableName, commitId, put);
     logGroup.sync();
 
-    // Wait for rotation time to elapse
-    sleep((long) (TEST_ROTATION_TIME * 1.25));
+    // Wait for rotation time to elapse so LogRotationTask stages a 
pendingWriter
+    waitForRotationTick(roundDurationSeconds);
 
-    // Append more data to trigger rotation check
+    // This append's apply() drains pendingWriter before the append, so it 
goes to new writer
     logGroup.append(tableName, commitId + 1, put);
     logGroup.sync();
 
-    // Get the new writer after rotation
+    // Verify we have a new writer
     LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
     assertNotNull("New writer should not be null", writerAfterRotation);
     assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
 
     // Verify the sequence of operations
     InOrder inOrder = Mockito.inOrder(writerBeforeRotation, 
writerAfterRotation);
-    inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq(commitId), eq(put)); // First
-                                                                               
                  // append
-                                                                               
                  // to
-                                                                               
                  // initial
-                                                                               
                  // writer
+    inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
     inOrder.verify(writerBeforeRotation, times(1)).sync();
-    inOrder.verify(writerAfterRotation, times(0)).append(eq(tableName), 
eq(commitId), eq(put)); // First
-                                                                               
                 // append
-                                                                               
                 // is
-                                                                               
                 // not
-                                                                               
                 // replayed
-    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + 1), eq(put)); // Second
-                                                                               
                     // append
-                                                                               
                     // to
-                                                                               
                     // new
-                                                                               
                     // writer
+    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + 1), eq(put));
     inOrder.verify(writerAfterRotation, times(1)).sync();
   }
 
   /**
    * Tests size-based log rotation. Verifies that the log file is rotated when 
it exceeds the
-   * configured size threshold and that operations continue correctly with the 
new log file.
+   * configured size threshold and that operations continue correctly with the 
new log file. After
+   * the first sync, requestRotationIfNeeded submits an on-demand 
LogRotationTask which creates the
+   * new writer immediately on the executor thread.
    */
   @Test
   public void testSizeBasedRotation() throws Exception {
@@ -405,34 +395,31 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 10);
     long commitId = 1L;
 
-    LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter writerBeforeRotation = activeLog.getWriter();
     assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
     // Append enough data so that we exceed the size threshold.
     for (int i = 0; i < 100; i++) {
       logGroup.append(tableName, commitId++, put);
     }
-    logGroup.sync(); // Should trigger a sized based rotation
+    // Sync: data goes to old writer. requestRotationIfNeeded submits 
on-demand rotation task.
+    logGroup.sync();
 
-    // Get the new writer after the expected rotation.
-    LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
-    assertNotNull("New writer should not be null", writerAfterRotation);
-    assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
+    // Wait for the on-demand rotation task to create a new writer on the 
background thread
+    Thread.sleep(100);
 
-    // Append one more mutation to verify we're using the new writer.
+    // Next append's apply() drains pending writer → goes to new writer
     logGroup.append(tableName, commitId, put);
     logGroup.sync();
 
-    // Verify the sequence of operations
-    InOrder inOrder = Mockito.inOrder(writerBeforeRotation, 
writerAfterRotation);
-    // Verify all appends before rotation went to the first writer.
-    for (int i = 1; i < commitId; i++) {
-      inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq((long) i), eq(put));
-    }
-    inOrder.verify(writerBeforeRotation, times(1)).sync();
-    // Verify the final append went to the new writer.
-    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
-    inOrder.verify(writerAfterRotation, times(1)).sync();
+    LogFileWriter writerAfterRotation = activeLog.getWriter();
+    assertNotNull("New writer should not be null", writerAfterRotation);
+    assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
+
+    // Verify the final append went to the new writer
+    verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
+    verify(writerAfterRotation, times(1)).sync();
   }
 
   /**
@@ -480,13 +467,18 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
   /**
    * Tests the automatic rotation task. Verifies that the background rotation 
task correctly rotates
-   * log files based on the configured rotation time.
+   * log files based on the configured rotation time. The writer swap happens 
pre-action in apply(),
+   * and the old writer is closed asynchronously via closeExecutor.
    */
   @Test
   public void testRotationTask() throws Exception {
     final String tableName = "TBLRT";
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
     long commitId = 1L;
+    final int roundDurationSeconds = 5;
+
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 
roundDurationSeconds);
+    recreateLogGroup();
 
     LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
     assertNotNull("Initial writer should not be null", writerBeforeRotation);
@@ -494,34 +486,31 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     // Append some data and wait for the rotation time to elapse plus a small 
buffer.
     logGroup.append(tableName, commitId, put);
     logGroup.sync();
-    sleep((long) (TEST_ROTATION_TIME * 1.25));
+    waitForRotationTick(roundDurationSeconds);
+
+    // The LogRotationTask has staged a pendingWriter. Next append's apply() 
drains it.
+    logGroup.append(tableName, commitId + 1, put);
+    logGroup.sync();
 
     // Get the new writer after the rotation.
     LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
     assertNotNull("New writer should not be null", writerAfterRotation);
     assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
 
-    // Verify first append and sync went to initial writer
+    // Verify first batch went to initial writer
     verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(1L), 
eq(put));
     verify(writerBeforeRotation, times(1)).sync();
-    // Verify the initial writer was closed
-    verify(writerBeforeRotation, times(1)).close();
+    // Verify second batch went to new writer (swap happened before append)
+    verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId + 
1), eq(put));
+    verify(writerAfterRotation, times(1)).sync();
+    // Verify the initial writer was closed asynchronously
+    verify(writerBeforeRotation, timeout(5000).times(1)).close();
   }
 
   /**
-   * Tests behavior when log rotation fails temporarily but eventually 
succeeds. Verifies that:
-   * <ul>
-   * <li>The system can handle temporary rotation failures</li>
-   * <li>After failing twice, the third rotation attempt succeeds</li>
-   * <li>Operations continue correctly with the new writer after successful 
rotation</li>
-   * <li>The metrics for rotation failures are properly tracked</li>
-   * <li>Operations can continue with the current writer while rotation 
attempts are failing</li>
-   * </ul>
-   * <p>
-   * This test simulates a scenario where the first two rotation attempts fail 
(e.g., due to
-   * temporary HDFS issues) but the third attempt succeeds. This is a common 
real-world scenario
-   * where transient failures occur but the system eventually recovers. During 
the failed rotation
-   * attempts, the system should continue to operate normally with the current 
writer.
+   * Tests behavior when log rotation fails temporarily but eventually 
succeeds. The rotation task
+   * fails to create a new writer on the first attempt, but succeeds on the 
second. During the
+   * failure, operations continue normally with the current writer.
    */
   @Test
   public void testFailedRotation() throws Exception {
@@ -531,7 +520,6 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     ReplicationLog activeLog = logGroup.getActiveLog();
 
-    // Get the initial writer
     LogFileWriter initialWriter = activeLog.getWriter();
     assertNotNull("Initial writer should not be null", initialWriter);
 
@@ -548,38 +536,37 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     logGroup.append(tableName, commitId, put);
     logGroup.sync();
 
-    // Rotate the log.
-    LogFileWriter writerAfterFailedRotate = 
activeLog.rotateLog(RotationReason.TIME);
-    assertEquals("Should still be using the initial writer", initialWriter,
-      writerAfterFailedRotate);
+    // Force rotation — first attempt will fail
+    activeLog.forceRotation();
 
-    // While rotation is failing, verify we can continue to use the current 
writer.
+    // Verify we can still use the current writer after the failed rotation
     logGroup.append(tableName, commitId + 1, put);
     logGroup.sync();
+    assertEquals("Should still be using the initial writer", initialWriter, 
activeLog.getWriter());
 
-    LogFileWriter writerAfterRotate = activeLog.rotateLog(RotationReason.TIME);
-    assertNotEquals("Should be using a new writer", initialWriter, 
writerAfterRotate);
+    // Force rotation again — this one should succeed
+    activeLog.forceRotation();
 
-    // Try to append more data. This should work with the new writer after 
successful rotation.
+    // Trigger swap by appending and syncing
     logGroup.append(tableName, commitId + 2, put);
     logGroup.sync();
 
+    LogFileWriter writerAfterRotate = activeLog.getWriter();
+    assertNotEquals("Should be using a new writer", initialWriter, 
writerAfterRotate);
+
     // Verify operations went to the writers in the correct order
     InOrder inOrder = Mockito.inOrder(initialWriter, writerAfterRotate);
-    // First append and sync on initial writer.
     inOrder.verify(initialWriter).append(eq(tableName), eq(commitId), eq(put));
     inOrder.verify(initialWriter).sync();
-    // Second append and sync on initial writer after failed rotation.
     inOrder.verify(initialWriter).append(eq(tableName), eq(commitId + 1), 
eq(put));
     inOrder.verify(initialWriter).sync();
-    // Final append and sync on new writer after successful rotation.
     inOrder.verify(writerAfterRotate).append(eq(tableName), eq(commitId + 2), 
eq(put));
     inOrder.verify(writerAfterRotate).sync();
   }
 
   /**
-   * This test simulates a scenario where rotation consistently fails and 
verifies that the system
-   * properly propagates an exception after exhausting all retry attempts.
+   * Tests that too many consecutive rotation failures cause the log to close 
via closeOnError().
+   * The LogRotationTask tracks failures across attempts and fail-stops after 
maxRotationRetries.
    */
   @Test
   public void testTooManyRotationFailures() throws Exception {
@@ -589,7 +576,6 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     ReplicationLog activeLog = logGroup.getActiveLog();
 
-    // Get the initial writer
     LogFileWriter initialWriter = activeLog.getWriter();
     assertNotNull("Initial writer should not be null", initialWriter);
 
@@ -601,25 +587,14 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     logGroup.append(tableName, commitId, put);
     logGroup.sync();
 
-    // Try to rotate the log multiple times until we exceed the retry limit
+    // Force rotation repeatedly until it exceeds maxRotationRetries and calls 
closeOnError
     for (int i = 0; i <= 
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
-      try {
-        activeLog.rotateLog(RotationReason.TIME);
-      } catch (IOException e) {
-        if (i < ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
-          // Not the last attempt yet, continue
-          continue;
-        }
-        // This was the last attempt, verify the exception
-        assertTrue("Expected IOException", e instanceof IOException);
-        assertTrue("Expected our mocked failure cause",
-          e.getMessage().contains("Simulated failure"));
-
-      }
+      activeLog.forceRotation();
     }
 
-    // Verify subsequent operations will fail because the log is closed and 
then trigger
-    // a mode switch to STORE_AND_FORWARD
+    assertTrue("Log should be closed after too many rotation failures", 
activeLog.isClosed());
+
+    // Verify subsequent operations trigger a mode switch to STORE_AND_FORWARD
     logGroup.append(tableName, commitId + 1, put);
     logGroup.sync();
     assertEquals(STORE_AND_FORWARD, logGroup.getMode());
@@ -682,19 +657,23 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     LogFileWriter initialWriter = activeLog.getWriter();
     assertNotNull("Initial writer should not be null", initialWriter);
 
-    // Configure initial writer to fail on sync
+    // Configure initial writer to always fail on sync
     doThrow(new IOException("Simulated sync 
failure")).when(initialWriter).sync();
 
-    // createNewWriter should keep returning the bad writer
-    doAnswer(invocation -> initialWriter).when(activeLog).createNewWriter();
+    // Make any new writers also fail on sync so a rotation can't rescue the 
retry loop
+    doAnswer(invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      doThrow(new IOException("Simulated sync failure")).when(w).sync();
+      return w;
+    }).when(activeLog).createNewWriter();
 
     // Append data
     logGroup.append(tableName, commitId, put);
     // Try to sync. Should fail after exhausting retries and then switch to 
STORE_AND_FORWARD
     logGroup.sync();
 
-    // Each retry creates a new writer, so that is at least 1 create + 4 
retries.
-    verify(activeLog, atLeast(5)).createNewWriter();
+    // Every writer (initial or rotated-in) fails on sync, so retries cannot succeed; verify the 
initial writer saw at least two sync attempts
+    verify(initialWriter, atLeast(2)).sync();
     assertEquals(STORE_AND_FORWARD, logGroup.getMode());
   }
 
@@ -714,11 +693,15 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
     LogFileWriter initialWriter = activeLog.getWriter();
     assertNotNull("Initial writer should not be null", initialWriter);
 
-    // Configure initial writer to fail on sync
+    // Configure initial writer to always fail on sync
     doThrow(new IOException("Simulated sync 
failure")).when(initialWriter).sync();
 
-    // createNewWriter should keep returning the bad writer
-    doAnswer(invocation -> initialWriter).when(activeLog).createNewWriter();
+    // Make any new writers also fail on sync so a rotation can't rescue the 
retry loop
+    doAnswer(invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      doThrow(new IOException("Simulated sync failure")).when(w).sync();
+      return w;
+    }).when(activeLog).createNewWriter();
 
     doThrow(new IOException("Simulated failure to update HAGroupStore state"))
       
.when(haGroupStoreManager).setHAGroupStatusToStoreAndForward(haGroupName);
@@ -732,13 +715,14 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
     } catch (RuntimeException ex) {
       assertTrue(ex.getMessage().contains("Simulated sync failure"));
     }
-    // wait for the even processor thread to clean up
+    // wait for the event processor thread to clean up
     Thread.sleep(3);
   }
 
   /**
-   * Tests log rotation behavior during batch operations. Verifies that the 
system correctly handles
-   * rotation when there are pending batch operations, ensuring no data loss.
+   * Tests rotation during an in-flight batch. When a pending writer is staged 
by the rotation task
+   * while appends are in flight, the sync's apply() drains the pending 
writer, replays the 5
+   * unsynced records into the new writer, then syncs the new writer.
    */
   @Test
   public void testRotationDuringBatch() throws Exception {
@@ -750,42 +734,38 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
     LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
     assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
-    // Append several items to fill currentBatch but don't sync yet
+    // Append several items but don't sync yet
     for (int i = 0; i < 5; i++) {
       logGroup.append(tableName, commitId + i, put);
     }
 
-    // Force a rotation by waiting for rotation time to elapse
-    sleep((long) (TEST_ROTATION_TIME * 1.25));
+    // Stage a pending writer via forced rotation
+    logGroup.getActiveLog().forceRotation();
 
-    // Get the new writer after rotation
+    // Sync — apply() drains pendingWriter, replays 5 records into new writer, 
then syncs
+    logGroup.sync();
+
+    // The swap happened before sync action
     LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
     assertNotNull("New writer should not be null", writerAfterRotation);
     assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
 
-    // Now trigger a sync which should replay the currentBatch to the new 
writer
-    logGroup.sync();
-
-    // Verify the sequence of operations
     InOrder inOrder = Mockito.inOrder(writerBeforeRotation, 
writerAfterRotation);
-
-    // Verify all appends before rotation went to the first writer
+    // 5 appends went to old writer (processed before rotation task fired)
     for (int i = 0; i < 5; i++) {
       inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq(commitId + i),
         eq(put));
     }
-
-    // Verify the currentBatch was replayed to the new writer
+    // Swap happens before sync action: 5 records replayed into new writer
     for (int i = 0; i < 5; i++) {
       inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + i),
         eq(put));
     }
-
-    // Verify sync happened on the new writer
+    // Sync goes to new writer
     inOrder.verify(writerAfterRotation, times(1)).sync();
 
-    // Verify the initial writer was closed
-    verify(writerBeforeRotation, times(1)).close();
+    // Verify the initial writer was closed asynchronously
+    verify(writerBeforeRotation, timeout(5000).times(1)).close();
   }
 
   /**
@@ -811,7 +791,7 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
     logGroup.sync(); // Sync to commit the appends to the current writer.
 
     // Force a rotation to close the current writer.
-    activeLog.rotateLog(RotationReason.SIZE);
+    activeLog.forceRotation();
 
     assertTrue("Log file should exist", localFs.exists(logPath));
 
@@ -869,7 +849,7 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
       }
       logGroup.sync(); // Sync to commit the appends to the current writer.
       // Force a rotation to close the current writer.
-      activeLog.rotateLog(RotationReason.SIZE);
+      activeLog.forceRotation();
     }
 
     // Verify all log files exist
@@ -905,14 +885,14 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
   }
 
   /**
-   * Tests reading records after multiple rotations with intermittent syncs. 
If we do not sync when
-   * we roll a file, the in-flight batch is replayed into the new writer when 
we do finally sync
-   * (with the new writer). Verifies that records can be correctly read even 
when syncs are not
-   * performed before each rotation, ensuring data consistency.
+   * Tests reading records after multiple rotations with varying batch sizes. 
The writer swap
+   * happens inside {@code apply} before each attempt, so unsynced appends are 
replayed into the new
+   * writer. This test verifies data consistency across many rotations with 
different record counts
+   * per file.
    */
   @Test
-  public void testReadAfterMultipleRotationsWithReplay() throws Exception {
-    final String tableName = "TBLRAMRIS";
+  public void testReadAfterMultipleRotationsWithVaryingBatchSizes() throws 
Exception {
+    final String tableName = "TBLRAMRVBS";
     final int NUM_RECORDS_PER_ROTATION = 100;
     final int NUM_ROTATIONS = 10;
     final int TOTAL_RECORDS = NUM_RECORDS_PER_ROTATION * NUM_ROTATIONS;
@@ -921,9 +901,7 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
 
     ReplicationLog activeLog = logGroup.getActiveLog();
 
-    // Write records across multiple rotations, only syncing 50% of the time.
     for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
-      // Get the path of the current log file.
       Path logPath = activeLog.getWriter().getContext().getFilePath();
       logPaths.add(logPath);
 
@@ -935,13 +913,8 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
         logGroup.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
       }
 
-      // Only sync 50% of the time before rotation. To ensure we sync on the 
last file
-      // we are going to write, use 'rotation % 2 == 1' instead of 'rotation % 
2 == 0'.
-      if (rotation % 2 == 1) {
-        logGroup.sync(); // Sync to commit the appends to the current writer.
-      }
-      // Force a rotation to close the current writer.
-      activeLog.rotateLog(RotationReason.SIZE);
+      logGroup.sync();
+      activeLog.forceRotation();
     }
 
     // Verify all log files exist
@@ -949,10 +922,8 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
       assertTrue("Log file should exist: " + logPath, localFs.exists(logPath));
     }
 
-    // Read and verify all records from each log file, tracking unique records 
and duplicates.
-    Set<LogFile.Record> uniqueRecords = new HashSet<>();
+    // Read and verify all records from each log file
     List<LogFile.Record> allReadRecords = new ArrayList<>();
-
     for (Path logPath : logPaths) {
       LogFileReader reader = new LogFileReader();
       LogFileReaderContext readerContext =
@@ -961,24 +932,22 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
       LogFile.Record record;
       while ((record = reader.next()) != null) {
         allReadRecords.add(record);
-        uniqueRecords.add(record);
       }
       reader.close();
     }
 
-    // Print statistics about duplicates for informational purposes.
-    LOG.info("{} total records across all files", allReadRecords.size());
-    LOG.info("{} unique records", uniqueRecords.size());
-    LOG.info("{} duplicate records", allReadRecords.size() - 
uniqueRecords.size());
+    assertEquals("Total number of records mismatch", TOTAL_RECORDS, 
allReadRecords.size());
 
-    // Verify we have all the expected unique records
-    assertEquals("Number of unique records mismatch", TOTAL_RECORDS, 
uniqueRecords.size());
+    for (int i = 0; i < TOTAL_RECORDS; i++) {
+      LogFileTestUtil.assertRecordEquals("Record mismatch at index " + i, 
originalRecords.get(i),
+        allReadRecords.get(i));
+    }
   }
 
   /**
-   * Tests behavior when a RuntimeException occurs during writer.getLength() 
in shouldRotate().
-   * Verifies that the system properly handles critical errors by closing the 
log and preventing
-   * further operations.
+   * Tests behavior when a RuntimeException occurs during writer.getLength() in
+   * shouldRotateForSize(). Verifies that the system properly handles critical 
errors by closing the
+   * log and preventing further operations.
    */
   @Test
   public void testRuntimeExceptionDuringLengthCheck() throws Exception {
@@ -1095,6 +1064,64 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
     verify(innerWriter, times(1)).close();
   }
 
+  /**
+   * Tests that an undrained pendingWriter is closed and replaced by the 
rotation task. This
+   * simulates an idle system where no events flow to drain the staged writer 
before the next
+   * rotation tick fires. Verifies the undrained writer is actually closed by 
the second tick.
+   */
+  @Test
+  public void testUndrainedPendingWriterReplaced() throws Exception {
+    final String tableName = "TBLUPWR";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+    final int roundDurationSeconds = 5;
+
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 
roundDurationSeconds);
+    recreateLogGroup();
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Capture writers created by rotation ticks
+    List<LogFileWriter> rotationWriters = new ArrayList<>();
+    doAnswer(invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      rotationWriters.add(w);
+      return w;
+    }).when(activeLog).createNewWriter();
+
+    // Append and sync to establish baseline
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
+
+    // Wait for rotation tick — this stages a pending writer (W2)
+    waitForRotationTick(roundDurationSeconds);
+
+    // The pending writer is staged but not drained (no events to trigger 
checkAndReplaceWriter).
+    // Wait for the next rotation tick — it should close the undrained writer 
and stage a new one.
+    waitForRotationTick(roundDurationSeconds);
+
+    // Now drain by appending (apply() calls checkAndReplaceWriter)
+    logGroup.append(tableName, commitId + 1, put);
+    logGroup.sync();
+
+    LogFileWriter writerAfterRotation = activeLog.getWriter();
+    assertTrue("Writer should have been rotated", writerAfterRotation != 
initialWriter);
+
+    // W2 was created by first tick, W3 by second tick
+    assertTrue("Expected at least 2 rotation writers", rotationWriters.size() 
>= 2);
+    LogFileWriter undrainedWriter = rotationWriters.get(0);
+    LogFileWriter finalWriter = rotationWriters.get(rotationWriters.size() - 
1);
+
+    // Undrained W2 was closed synchronously by the second rotation tick
+    verify(undrainedWriter, times(1)).close();
+
+    // Final writer is the one we're using
+    assertEquals("Active writer should be the last rotation writer", 
finalWriter,
+      writerAfterRotation);
+  }
+
   /**
    * Tests that multiple sync requests are consolidated into a single sync 
operation on the inner
    * writer when they occur in quick succession. Verifies that the Disruptor 
batching and
@@ -1293,6 +1320,308 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
     group2.close();
   }
 
+  /**
+   * Tests that unsynced appends are replayed into the new writer when a swap 
happens mid-batch.
+   * Append 3 records (no sync), stage a pending writer, append a 4th record 
which triggers the swap
+   * + replay, then sync. Old writer gets 3 appends, new writer gets 3 
replayed + 4th append + sync.
+   */
+  @Test
+  public void testReplayOnMidBatchSwap() throws Exception {
+    final String tableName = "TBLRMBS";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    long commitId = 1L;
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter writerBeforeRotation = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+    // Append 3 records without syncing — they accumulate in currentBatch
+    for (int i = 0; i < 3; i++) {
+      logGroup.append(tableName, commitId + i, put);
+    }
+
+    // Stage a pending writer via forced rotation
+    activeLog.forceRotation();
+
+    // 4th append triggers checkAndReplaceWriter + generation mismatch → 
replays 3 records
+    logGroup.append(tableName, commitId + 3, put);
+    logGroup.sync();
+
+    LogFileWriter writerAfterRotation = activeLog.getWriter();
+    assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
+
+    InOrder inOrder = Mockito.inOrder(writerBeforeRotation, 
writerAfterRotation);
+    // 3 appends went to old writer
+    for (int i = 0; i < 3; i++) {
+      inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq(commitId + i),
+        eq(put));
+    }
+    // 3 records replayed into new writer
+    for (int i = 0; i < 3; i++) {
+      inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + i),
+        eq(put));
+    }
+    // 4th append goes to new writer
+    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId + 3), eq(put));
+    inOrder.verify(writerAfterRotation, times(1)).sync();
+
+    // Old writer closed async
+    verify(writerBeforeRotation, timeout(5000).times(1)).close();
+  }
+
+  /**
+   * Tests the lease-recovery scenario: when the current writer's stream is 
broken (simulated by
+   * failing sync), a staged pending writer is picked up before the action, 
replays unsynced
+   * appends, and succeeds without ever calling sync on the broken writer.
+   */
+  @Test
+  public void testRetryPicksUpStagedWriter() throws Exception {
+    final String tableName = "TBLRPSW";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Append a record — goes to initialWriter
+    logGroup.append(tableName, commitId, put);
+
+    // Configure initial writer's sync to fail (simulating broken stream after 
lease recovery)
+    doThrow(new IOException("Simulated broken 
stream")).when(initialWriter).sync();
+
+    // Stage a pending writer via forced rotation
+    activeLog.forceRotation();
+
+    // Sync: checkAndReplaceWriter drains pending writer before the sync 
action,
+    // replays 1 record into new writer, then syncs new writer. Old writer's 
sync is
+    // never called because the swap happens before the action.
+    logGroup.sync();
+
+    LogFileWriter newWriter = activeLog.getWriter();
+    assertTrue("Should be using new writer", newWriter != initialWriter);
+
+    // Old writer: received the append only
+    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
+
+    // New writer: received replayed append + successful sync
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+    verify(newWriter, times(1)).sync();
+  }
+
+  /**
+   * Tests the idle-then-lease-recovery scenario: after a sync clears 
currentBatch, the system goes
+   * idle. A rotation tick stages a pending writer. The reader performs HDFS 
lease recovery,
+   * breaking the old writer's stream. When events resume, apply() drains the 
healthy staged writer
+   * before the action — the broken writer is never touched. No replay needed 
(empty batch).
+   */
+  @Test
+  public void testIdleLeaseRecoveryDrainsStagedWriter() throws Exception {
+    final String tableName = "TBLILRDSW";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Append + sync to establish baseline and clear currentBatch
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
+
+    // Stage W2 in pendingWriter via forced rotation
+    activeLog.forceRotation();
+
+    // Simulate HDFS lease recovery breaking the old writer's stream
+    doThrow(new IOException("Simulated broken stream after lease 
recovery")).when(initialWriter)
+      .append(anyString(), anyLong(), any(Mutation.class));
+    doThrow(new IOException("Simulated broken stream after lease 
recovery")).when(initialWriter)
+      .sync();
+
+    // Events resume — apply() drains W2 before the action, so broken writer 
is never touched
+    logGroup.append(tableName, commitId + 1, put);
+    logGroup.sync();
+
+    LogFileWriter newWriter = activeLog.getWriter();
+    assertTrue("Should be using new writer after idle + lease recovery",
+      newWriter != initialWriter);
+
+    // New writer received the new append + sync (no replay — currentBatch was 
empty)
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1), 
eq(put));
+    verify(newWriter, times(1)).sync();
+
+    // Old writer: only the pre-idle append + sync, nothing after the break
+    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
+    verify(initialWriter, times(1)).sync();
+  }
+
+  /**
+   * Tests that a failed replay is retried on the next attempt. The new 
writer's first append fails
+   * during replay, so the generation stays stale. On retry, the generation 
mismatch is detected
+   * again and replay succeeds.
+   */
+  @Test
+  public void testReplayFailureRetries() throws Exception {
+    final String tableName = "TBLRFR";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    long commitId = 1L;
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Intercept createNewWriter to capture the new writer and make its first 
append fail
+    final AtomicBoolean failFirstAppend = new AtomicBoolean(true);
+    doAnswer(invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      doAnswer(appendInvocation -> {
+        if (failFirstAppend.getAndSet(false)) {
+          throw new IOException("Simulated transient HDFS error during 
replay");
+        }
+        return appendInvocation.callRealMethod();
+      }).when(w).append(anyString(), anyLong(), any(Mutation.class));
+      return w;
+    }).when(activeLog).createNewWriter();
+
+    // Append 2 records without syncing — they accumulate in currentBatch
+    logGroup.append(tableName, commitId, put);
+    logGroup.append(tableName, commitId + 1, put);
+
+    // Stage a pending writer via forced rotation (whose first append will 
fail)
+    activeLog.forceRotation();
+
+    // 3rd append triggers swap + replay of [r1, r2].
+    // Attempt 1: replay fails on first record → IOException → generation 
stays stale
+    // Attempt 2: generation mismatch still → replay retries → succeeds → r3 
appended
+    logGroup.append(tableName, commitId + 2, put);
+    logGroup.sync();
+
+    LogFileWriter newWriter = activeLog.getWriter();
+    assertTrue("Should be using new writer", newWriter != initialWriter);
+
+    // New writer: attempt 1 replay failed on r1 (1 call). Attempt 2 replayed 
r1+r2 (2 calls)
+    // then appended r3. Total: r1 called 2x, r2 called 1x, r3 called 1x.
+    verify(newWriter, times(2)).append(eq(tableName), eq(commitId), eq(put));
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1), 
eq(put));
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 2), 
eq(put));
+    verify(newWriter, times(1)).sync();
+  }
+
+  /**
+   * Tests error-recovery rotation: when the current writer's stream is 
broken, the second failure
+   * in apply() triggers requestRotation() which submits an on-demand 
LogRotationTask. During the
+   * retry sleep, the background thread creates a new writer. The next attempt 
drains it and
+   * succeeds.
+   */
+  @Test
+  public void testErrorRecoveryRequestsNewWriter() throws Exception {
+    final String tableName = "TBLERNW";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Configure initial writer's append to always fail (simulating broken 
HDFS stream)
+    doThrow(new IOException("Simulated broken 
stream")).when(initialWriter).append(anyString(),
+      anyLong(), any(Mutation.class));
+
+    // Append — attempt 1 fails, attempt 2 fails + requestRotation(), during 
sleep the
+    // background thread creates a new writer, attempt 3 drains it and succeeds
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
+
+    LogFileWriter newWriter = activeLog.getWriter();
+    assertTrue("Should be using a new writer after error recovery", newWriter 
!= initialWriter);
+
+    // Old writer received failed attempts
+    verify(initialWriter, atLeast(2)).append(eq(tableName), eq(commitId), 
eq(put));
+    // New writer received the successful append
+    verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+    verify(newWriter, times(1)).sync();
+  }
+
+  /**
+   * Tests that an on-demand size rotation mid-interval does not suppress the 
next scheduled tick.
+   * After size rotation creates a writer early, the scheduled tick still 
fires and creates another.
+   */
+  @Test
+  public void testOnDemandRotationDoesNotSuppressScheduledTick() throws 
Exception {
+    final String tableName = "TBLODRNST";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 10);
+    long commitId = 1L;
+    final int roundDurationSeconds = 5;
+
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 
roundDurationSeconds);
+    recreateLogGroup();
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Append enough data to trigger size rotation → requestRotation() fires
+    for (int i = 0; i < 100; i++) {
+      logGroup.append(tableName, commitId++, put);
+    }
+    logGroup.sync();
+
+    // Wait for the on-demand rotation task to create a new writer on the 
background thread
+    Thread.sleep(100);
+
+    // Drain W2 via next append
+    logGroup.append(tableName, commitId++, put);
+    logGroup.sync();
+
+    LogFileWriter writerAfterSizeRotation = activeLog.getWriter();
+    assertTrue("Writer should have been rotated for size",
+      writerAfterSizeRotation != initialWriter);
+
+    // Wait for the scheduled rotation tick
+    waitForRotationTick(roundDurationSeconds);
+
+    // Drain W3
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
+
+    LogFileWriter writerAfterScheduledTick = activeLog.getWriter();
+    assertTrue("Scheduled tick should have created a new writer (not 
suppressed)",
+      writerAfterScheduledTick != writerAfterSizeRotation);
+  }
+
+  /**
+   * Tests that the rotation executor fires at the round boundary by verifying 
that after waiting
+   * for slightly more than a round, a rotation has occurred.
+   */
+  @Test
+  public void testRotationScheduleAlignsWithRoundBoundary() throws Exception {
+    final String tableName = "TBLRSARB";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+    final int roundDurationSeconds = 5;
+
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 
roundDurationSeconds);
+    recreateLogGroup();
+
+    LogFileWriter writerBeforeRotation = logGroup.getActiveLog().getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+    // Append and sync so we have data
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
+
+    // Wait for slightly more than one round duration — the rotation task 
should have fired
+    waitForRotationTick(roundDurationSeconds);
+
+    // Trigger drain
+    logGroup.append(tableName, commitId + 1, put);
+    logGroup.sync();
+
+    LogFileWriter writerAfterRotation = logGroup.getActiveLog().getWriter();
+    assertTrue("Rotation should have happened at round boundary",
+      writerAfterRotation != writerBeforeRotation);
+  }
+
   // @Test
   public void testAppendTimeoutWhileSyncPending() throws Exception {
     final String tableName = "TESTTBL";
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
new file mode 100644
index 0000000000..89bf708f05
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class ReplicationLogTest {
+
+  @Test
+  public void testComputeInitialDelay() {
+    long rotationTimeMs = 60_000L;
+
+    // Exactly on a round boundary: full round until next tick
+    long now = 120_000L;
+    long roundStart = 120_000L;
+    assertEquals(rotationTimeMs,
+      ReplicationLog.computeInitialDelay(now, roundStart, rotationTimeMs));
+
+    // Middle of the round: half a round remaining
+    now = 150_000L;
+    roundStart = 120_000L;
+    assertEquals(30_000L, ReplicationLog.computeInitialDelay(now, roundStart, 
rotationTimeMs));
+
+    // Near the end of a round: small but positive delay
+    now = 179_999L;
+    roundStart = 120_000L;
+    assertEquals(1L, ReplicationLog.computeInitialDelay(now, roundStart, 
rotationTimeMs));
+
+    // Just after a round boundary
+    now = 120_001L;
+    roundStart = 120_000L;
+    assertEquals(59_999L, ReplicationLog.computeInitialDelay(now, roundStart, 
rotationTimeMs));
+  }
+
+  @Test
+  public void testComputeInitialDelayIsAlwaysPositive() {
+    long rotationTimeMs = 60_000L;
+    long dayStart = 1704067200000L;
+
+    for (int offsetMs = 0; offsetMs < 60_000; offsetMs += 1000) {
+      long now = dayStart + offsetMs;
+      long roundStart = (now / rotationTimeMs) * rotationTimeMs;
+      long delay = ReplicationLog.computeInitialDelay(now, roundStart, 
rotationTimeMs);
+      assertTrue("Delay must be > 0 for offset " + offsetMs, delay > 0);
+      assertTrue("Delay must be <= rotationTimeMs for offset " + offsetMs, 
delay <= rotationTimeMs);
+    }
+  }
+}


Reply via email to