This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new e4d9aae40b PHOENIX-7769 Initialize and Close replication log groups as part of RS startup and shutdown (#2465)
e4d9aae40b is described below

commit e4d9aae40bf795baaa87cf201831379726c60773
Author: tkhurana <[email protected]>
AuthorDate: Thu May 7 14:35:23 2026 -0700

    PHOENIX-7769 Initialize and Close replication log groups as part of RS startup and shutdown (#2465)
---
 .../coprocessor/PhoenixRegionServerEndpoint.java   |  27 +++++
 .../phoenix/replication/ReplicationLogGroup.java   |  20 ++++
 .../PhoenixRegionServerEndpointTestImpl.java       |  11 --
 .../replication/ReplicationLogBaseTest.java        |  37 ++++---
 .../ReplicationLogDiscoveryForwarderTest.java      |   5 +
 .../replication/ReplicationLogDiscoveryTest.java   | 114 +++++++++++----------
 6 files changed, 137 insertions(+), 77 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index cdb80df926..3f35b40cef 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED;
+
 import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -30,7 +33,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.ServerMetadataCache;
 import org.apache.phoenix.cache.ServerMetadataCacheImpl;
@@ -46,6 +51,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.replication.ReplicationLogGroup;
 import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -61,6 +67,7 @@ public class PhoenixRegionServerEndpoint extends
   private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
   private MetricsMetadataCachingSource metricsSource;
   protected Configuration conf;
+  protected ServerName serverName;
   private ExecutorService prewarmExecutor;
 
   // regionserver level thread pool used by Uncovered Indexes to scan data 
table rows
@@ -69,6 +76,9 @@ public class PhoenixRegionServerEndpoint extends
   @Override
   public void start(CoprocessorEnvironment env) throws IOException {
     this.conf = env.getConfiguration();
+    if (env instanceof RegionServerCoprocessorEnvironment) {
+      this.serverName = ((RegionServerCoprocessorEnvironment) env).getServerName();
+    }
     this.metricsSource =
       MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
     initUncoveredIndexThreadPool(this.conf);
@@ -89,6 +99,10 @@ public class PhoenixRegionServerEndpoint extends
   public void stop(CoprocessorEnvironment env) throws IOException {
     // Stop replication log replay
     ReplicationLogReplayService.getInstance(conf).stop();
+    // Close all ReplicationLogGroup instances belonging to this server
+    if (serverName != null) {
+      ReplicationLogGroup.closeAll(serverName);
+    }
     RegionServerCoprocessor.super.stop(env);
     if (uncoveredIndexThreadPool != null) {
       uncoveredIndexThreadPool
@@ -282,6 +296,9 @@ public class PhoenixRegionServerEndpoint extends
       }
 
       // Phase 2: Prewarm individual HAGroupStoreClients with retry
+      // and eagerly initialize ReplicationLogGroup instances
+      boolean shouldInitReplicationLogGroup = serverName != null && conf
+        .getBoolean(SYNCHRONOUS_REPLICATION_ENABLED, DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED);
       try {
         while (!pending.isEmpty()) {
           Iterator<String> iterator = pending.iterator();
@@ -289,6 +306,16 @@ public class PhoenixRegionServerEndpoint extends
             String haGroup = iterator.next();
             try {
               manager.getClusterRoleRecord(haGroup);
+              if (shouldInitReplicationLogGroup) {
+                try {
+                  ReplicationLogGroup.get(conf, serverName, haGroup);
+                  LOGGER.info("Eagerly initialized ReplicationLogGroup {} on server {}", haGroup,
+                    serverName);
+                } catch (Exception e) {
+                  LOGGER.warn("Failed to eagerly initialize ReplicationLogGroup for HA group: {}."
+                    + " Will be lazily initialized on first mutation.", haGroup, e);
+                }
+              }
               iterator.remove();
               LOGGER.info("Prewarmed HAGroupStoreClient: {} ({} remaining)", 
haGroup,
                 pending.size());
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index d4ac31b493..c88d9b2647 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -630,6 +630,7 @@ public class ReplicationLogGroup {
       if (closed) {
         return;
       }
+      LOG.info("Closing HAGroup {}", this);
       // setting closed to true prevents future producers to add events to the 
ring buffer
       closed = true;
       // Remove from instances cache
@@ -652,6 +653,25 @@ public class ReplicationLogGroup {
     }
   }
 
+  /**
+   * Close all ReplicationLogGroup instances belonging to the given server. Called during
+   * RegionServer shutdown.
+   */
+  public static void closeAll(ServerName serverName) {
+    LOG.info("Closing all ReplicationLogGroup instances for server {}, total count={}", serverName,
+      INSTANCES.size());
+    List<ReplicationLogGroup> groups = new ArrayList<>(INSTANCES.values());
+    for (ReplicationLogGroup group : groups) {
+      if (group.serverName.equals(serverName)) {
+        try {
+          group.close();
+        } catch (Exception e) {
+          LOG.warn("Error closing ReplicationLogGroup for HA group: {}", group.haGroupName, e);
+        }
+      }
+    }
+  }
+
   private void shutdownDisruptorExecutor() {
     disruptorExecutor.shutdown();
     try {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
index eda9c73c1e..a0a6e65796 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
@@ -17,10 +17,6 @@
  */
 package org.apache.phoenix.end2end;
 
-import java.io.IOException;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.phoenix.cache.ServerMetadataCache;
 import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
 
@@ -29,13 +25,6 @@ import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
  * support keeping multiple cache instances.
  */
 public class PhoenixRegionServerEndpointTestImpl extends 
PhoenixRegionServerEndpoint {
-  protected ServerName serverName;
-
-  @Override
-  public void start(CoprocessorEnvironment env) throws IOException {
-    super.start(env);
-    this.serverName = ((RegionServerCoprocessorEnvironment) 
env).getServerName();
-  }
 
   @Override
   public ServerMetadataCache getServerMetadataCache() {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
index 063020523e..f748e3d1a8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.replication;
 
-import static 
org.apache.phoenix.replication.ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY;
 import static 
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -106,7 +105,6 @@ public class ReplicationLogBaseTest {
     // small value of replication round duration
     conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
       TEST_REPLICATION_ROUND_DURATION_SECONDS);
-    conf.setDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY, 0.0);
     overrideConf(conf);
 
     // initialize the group store record
@@ -136,12 +134,16 @@ public class ReplicationLogBaseTest {
   }
 
   private ReplicationLogGroup createAndInitLogGroup() throws Exception {
-    ReplicationLogGroup group =
-      spy(new TestableLogGroup(conf, serverName, haGroupName, 
haGroupStoreManager));
+    ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, 
haGroupName,
+      haGroupStoreManager, useAlignedRotation()));
     group.init();
     return group;
   }
 
+  protected boolean useAlignedRotation() {
+    return false;
+  }
+
   protected static void waitForRotationTick(int roundDurationSeconds) throws 
InterruptedException {
     Thread.sleep((long) (roundDurationSeconds * 1000 * 1.25));
     LOG.info("Waking up after waiting for rotation tick");
@@ -154,41 +156,48 @@ public class ReplicationLogBaseTest {
   }
 
   static class TestableLogGroup extends ReplicationLogGroup {
+    private final boolean useAlignedRotation;
 
     public TestableLogGroup(Configuration conf, ServerName serverName, String 
haGroupName,
-      HAGroupStoreManager haGroupStoreManager) {
+      HAGroupStoreManager haGroupStoreManager, boolean useAlignedRotation) {
       super(conf, serverName, haGroupName, haGroupStoreManager);
+      this.useAlignedRotation = useAlignedRotation;
     }
 
     @Override
     protected ReplicationLog createStandbyLog() throws IOException {
-      return spy(new TestableLog(this, peerShardManager));
+      return spy(new TestableLog(this, peerShardManager, useAlignedRotation));
     }
 
     @Override
     protected ReplicationLog createFallbackLog() throws IOException {
-      return spy(new TestableLog(this, localShardManager));
+      return spy(new TestableLog(this, localShardManager, useAlignedRotation));
     }
 
   }
 
   /**
    * Testable version of ReplicationLog that allows spying on the log. 
Overrides
-   * startRotationExecutor to always use a full round as initial delay so that 
the rotation task
-   * never fires unexpectedly when a test happens to start near a round 
boundary.
+   * startRotationExecutor to use a configurable initial delay. By default 
uses a full round
+   * duration so the rotation task never fires early when the test happens to 
start near a round
+   * boundary. Set {@code useAlignedRotation} to true to use the 
production-aligned delay.
    */
   static class TestableLog extends ReplicationLog {
+    private final boolean useAlignedRotation;
 
-    public TestableLog(ReplicationLogGroup logGroup,
-      ReplicationShardDirectoryManager shardManager) {
+    public TestableLog(ReplicationLogGroup logGroup, 
ReplicationShardDirectoryManager shardManager,
+      boolean useAlignedRotation) {
       super(logGroup, shardManager);
+      this.useAlignedRotation = useAlignedRotation;
     }
 
     @Override
     protected void startRotationExecutor() {
-      // Use a full round as the initial delay so the rotation task never 
fires early when the
-      // test happens to start close to a round boundary (e.g. initialDelay of 
852ms on a 60s round)
-      super.startRotationExecutor(rotationTimeMs);
+      if (useAlignedRotation) {
+        super.startRotationExecutor();
+      } else {
+        super.startRotationExecutor(rotationTimeMs);
+      }
     }
 
     @Override
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
index 770956b70d..46efbf196e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
@@ -62,6 +62,11 @@ public class ReplicationLogDiscoveryForwarderTest extends ReplicationLogBaseTest
     conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 20);
   }
 
+  @Override
+  protected boolean useAlignedRotation() {
+    return true;
+  }
+
   @Before
   public void setUp() throws IOException {
     ReplicationMode mode = logGroup.getMode();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
index 6ed7d8723f..4af410e56e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
@@ -1082,67 +1082,77 @@ public class ReplicationLogDiscoveryTest {
       .when(discovery)
       .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file3Prefix)));
 
-    // Process in-progress directory
-    discovery.processInProgressDirectory();
+    // Inject an advancing clock so that rename timestamps from markInProgress 
are always
+    // strictly before the renameTimestampThreshold computed on the next loop 
iteration
+    AtomicLong clock = new AtomicLong(EnvironmentEdgeManager.currentTime());
+    EnvironmentEdgeManager.injectEdge(clock::getAndIncrement);
 
-    // Verify that markInProgress was called 7 times (5 initially + 2 for 
retries)
-    Mockito.verify(fileTracker, 
Mockito.times(7)).markInProgress(Mockito.any(Path.class));
+    try {
+      // Process in-progress directory
+      discovery.processInProgressDirectory();
 
-    // Verify that markInProgress was called for each expected file
-    // Files 1 and 3 are called twice (initial attempt + retry), others once
-    for (int i = 0; i < allInProgressFiles.size(); i++) {
-      Path expectedFile = allInProgressFiles.get(i);
-      String expectedPrefix = extractPrefix(expectedFile.getName());
-      int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are 
retried
-      Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress(
-        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
-    }
+      // Verify that markInProgress was called 7 times (5 initially + 2 for 
retries)
+      Mockito.verify(fileTracker, 
Mockito.times(7)).markInProgress(Mockito.any(Path.class));
 
-    // Verify that processFile was called for each file in the directory (i.e. 
5 + 2 times for
-    // failed once that would succeed in next retry)
-    Mockito.verify(discovery, 
Mockito.times(7)).processFile(Mockito.any(Path.class));
+      // Verify that markInProgress was called for each expected file
+      // Files 1 and 3 are called twice (initial attempt + retry), others once
+      for (int i = 0; i < allInProgressFiles.size(); i++) {
+        Path expectedFile = allInProgressFiles.get(i);
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are 
retried
+        Mockito.verify(fileTracker, 
Mockito.times(expectedTimes)).markInProgress(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      }
 
-    // Verify that processFile was called for each specific file (using prefix 
matching)
-    // Files 1 and 3 should be called twice (fail once, succeed on retry), 
others once
-    for (int i = 0; i < allInProgressFiles.size(); i++) {
-      Path expectedFile = allInProgressFiles.get(i);
-      String expectedPrefix = extractPrefix(expectedFile.getName());
-      int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are 
called twice (fail +
-                                                      // retry success)
-      Mockito.verify(discovery, Mockito.times(expectedTimes))
-        .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
-    }
+      // Verify that processFile was called for each file in the directory 
(i.e. 5 + 2 times for
+      // failed once that would succeed in next retry)
+      Mockito.verify(discovery, 
Mockito.times(7)).processFile(Mockito.any(Path.class));
 
-    // Verify that markCompleted was called for each successfully processed 
file
-    Mockito.verify(fileTracker, 
Mockito.times(5)).markCompleted(Mockito.any(Path.class));
+      // Verify that processFile was called for each specific file (using 
prefix matching)
+      // Files 1 and 3 should be called twice (fail once, succeed on retry), 
others once
+      for (int i = 0; i < allInProgressFiles.size(); i++) {
+        Path expectedFile = allInProgressFiles.get(i);
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are 
called twice (fail +
+                                                        // retry success)
+        Mockito.verify(discovery, Mockito.times(expectedTimes)).processFile(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      }
 
-    // Verify that markCompleted was called for 2 intermittent failed 
processed file
-    Mockito.verify(fileTracker, 
Mockito.times(2)).markFailed(Mockito.any(Path.class));
+      // Verify that markCompleted was called for each successfully processed 
file
+      Mockito.verify(fileTracker, 
Mockito.times(5)).markCompleted(Mockito.any(Path.class));
 
-    // Verify that markFailed was called once ONLY for failed files
-    String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName());
-    Mockito.verify(fileTracker, Mockito.times(1))
-      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(failedPrefix1)));
-    String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName());
-    Mockito.verify(fileTracker, Mockito.times(1))
-      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(failedPrefix3)));
+      // Verify that markCompleted was called for 2 intermittent failed 
processed file
+      Mockito.verify(fileTracker, 
Mockito.times(2)).markFailed(Mockito.any(Path.class));
 
-    // Verify that markFailed was NOT called for files processed successfully 
in first iteration
-    String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName());
-    Mockito.verify(fileTracker, Mockito.never())
-      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(successPrefix0)));
-    String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName());
-    Mockito.verify(fileTracker, Mockito.never())
-      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(successPrefix2)));
-    String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName());
-    Mockito.verify(fileTracker, Mockito.never())
-      .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(successPrefix4)));
+      // Verify that markFailed was called once ONLY for failed files
+      String failedPrefix1 = 
extractPrefix(allInProgressFiles.get(1).getName());
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(failedPrefix1)));
+      String failedPrefix3 = 
extractPrefix(allInProgressFiles.get(3).getName());
+      Mockito.verify(fileTracker, Mockito.times(1))
+        .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(failedPrefix3)));
 
-    // Verify that markCompleted was called for each successfully processed 
file with correct paths
-    for (Path expectedFile : allInProgressFiles) {
-      String expectedPrefix = extractPrefix(expectedFile.getName());
-      Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
-        Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      // Verify that markFailed was NOT called for files processed 
successfully in first iteration
+      String successPrefix0 = 
extractPrefix(allInProgressFiles.get(0).getName());
+      Mockito.verify(fileTracker, Mockito.never())
+        .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(successPrefix0)));
+      String successPrefix2 = 
extractPrefix(allInProgressFiles.get(2).getName());
+      Mockito.verify(fileTracker, Mockito.never())
+        .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(successPrefix2)));
+      String successPrefix4 = 
extractPrefix(allInProgressFiles.get(4).getName());
+      Mockito.verify(fileTracker, Mockito.never())
+        .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(successPrefix4)));
+
+      // Verify that markCompleted was called for each successfully processed 
file with correct
+      // paths
+      for (Path expectedFile : allInProgressFiles) {
+        String expectedPrefix = extractPrefix(expectedFile.getName());
+        Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+          Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix)));
+      }
+    } finally {
+      EnvironmentEdgeManager.reset();
     }
   }
 

Reply via email to