This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new 6931e7167e PHOENIX-7793 Addendum Replication Log writer improvements (#2459)
6931e7167e is described below

commit 6931e7167ee9e5fee3c18de9eb1e63eded79278b
Author: tkhurana <[email protected]>
AuthorDate: Tue May 5 14:16:44 2026 -0700

    PHOENIX-7793 Addendum Replication Log writer improvements (#2459)
---
 .../PhoenixWALSyncTimeoutException.java            |  36 +++
 .../apache/phoenix/replication/ReplicationLog.java |  78 +++---
 .../phoenix/replication/ReplicationLogGroup.java   |  37 ++-
 .../phoenix/replication/ReplicationModeImpl.java   |  12 +-
 .../replication/StoreAndForwardModeImpl.java       |   6 +-
 .../replication/SyncAndForwardModeImpl.java        |   7 +-
 .../apache/phoenix/replication/SyncModeImpl.java   |   7 +-
 .../replication/log/LogFileFormatWriter.java       |   3 +
 .../phoenix/replication/log/LogFileWriter.java     |  29 +--
 .../metrics/MetricsReplicationLogGroupSource.java  |   6 +
 .../MetricsReplicationLogGroupSourceImpl.java      |  11 +-
 .../metrics/ReplicationLogMetricValues.java        |  11 +-
 .../replication/ReplicationLogBaseTest.java        |  13 +-
 .../replication/ReplicationLogGroupTest.java       | 261 ++++++++++++++++-----
 .../replication/log/LogFileWriterSyncTest.java     |   7 +-
 15 files changed, 359 insertions(+), 165 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/PhoenixWALSyncTimeoutException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/PhoenixWALSyncTimeoutException.java
new file mode 100644
index 0000000000..520e8ff23e
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/PhoenixWALSyncTimeoutException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a Phoenix replication log sync operation times out.
+ */
+public class PhoenixWALSyncTimeoutException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  public PhoenixWALSyncTimeoutException(String message) {
+    super(message);
+  }
+
+  public PhoenixWALSyncTimeoutException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index b699981c24..d6b7b01535 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -22,6 +22,7 @@ import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -77,6 +78,9 @@ public class ReplicationLog {
   protected final AtomicLong rotationFailures = new AtomicLong(0);
   // Staged writer created by the background LogRotationTask, drained by 
checkAndReplaceWriter().
   private final AtomicReference<LogFileWriter> pendingWriter = new 
AtomicReference<>();
+  // Latch set by apply() before requesting an on-demand rotation; counted 
down by LogRotationTask
+  // in a finally block so apply() can wait (with timeout) for a fresh writer 
to be staged.
+  private volatile CountDownLatch rotationStagedLatch;
   private final AtomicBoolean closed = new AtomicBoolean(false);
   private final AtomicBoolean rotationRequested = new AtomicBoolean(false);
   private final ExecutorService closeExecutor;
@@ -98,12 +102,19 @@ public class ReplicationLog {
     this.retryDelayMs = 
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_RETRY_DELAY_MS_KEY,
       ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
     this.rotationTimeMs = shardManager.getReplicationRoundDurationSeconds() * 
1000L;
-    long rotationSize = 
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
-      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
+    long configuredRotationSize =
+      conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+        ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
     double rotationSizePercent =
       
conf.getDouble(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
         ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
-    this.rotationSizeBytes = (long) (rotationSize * rotationSizePercent);
+    long blockSize = shardManager.getFileSystem().getDefaultBlockSize();
+    if (configuredRotationSize > blockSize) {
+      LOG.warn("Configured rotation size {} exceeds HDFS block size {}; 
clamping to block size",
+        configuredRotationSize, blockSize);
+    }
+    long effectiveRotationSize = Math.min(configuredRotationSize, blockSize);
+    this.rotationSizeBytes = (long) (effectiveRotationSize * 
rotationSizePercent);
     this.maxRotationRetries = 
conf.getInt(ReplicationLogGroup.REPLICATION_LOG_ROTATION_RETRIES_KEY,
       ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
     String compressionName = 
conf.get(ReplicationLogGroup.REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
@@ -142,6 +153,7 @@ public class ReplicationLog {
     LogFileWriter newWriter = new LogFileWriter();
     newWriter.init(writerContext);
     newWriter.setGeneration(writerGeneration.incrementAndGet());
+    LOG.info("Created new writer: {}", newWriter);
     return newWriter;
   }
 
@@ -309,16 +321,15 @@ public class ReplicationLog {
       } catch (IOException e) {
         LOG.debug("Attempt {}/{} failed", attempt, maxAttempts, e);
         if (attempt == maxAttempts) {
-          closeOnError();
           throw e;
         }
-        // First failure retries on the same writer (transient). Second failure
-        // requests a new writer to recover from non-transient stream errors.
-        if (attempt > 1) {
-          requestRotation();
-        }
+        // Each retry runs on a fresh writer. Stage a latch, request rotation, 
and wait briefly
+        // for the LogRotationTask to count the latch down after staging a new 
pendingWriter.
+        CountDownLatch latch = new CountDownLatch(1);
+        rotationStagedLatch = latch;
+        requestRotation();
         try {
-          Thread.sleep(retryDelayMs);
+          latch.await(retryDelayMs, TimeUnit.MILLISECONDS);
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           throw new InterruptedIOException("Interrupted during retry delay");
@@ -351,44 +362,31 @@ public class ReplicationLog {
   }
 
   /**
-   * Force closes the log upon an unrecoverable internal error. This is a 
fail-stop behavior: once
-   * called, the log is marked as closed, the Disruptor is halted, and all 
subsequent append() and
-   * sync() calls will throw an IOException("Closed"). This ensures that no 
further operations are
-   * attempted on a log that has encountered a critical error.
+   * Closes the log with a bounded duration. Subsequent append() and sync() 
calls will throw
+   * IOException("Closed"). Safe to call multiple times — only the first 
invocation performs
+   * cleanup.
+   * @param graceful true for graceful shutdown, false for error/forced 
shutdown
    */
-  protected void closeOnError() {
+  public void close(boolean graceful) {
     if (!closed.compareAndSet(false, true)) {
       return;
     }
     stopRotationExecutor();
-    closeExecutor.shutdownNow();
     LogFileWriter staged = pendingWriter.getAndSet(null);
     if (staged != null) {
-      closeWriter(staged);
-    }
-    closeWriter(currentWriter);
-  }
-
-  /** Closes the log. */
-  public void close() {
-    if (!closed.compareAndSet(false, true)) {
-      return;
+      submitClose(staged);
     }
-    stopRotationExecutor();
+    submitClose(currentWriter);
     closeExecutor.shutdown();
     try {
-      if (!closeExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+      if (!closeExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+        LOG.warn("Close executor did not terminate in 10s, abandoning writer 
closes");
         closeExecutor.shutdownNow();
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       closeExecutor.shutdownNow();
     }
-    LogFileWriter staged = pendingWriter.getAndSet(null);
-    if (staged != null) {
-      closeWriter(staged);
-    }
-    closeWriter(currentWriter);
   }
 
   @VisibleForTesting
@@ -422,16 +420,22 @@ public class ReplicationLog {
         }
         rotationFailures.set(0);
         logGroup.getMetrics().incrementRotationCount();
-      } catch (IOException e) {
+      } catch (Throwable t) {
         logGroup.getMetrics().incrementRotationFailureCount();
         long numFailures = rotationFailures.incrementAndGet();
         if (numFailures >= maxRotationRetries) {
           LOG.error("Too many rotation failures ({}/{}), closing log", 
numFailures,
-            maxRotationRetries, e);
-          closeOnError();
+            maxRotationRetries, t);
+          close(false);
         } else {
-          LOG.info("Failed to create new writer for rotation (attempt {}/{}), 
retrying...",
-            numFailures, maxRotationRetries, e);
+          LOG.error("Failed to create new writer for rotation (attempt {}/{}), 
retrying...",
+            numFailures, maxRotationRetries, t);
+        }
+      } finally {
+        CountDownLatch latch = rotationStagedLatch;
+        if (latch != null) {
+          latch.countDown();
+          rotationStagedLatch = null;
         }
       }
     }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index c69292a9a1..d4ac31b493 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
@@ -159,14 +158,15 @@ public class ReplicationLogGroup {
     "phoenix.replication.log.sync.timeout.ms";
   public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
     "phoenix.replication.log.sync.retries";
-  public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 4;
+  public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 1;
   public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
     "phoenix.replication.log.rotation.retries";
   public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
   public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
     "phoenix.replication.log.retry.delay.ms";
   public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
-  private static final long DEFAULT_HDFS_WRITE_RPC_TIMEOUT_MS = 30 * 1000;
+  public static final String WAL_SYNC_TIMEOUT_MS_KEY = 
"hbase.regionserver.wal.sync.timeout";
+  public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
 
   public static final String STANDBY_DIR = "in";
   public static final String FALLBACK_DIR = "out";
@@ -438,22 +438,16 @@ public class ReplicationLogGroup {
   }
 
   /**
-   * Calculate how long the application thread should wait for a sync to 
finish. The application
-   * thread here is the write rpc handler thread. It takes into account the 
number of retries, pause
-   * between successive attempts, dfs write timeout and zk session timeouts.
+   * Calculate how long the application thread should wait for a sync to 
finish. Uses the SAF sync
+   * timeout as the base — the consumer thread must have time to attempt SYNC 
retries, flip to SAF,
+   * attempt SAF retries, and either succeed or abort before the RPC handler 
gives up.
    * @return sync timeout in ms
    */
   protected long calculateSyncTimeout() {
-    int maxAttempts =
-      conf.getInt(REPLICATION_LOG_SYNC_RETRIES_KEY, 
DEFAULT_REPLICATION_LOG_SYNC_RETRIES) + 1;
-    long retryDelayMs =
-      conf.getLong(REPLICATION_LOG_RETRY_DELAY_MS_KEY, 
DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
-    long wrtiteRpcTimeout = 
conf.getLong(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
-      DEFAULT_HDFS_WRITE_RPC_TIMEOUT_MS);
+    long walSyncTimeout = conf.getLong(WAL_SYNC_TIMEOUT_MS_KEY, 
DEFAULT_WAL_SYNC_TIMEOUT_MS);
     // account for HAGroupStore update when we switch replication mode
     long zkTimeoutMs = conf.getLong(ZK_SESSION_TIMEOUT, 
DEFAULT_ZK_SESSION_TIMEOUT);
-    long totalRpcTimeout = maxAttempts * wrtiteRpcTimeout + (maxAttempts - 1) 
* retryDelayMs;
-    return 2 * totalRpcTimeout + zkTimeoutMs;
+    return walSyncTimeout + zkTimeoutMs;
   }
 
   /**
@@ -583,10 +577,11 @@ public class ReplicationLogGroup {
       LOG.error(message, e);
       abort(message, e);
     } catch (TimeoutException e) {
-      String message = String.format("HAGroup %s sync operation timed out", 
this);
-      LOG.error(message);
-      // sync timeout is a fatal error
-      abort(message, e);
+      String message =
+        String.format("HAGroup %s replication log sync timed out after %d ms", 
this, syncTimeoutMs);
+      LOG.error(message, e);
+      PhoenixWALSyncTimeoutException timeoutEx = new 
PhoenixWALSyncTimeoutException(message, e);
+      abort(message, timeoutEx);
     }
   }
 
@@ -914,7 +909,7 @@ public class ReplicationLogGroup {
         future.complete(null);
       }
       pendingSyncFutures.clear();
-      LOG.info("Sync operation completed successfully up to sequence {}", 
sequence);
+      LOG.debug("Sync operation completed successfully up to sequence {}", 
sequence);
       // after a successful sync check the mode set on the replication group
       // Doing the mode check on sync points makes the implementation more 
robust
       // since we can guarantee that all unsynced appends have been flushed to 
the
@@ -1053,6 +1048,10 @@ public class ReplicationLogGroup {
           // and got an exception. This is a fatal exception so halt the 
disruptor
           // fail the pending sync events with the original exception
           failPendingSyncs(sequence, e);
+          // Both SYNC and SAF writes failed. The local HBase WAL has the 
mutation but neither
+          // replication pipeline does, so the SAF forwarder cannot reconcile 
it. Abort the RS
+          // so region reassignment + preWALRestore re-ships the orphaned 
edits.
+          abort("Both SYNC and SAF replication writes failed", fatalEx);
           // halt the disruptor with the fatal exception
           throw fatalEx;
         }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
index 8e569b5f31..1238513135 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
@@ -86,17 +86,9 @@ public abstract class ReplicationModeImpl {
     getReplicationLog().sync();
   }
 
-  /** Graceful close */
-  void closeReplicationLog() {
+  void closeReplicationLog(boolean graceful) {
     if (log != null) {
-      log.close();
-    }
-  }
-
-  /** Forced close */
-  void closeReplicationLogOnError() {
-    if (log != null) {
-      log.closeOnError();
+      log.close(graceful);
     }
   }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
index ab5c26b2da..ea18a5b853 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
@@ -105,11 +105,7 @@ public class StoreAndForwardModeImpl extends 
ReplicationModeImpl {
   void onExit(boolean gracefulShutdown) {
     LOG.info("HAGroup {} exiting mode {} graceful={}", logGroup, this, 
gracefulShutdown);
     stopHAGroupStoreUpdateTask();
-    if (gracefulShutdown) {
-      closeReplicationLog();
-    } else {
-      closeReplicationLogOnError();
-    }
+    closeReplicationLog(gracefulShutdown);
   }
 
   @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
index 4b409633d9..aefce975cf 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
@@ -55,16 +55,13 @@ public class SyncAndForwardModeImpl extends 
ReplicationModeImpl {
     LOG.info("HAGroup {} exiting mode {} graceful={}", logGroup, this, 
gracefulShutdown);
     // stop the replication log forwarding
     logGroup.getLogForwarder().stop();
-    if (gracefulShutdown) {
-      closeReplicationLog();
-    } else {
-      closeReplicationLogOnError();
-    }
+    closeReplicationLog(gracefulShutdown);
   }
 
   @Override
   ReplicationMode onFailure(Throwable e) throws IOException {
     LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
+    logGroup.getMetrics().incrementSyncToSafTransitions();
     try {
       logGroup.setHAGroupStatusToStoreAndForward();
     } catch (Exception ex) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
index b3370a8a50..75e175ad4e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
@@ -50,16 +50,13 @@ public class SyncModeImpl extends ReplicationModeImpl {
   @Override
   void onExit(boolean gracefulShutdown) {
     LOG.info("HAGroup {} exiting mode {} graceful={}", logGroup, this, 
gracefulShutdown);
-    if (gracefulShutdown) {
-      closeReplicationLog();
-    } else {
-      closeReplicationLogOnError();
-    }
+    closeReplicationLog(gracefulShutdown);
   }
 
   @Override
   ReplicationMode onFailure(Throwable e) throws IOException {
     LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
+    logGroup.getMetrics().incrementSyncToSafTransitions();
     try {
       // first update the HAGroupStore state
       logGroup.setHAGroupStatusToStoreAndForward();
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index 2555f17225..11b427112a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -59,6 +59,9 @@ public class LogFileFormatWriter implements Closeable {
     this.encoder = context.getCodec().getEncoder(blockDataStream);
     // Write header immediately when file is created
     writeFileHeader();
+    // Sync the header to force the first HDFS block allocation on the 
caller's thread (the
+    // rotation thread). Without this, addBlock fires on the consumer thread's 
first sync().
+    output.sync();
   }
 
   private void writeFileHeader() throws IOException {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index e1c3aadf49..2a0f4c5dd1 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.replication.log;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -35,7 +36,7 @@ public class LogFileWriter implements LogFile.Writer {
 
   private LogFileWriterContext context;
   private LogFileFormatWriter writer;
-  private volatile boolean closed = false;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
   /**
    * A monotonically increasing sequence number that identifies this writer 
instance, used to detect
    * log file rotations and ensure proper handling of in-flight operations. 
Higher layers will get a
@@ -72,9 +73,13 @@ public class LogFileWriter implements LogFile.Writer {
     LOG.debug("Initialized LogFileWriter for path {}", context.getFilePath());
   }
 
+  public boolean isClosed() {
+    return closed.get();
+  }
+
   @Override
   public void append(String tableName, long commitId, Mutation mutation) 
throws IOException {
-    if (closed) {
+    if (isClosed()) {
       throw new IOException("Writer has been closed");
     }
     writer.append(
@@ -83,7 +88,7 @@ public class LogFileWriter implements LogFile.Writer {
 
   @Override
   public void sync() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       throw new IOException("Writer has been closed");
     }
     writer.sync();
@@ -91,7 +96,7 @@ public class LogFileWriter implements LogFile.Writer {
 
   @Override
   public long getLength() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       // Attempt to get length from filesystem if stream is closed
       if (context.getFileSystem().exists(context.getFilePath())) {
         return 
context.getFileSystem().getFileStatus(context.getFilePath()).getLen();
@@ -108,23 +113,19 @@ public class LogFileWriter implements LogFile.Writer {
 
   @Override
   public void close() throws IOException {
-    if (closed) {
+    if (!closed.compareAndSet(false, true)) {
       return;
     }
-    try {
-      // Close the final block and write the trailer
-      if (writer != null) {
-        writer.close();
-      }
-    } finally {
-      closed = true;
-      LOG.debug("Closed LogFileWriter for path {}", context.getFilePath());
+    // Close the final block and write the trailer
+    if (writer != null) {
+      writer.close();
     }
+    LOG.debug("Closed LogFileWriter {}", this);
   }
 
   @Override
   public String toString() {
-    return "LogFileWriter [formatWriter=" + writer + ", closed=" + closed + ", 
generation="
+    return "LogFileWriter [formatWriter=" + writer + ", closed=" + isClosed() 
+ ", generation="
       + generation + "]";
   }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index b3cc095048..b81df30b5f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -45,6 +45,9 @@ public interface MetricsReplicationLogGroupSource extends 
BaseSource {
   String RING_BUFFER_TIME = "ringBufferTime";
   String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer";
 
+  String SYNC_TO_SAF_TRANSITIONS = "syncToSafTransitions";
+  String SYNC_TO_SAF_TRANSITIONS_DESC = "Number of SYNC to STORE_AND_FORWARD 
mode transitions";
+
   /**
    * Increments the counter for total log rotations. This counter tracks the 
total number of times
    * the log was rotated, regardless of reason.
@@ -81,6 +84,9 @@ public interface MetricsReplicationLogGroupSource extends 
BaseSource {
    */
   void incrementRotationFailureCount();
 
+  /** Increment the SYNC to STORE_AND_FORWARD transition counter. */
+  void incrementSyncToSafTransitions();
+
   /**
    * Unregister this metrics source.
    */
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index 718f6ac63d..ea552b47cd 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -28,6 +28,7 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
 
   private final MutableFastCounter rotationCount;
   private final MutableFastCounter rotationFailuresCount;
+  private final MutableFastCounter syncToSafTransitions;
   private final MutableHistogram appendTime;
   private final MutableHistogram syncTime;
   private final MutableHistogram rotationTime;
@@ -44,6 +45,8 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
     rotationCount = getMetricsRegistry().newCounter(ROTATION_COUNT, 
ROTATION_COUNT_DESC, 0L);
     rotationFailuresCount =
       getMetricsRegistry().newCounter(ROTATION_FAILURES, 
ROTATION_FAILURES_DESC, 0L);
+    syncToSafTransitions =
+      getMetricsRegistry().newCounter(SYNC_TO_SAF_TRANSITIONS, 
SYNC_TO_SAF_TRANSITIONS_DESC, 0L);
     appendTime = getMetricsRegistry().newHistogram(APPEND_TIME, 
APPEND_TIME_DESC);
     syncTime = getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC);
     rotationTime = getMetricsRegistry().newHistogram(ROTATION_TIME, 
ROTATION_TIME_DESC);
@@ -65,6 +68,11 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
     rotationFailuresCount.incr();
   }
 
+  @Override
+  public void incrementSyncToSafTransitions() {
+    syncToSafTransitions.incr();
+  }
+
   @Override
   public void updateAppendTime(long timeNs) {
     appendTime.add(timeNs);
@@ -88,7 +96,8 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
   @Override
   public ReplicationLogMetricValues getCurrentMetricValues() {
     return new ReplicationLogMetricValues(rotationCount.value(), 
rotationFailuresCount.value(),
-      appendTime.getMax(), syncTime.getMax(), rotationTime.getMax(), 
ringBufferTime.getMax());
+      syncToSafTransitions.value(), appendTime.getMax(), syncTime.getMax(), 
rotationTime.getMax(),
+      ringBufferTime.getMax());
   }
 
   @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
index ef64552ece..ac1aab4e20 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
@@ -22,15 +22,18 @@ public class ReplicationLogMetricValues {
 
   private final long rotationCount;
   private final long rotationFailuresCount;
+  private final long syncToSafTransitions;
   private final long appendTime;
   private final long syncTime;
   private final long rotationTime;
   private final long ringBufferTime;
 
-  public ReplicationLogMetricValues(long rotationCount, long 
rotationFailuresCount, long appendTime,
-    long syncTime, long rotationTime, long ringBufferTime) {
+  public ReplicationLogMetricValues(long rotationCount, long 
rotationFailuresCount,
+    long syncToSafTransitions, long appendTime, long syncTime, long 
rotationTime,
+    long ringBufferTime) {
     this.rotationCount = rotationCount;
     this.rotationFailuresCount = rotationFailuresCount;
+    this.syncToSafTransitions = syncToSafTransitions;
     this.appendTime = appendTime;
     this.syncTime = syncTime;
     this.rotationTime = rotationTime;
@@ -45,6 +48,10 @@ public class ReplicationLogMetricValues {
     return rotationFailuresCount;
   }
 
+  public long getSyncToSafTransitions() {
+    return syncToSafTransitions;
+  }
+
   public long getAppendTime() {
     return appendTime;
   }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
index 5ff9fc2d41..063020523e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
@@ -113,8 +113,7 @@ public class ReplicationLogBaseTest {
     storeRecord = initHAGroupStoreRecord();
     
doReturn(Optional.of(storeRecord)).when(haGroupStoreManager).getHAGroupStoreRecord(anyString());
 
-    logGroup = new TestableLogGroup(conf, serverName, haGroupName, 
haGroupStoreManager);
-    logGroup.init();
+    logGroup = createAndInitLogGroup();
   }
 
   @After
@@ -133,8 +132,14 @@ public class ReplicationLogBaseTest {
     if (logGroup != null) {
       logGroup.close();
     }
-    logGroup = new TestableLogGroup(conf, serverName, haGroupName, 
haGroupStoreManager);
-    logGroup.init();
+    logGroup = createAndInitLogGroup();
+  }
+
+  private ReplicationLogGroup createAndInitLogGroup() throws Exception {
+    ReplicationLogGroup group =
+      spy(new TestableLogGroup(conf, serverName, haGroupName, 
haGroupStoreManager));
+    group.init();
+    return group;
   }
 
   protected static void waitForRotationTick(int roundDurationSeconds) throws 
InterruptedException {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 3bd389a0d5..727959541e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -30,9 +30,9 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
@@ -42,12 +42,15 @@ import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
 import org.apache.phoenix.replication.log.LogFile;
 import org.apache.phoenix.replication.log.LogFileReader;
 import org.apache.phoenix.replication.log.LogFileReaderContext;
@@ -259,12 +262,14 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     // Append some data
     logGroup.append(tableName, commitId, put);
 
-    // sync on the writer will timeout
+    // sync on the writer will timeout — syncInternal wraps in 
PhoenixWALSyncTimeoutException
+    // and calls abort() which throws RuntimeException
     try {
       logGroup.sync();
       fail("Should have thrown RuntimeException because sync timed out");
     } catch (RuntimeException e) {
-      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+        e.getCause() instanceof PhoenixWALSyncTimeoutException);
     }
     // reset
     doNothing().when(innerWriter).sync();
@@ -619,12 +624,14 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
       anyLong(), any(Mutation.class));
 
     // Append data. This should trigger the LogExceptionHandler, which will 
close logWriter.
+    // The sync future times out, syncInternal wraps in 
PhoenixWALSyncTimeoutException and aborts.
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown Runtime because sync timed out");
+      fail("Should have thrown RuntimeException because sync timed out");
     } catch (RuntimeException e) {
-      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+        e.getCause() instanceof PhoenixWALSyncTimeoutException);
     }
 
     // Verify that subsequent operations fail because the log is closed
@@ -672,8 +679,9 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     // Try to sync. Should fail after exhausting retries and then switch to 
STORE_AND_FORWARD
     logGroup.sync();
 
-    // All retries use the same writer — verify sync was attempted maxAttempts 
times
-    verify(initialWriter, atLeast(2)).sync();
+    // Attempt 1 syncs on initialWriter (fails), rotation creates a new writer 
(also fails),
+    // attempt 2 syncs on the rotated writer (fails). Both SYNC attempts 
exhausted → SAF flip.
+    verify(initialWriter, times(1)).sync();
     assertEquals(STORE_AND_FORWARD, logGroup.getMode());
   }
 
@@ -963,12 +971,14 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).getLength();
 
     // Append data. This should trigger the LogExceptionHandler, which will 
close logWriter.
+    // The sync future times out, syncInternal wraps in 
PhoenixWALSyncTimeoutException and aborts.
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
       fail("Should have thrown RuntimeException because sync timed out");
     } catch (RuntimeException e) {
-      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+        e.getCause() instanceof PhoenixWALSyncTimeoutException);
     }
 
     // Verify that subsequent operations fail because the log is closed
@@ -1008,7 +1018,8 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
       logGroup.sync();
       fail("Should have thrown RuntimeException because sync timed out");
     } catch (RuntimeException e) {
-      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+        e.getCause() instanceof PhoenixWALSyncTimeoutException);
     }
 
     // Verify that subsequent append operations fail because the log is closed
@@ -1048,7 +1059,8 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
       logGroup.sync();
       fail("Should have thrown RuntimeException because sync timed out");
     } catch (RuntimeException e) {
-      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
+        e.getCause() instanceof PhoenixWALSyncTimeoutException);
     }
 
     // Verify that subsequent sync operations fail because the log is closed
@@ -1263,15 +1275,21 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     ReplicationLog activeLog = logGroup.getActiveLog();
     LogFileWriter writer = activeLog.getWriter();
     assertNotNull("Writer should not be null", writer);
-    // keep returning the same writer
-    doAnswer(invocation -> writer).when(activeLog).createNewWriter();
+
+    // Rotated writers must also fail on the 5th append so the retry doesn't 
rescue the loop.
+    doAnswer(invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      doThrow(new IOException("Simulate append 
failure")).when(w).append(tableName, commitId5,
+        put5);
+      return w;
+    }).when(activeLog).createNewWriter();
 
     logGroup.append(tableName, commitId1, put1);
     logGroup.append(tableName, commitId2, put2);
     logGroup.append(tableName, commitId3, put3);
     logGroup.append(tableName, commitId4, put4);
 
-    // configure writer to throw IOException on the 5th append
+    // configure initial writer to throw IOException on the 5th append
     doThrow(new IOException("Simulate append 
failure")).when(writer).append(tableName, commitId5,
       put5);
 
@@ -1456,9 +1474,8 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
   }
 
   /**
-   * Tests that a failed replay is retried on the next attempt. The new 
writer's first append fails
-   * during replay, so the generation stays stale. On retry, the generation 
mismatch is detected
-   * again and replay succeeds.
+   * Tests that a failed replay is retried on a fresh writer. The first 
rotated writer's append
+   * fails during replay. Rotation creates a second fresh writer; replay 
succeeds on it.
    */
   @Test
   public void testReplayFailureRetries() throws Exception {
@@ -1491,27 +1508,25 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     activeLog.forceRotation();
 
     // 3rd append triggers swap + replay of [r1, r2].
-    // Attempt 1: replay fails on first record → IOException → generation 
stays stale
-    // Attempt 2: generation mismatch still → replay retries → succeeds → r3 
appended
+    // Attempt 1 on W2: replay fails on r1 → IOException → request rotation → 
creates W3
+    // Attempt 2 on W3: replay r1+r2 succeeds → r3 appended → sync succeeds
     logGroup.append(tableName, commitId + 2, put);
     logGroup.sync();
 
-    LogFileWriter newWriter = activeLog.getWriter();
-    assertTrue("Should be using new writer", newWriter != initialWriter);
+    LogFileWriter finalWriter = activeLog.getWriter();
+    assertTrue("Should be using a fresh writer", finalWriter != initialWriter);
 
-    // New writer: attempt 1 replay failed on r1 (1 call). Attempt 2 replayed 
r1+r2 (2 calls)
-    // then appended r3. Total: r1 called 2x, r2 called 1x, r3 called 1x.
-    verify(newWriter, times(2)).append(eq(tableName), eq(commitId), eq(put));
-    verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 1), 
eq(put));
-    verify(newWriter, times(1)).append(eq(tableName), eq(commitId + 2), 
eq(put));
-    verify(newWriter, times(1)).sync();
+    // Final writer (W3): replayed r1+r2 then appended r3 — each exactly once.
+    verify(finalWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
+    verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 1), 
eq(put));
+    verify(finalWriter, times(1)).append(eq(tableName), eq(commitId + 2), 
eq(put));
+    verify(finalWriter, times(1)).sync();
   }
 
   /**
-   * Tests error-recovery rotation: when the current writer's stream is 
broken, the second failure
-   * in apply() triggers requestRotation() which submits an on-demand 
LogRotationTask. During the
-   * retry sleep, the background thread creates a new writer. The next attempt 
drains it and
-   * succeeds.
+   * Tests error-recovery rotation: when the current writer's stream is 
broken, the first failure in
+   * apply() triggers requestRotation() which submits an on-demand 
LogRotationTask. During the latch
+   * wait, the background thread creates a new writer. The next attempt drains 
it and succeeds.
    */
   @Test
   public void testErrorRecoveryRequestsNewWriter() throws Exception {
@@ -1527,16 +1542,16 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     doThrow(new IOException("Simulated broken 
stream")).when(initialWriter).append(anyString(),
       anyLong(), any(Mutation.class));
 
-    // Append — attempt 1 fails, attempt 2 fails + requestRotation(), during 
sleep the
-    // background thread creates a new writer, attempt 3 drains it and succeeds
+    // Append — attempt 1 fails on initialWriter, rotation requested, attempt 
2 drains the
+    // rotated writer and succeeds
     logGroup.append(tableName, commitId, put);
     logGroup.sync();
 
     LogFileWriter newWriter = activeLog.getWriter();
     assertTrue("Should be using a new writer after error recovery", newWriter 
!= initialWriter);
 
-    // Old writer received failed attempts
-    verify(initialWriter, atLeast(2)).append(eq(tableName), eq(commitId), 
eq(put));
+    // Old writer received 1 failed attempt
+    verify(initialWriter, times(1)).append(eq(tableName), eq(commitId), 
eq(put));
     // New writer received the successful append
     verify(newWriter, times(1)).append(eq(tableName), eq(commitId), eq(put));
     verify(newWriter, times(1)).sync();
@@ -1622,38 +1637,160 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
       writerAfterRotation != writerBeforeRotation);
   }
 
-  // @Test
-  public void testAppendTimeoutWhileSyncPending() throws Exception {
-    final String tableName = "TESTTBL";
-    final long commitId1 = 1L;
-    final Mutation put1 = LogFileTestUtil.newPut("row1", 1, 1);
+  /**
+   * Tests that a RuntimeException in LogRotationTask does not suppress future 
scheduled ticks.
+   * Prior to the Throwable catch fix, an unchecked exception would silently suppress all
+   * subsequent executions of the task scheduled on the ScheduledExecutorService.
+   */
+  @Test
+  public void testRuntimeExceptionInRotationDoesNotSuppressFutureTicks() 
throws Exception {
+    final String tableName = "TBLRERT";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+    final int roundDurationSeconds = 5;
+
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 
roundDurationSeconds);
+    recreateLogGroup();
 
-    // Get the inner writer
     ReplicationLog activeLog = logGroup.getActiveLog();
-    LogFileWriter writer = activeLog.getWriter();
-    assertNotNull("Writer should not be null", writer);
-    // keep returning the same writer
-    // doAnswer(invocation -> writer).when(activeLog).createNewWriter();
-    doAnswer(new Answer<Object>() {
-      @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
-        // Thread.sleep((long)(TEST_SYNC_TIMEOUT * 1.25)); // Simulate slow 
append processing
-        // throw new CallTimeoutException("Simulate append timeout");
-        Object result = invocation.callRealMethod();
-        sleep((long) (TEST_SYNC_TIMEOUT * 1.25)); // Simulate slow but 
successful append
-        return result;
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // First rotation tick throws RuntimeException, subsequent ones succeed
+    AtomicBoolean shouldThrow = new AtomicBoolean(true);
+    doAnswer(invocation -> {
+      if (shouldThrow.getAndSet(false)) {
+        throw new RuntimeException("Simulated NPE in rotation");
       }
-    }).when(writer).append(anyString(), anyLong(), any(Mutation.class));
+      return invocation.callRealMethod();
+    }).when(activeLog).createNewWriter();
 
-    logGroup.append(tableName, commitId1, put1);
+    // Append and sync to establish baseline
+    logGroup.append(tableName, commitId, put);
     logGroup.sync();
 
-    LogFileWriter storeAndForwardWriter = logGroup.getActiveLog().getWriter();
-    assertTrue("After switching mode we should have a new writer", writer != 
storeAndForwardWriter);
-    InOrder inOrder = Mockito.inOrder(storeAndForwardWriter);
-    // verify that all the in-flight appends and syncs are replayed on the new 
store and forward
-    // writer
-    inOrder.verify(storeAndForwardWriter, times(1)).append(eq(tableName), 
eq(commitId1), eq(put1));
-    inOrder.verify(storeAndForwardWriter, times(1)).sync();
+    // Wait for first tick — RuntimeException, rotation fails
+    waitForRotationTick(roundDurationSeconds);
+
+    assertTrue("rotationFailures should be incremented", 
activeLog.rotationFailures.get() >= 1);
+
+    // Wait for second tick — should succeed (scheduler not suppressed)
+    waitForRotationTick(roundDurationSeconds);
+
+    // Drain the staged writer
+    logGroup.append(tableName, commitId + 1, put);
+    logGroup.sync();
+
+    LogFileWriter writerAfterRotation = activeLog.getWriter();
+    assertTrue("Second tick should have created a new writer",
+      writerAfterRotation != initialWriter);
+    assertEquals("rotationFailures should be reset after success", 0,
+      activeLog.rotationFailures.get());
+  }
+
+  /**
+   * Tests that when both SYNC attempts and both SAF attempts fail, the abort 
path fires via
+   * LogEventHandler's fatal catch block.
+   */
+  @Test
+  public void testBothSyncAndSafFailuresTriggersAbort() throws Exception {
+    final String tableName = "TBLBSSF";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Poison every writer created by any ReplicationLog (SYNC or SAF) to fail 
on sync
+    Answer<Object> poisonNewWriter = invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      doThrow(new IOException("Simulated sync failure")).when(w).sync();
+      return w;
+    };
+    Answer<Object> poisonLog = invocation -> {
+      ReplicationLog log = (ReplicationLog) invocation.callRealMethod();
+      doAnswer(poisonNewWriter).when(log).createNewWriter();
+      return log;
+    };
+    doAnswer(poisonLog).when(logGroup).createFallbackLog();
+
+    // Poison the already-initialized SYNC log
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    doThrow(new IOException("Simulated sync 
failure")).when(initialWriter).sync();
+    doAnswer(poisonNewWriter).when(activeLog).createNewWriter();
+
+    logGroup.append(tableName, commitId, put);
+    try {
+      logGroup.sync();
+      fail("Should have thrown RuntimeException from abort");
+    } catch (RuntimeException e) {
+      assertTrue("Abort message should mention both SYNC and SAF",
+        e.getMessage().contains("Both SYNC and SAF replication writes failed")
+          || e.getMessage().contains("ABORTING"));
+    }
+  }
+
+  /**
+   * Tests that when already in SAF mode and both SAF attempts fail,
+   * StoreAndForwardModeImpl.onFailure() triggers abort.
+   */
+  @Test
+  public void testSafBothAttemptsFailTriggersAbort() throws Exception {
+    final String tableName = "TBLSAFBA";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Start in SAF mode
+    initialState = HAGroupState.ACTIVE_NOT_IN_SYNC;
+    storeRecord = new HAGroupStoreRecord(null, haGroupName, initialState, 0,
+      HighAvailabilityPolicy.FAILOVER.toString(), "peerZKUrl", "clusterUrl", 
"peerClusterUrl",
+      localUri.toString(), peerUri.toString(), 0L);
+    
doReturn(Optional.of(storeRecord)).when(haGroupStoreManager).getHAGroupStoreRecord(anyString());
+    recreateLogGroup();
+    assertEquals(STORE_AND_FORWARD, logGroup.getMode());
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // All SAF writers fail on sync
+    doThrow(new IOException("Simulated SAF sync 
failure")).when(initialWriter).sync();
+    doAnswer(invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      doThrow(new IOException("Simulated SAF sync failure")).when(w).sync();
+      return w;
+    }).when(activeLog).createNewWriter();
+
+    logGroup.append(tableName, commitId, put);
+    try {
+      logGroup.sync();
+      fail("Should have thrown RuntimeException from abort");
+    } catch (RuntimeException e) {
+      assertTrue("Abort should fire from SAF failure path",
+        e.getMessage().contains("ABORTING") || e.getMessage().contains("got 
error"));
+    }
+  }
+
+  /**
+   * Tests that calculateSyncTimeout derives from 
hbase.regionserver.wal.sync.timeout and that the
+   * explicit phoenix override still wins.
+   */
+  @Test
+  public void testCalculateSyncTimeout() throws Exception {
+    // Remove the explicit override so calculateSyncTimeout is used
+    conf.unset(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY);
+
+    // Set the HBase WAL sync timeout to a known value
+    conf.setLong("hbase.regionserver.wal.sync.timeout", 120000L);
+    recreateLogGroup();
+
+    // syncTimeoutMs should be walSyncTimeout + zkTimeout
+    long expectedZkTimeout = conf.getLong("hbase.zookeeper.session.timeout", 
90000);
+    long expected = 120000L + expectedZkTimeout;
+    assertEquals("syncTimeoutMs should derive from WAL sync timeout", expected,
+      logGroup.syncTimeoutMs);
+
+    // Now set the explicit Phoenix override — it should win
+    conf.setLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY, 5000L);
+    recreateLogGroup();
+    assertEquals("Explicit override should take precedence", 5000L, 
logGroup.syncTimeoutMs);
   }
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
index 5f07d3b167..cd08ea5334 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterSyncTest.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.replication.log;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -69,8 +70,11 @@ public class LogFileWriterSyncTest {
     // Create the writer instance to be tested
     writer = new LogFileWriter();
 
-    // Initialize the writer - this will call fs.create() and set up internal 
writers
+    // Initialize the writer - this will call fs.create() and set up internal 
writers.
+    // Init syncs the header to force HDFS block allocation; clear that from 
invocation history
+    // so tests only verify sync behavior during append/sync operations.
     writer.init(writerContext);
+    clearInvocations(internalOutput);
   }
 
   @After
@@ -205,6 +209,7 @@ public class LogFileWriterSyncTest {
       new LogFileWriterContext(hflushConf).setFileSystem(hflushMockFs);
     LogFileWriter hflushWriter = new LogFileWriter();
     hflushWriter.init(hflushContext);
+    clearInvocations(hflushOutput);
 
     try {
       Mutation m1 = LogFileTestUtil.newPut("row1", 1L, 1);

Reply via email to