This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
new e4d9aae40b PHOENIX-7769 Initialize and Close replication log groups as part of RS startup and shutdown (#2465)
e4d9aae40b is described below
commit e4d9aae40bf795baaa87cf201831379726c60773
Author: tkhurana <[email protected]>
AuthorDate: Thu May 7 14:35:23 2026 -0700
PHOENIX-7769 Initialize and Close replication log groups as part of RS startup and shutdown (#2465)
---
.../coprocessor/PhoenixRegionServerEndpoint.java | 27 +++++
.../phoenix/replication/ReplicationLogGroup.java | 20 ++++
.../PhoenixRegionServerEndpointTestImpl.java | 11 --
.../replication/ReplicationLogBaseTest.java | 37 ++++---
.../ReplicationLogDiscoveryForwarderTest.java | 5 +
.../replication/ReplicationLogDiscoveryTest.java | 114 +++++++++++----------
6 files changed, 137 insertions(+), 77 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index cdb80df926..3f35b40cef 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -17,6 +17,9 @@
*/
package org.apache.phoenix.coprocessor;
+import static
org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED;
+
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
@@ -30,7 +33,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerMetadataCache;
import org.apache.phoenix.cache.ServerMetadataCacheImpl;
@@ -46,6 +51,7 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.replication.ReplicationLogGroup;
import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -61,6 +67,7 @@ public class PhoenixRegionServerEndpoint extends
private static final Logger LOGGER =
LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
private MetricsMetadataCachingSource metricsSource;
protected Configuration conf;
+ protected ServerName serverName;
private ExecutorService prewarmExecutor;
// regionserver level thread pool used by Uncovered Indexes to scan data
table rows
@@ -69,6 +76,9 @@ public class PhoenixRegionServerEndpoint extends
@Override
public void start(CoprocessorEnvironment env) throws IOException {
this.conf = env.getConfiguration();
+ if (env instanceof RegionServerCoprocessorEnvironment) {
+ this.serverName = ((RegionServerCoprocessorEnvironment)
env).getServerName();
+ }
this.metricsSource =
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
initUncoveredIndexThreadPool(this.conf);
@@ -89,6 +99,10 @@ public class PhoenixRegionServerEndpoint extends
public void stop(CoprocessorEnvironment env) throws IOException {
// Stop replication log replay
ReplicationLogReplayService.getInstance(conf).stop();
+ // Close all ReplicationLogGroup instances belonging to this server
+ if (serverName != null) {
+ ReplicationLogGroup.closeAll(serverName);
+ }
RegionServerCoprocessor.super.stop(env);
if (uncoveredIndexThreadPool != null) {
uncoveredIndexThreadPool
@@ -282,6 +296,9 @@ public class PhoenixRegionServerEndpoint extends
}
// Phase 2: Prewarm individual HAGroupStoreClients with retry
+ // and eagerly initialize ReplicationLogGroup instances
+ boolean shouldInitReplicationLogGroup = serverName != null && conf
+ .getBoolean(SYNCHRONOUS_REPLICATION_ENABLED,
DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED);
try {
while (!pending.isEmpty()) {
Iterator<String> iterator = pending.iterator();
@@ -289,6 +306,16 @@ public class PhoenixRegionServerEndpoint extends
String haGroup = iterator.next();
try {
manager.getClusterRoleRecord(haGroup);
+ if (shouldInitReplicationLogGroup) {
+ try {
+ ReplicationLogGroup.get(conf, serverName, haGroup);
+ LOGGER.info("Eagerly initialized ReplicationLogGroup {} on
server {}", haGroup,
+ serverName);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to eagerly initialize
ReplicationLogGroup for HA group: {}."
+ + " Will be lazily initialized on first mutation.",
haGroup, e);
+ }
+ }
iterator.remove();
LOGGER.info("Prewarmed HAGroupStoreClient: {} ({} remaining)",
haGroup,
pending.size());
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index d4ac31b493..c88d9b2647 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -630,6 +630,7 @@ public class ReplicationLogGroup {
if (closed) {
return;
}
+ LOG.info("Closing HAGroup {}", this);
// setting closed to true prevents future producers to add events to the
ring buffer
closed = true;
// Remove from instances cache
@@ -652,6 +653,25 @@ public class ReplicationLogGroup {
}
}
+  /**
+   * Close all ReplicationLogGroup instances belonging to the given server. Called during
+   * RegionServer shutdown. A failure to close one group is logged and does not prevent the
+   * remaining groups from being closed.
+   *
+   * @param serverName the server whose groups should be closed; groups registered in the shared
+   *                   static instance cache by other servers are left untouched
+   */
+  public static void closeAll(ServerName serverName) {
+    // Iterate over a snapshot: close() removes each group from INSTANCES as it runs.
+    List<ReplicationLogGroup> groups = new ArrayList<>(INSTANCES.values());
+    LOG.info("Closing ReplicationLogGroup instances for server {} ({} cached across all servers)",
+      serverName, groups.size());
+    int matched = 0;
+    for (ReplicationLogGroup group : groups) {
+      if (!group.serverName.equals(serverName)) {
+        continue; // registered by a different server sharing this cache
+      }
+      matched++;
+      try {
+        group.close();
+      } catch (Exception e) {
+        LOG.warn("Error closing ReplicationLogGroup for HA group: {}", group.haGroupName, e);
+      }
+    }
+    LOG.info("Finished closing {} ReplicationLogGroup instances for server {}", matched,
+      serverName);
+  }
+
private void shutdownDisruptorExecutor() {
disruptorExecutor.shutdown();
try {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
index eda9c73c1e..a0a6e65796 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java
@@ -17,10 +17,6 @@
*/
package org.apache.phoenix.end2end;
-import java.io.IOException;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.phoenix.cache.ServerMetadataCache;
import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
@@ -29,13 +25,6 @@ import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
* support keeping multiple cache instances.
*/
public class PhoenixRegionServerEndpointTestImpl extends
PhoenixRegionServerEndpoint {
- protected ServerName serverName;
-
- @Override
- public void start(CoprocessorEnvironment env) throws IOException {
- super.start(env);
- this.serverName = ((RegionServerCoprocessorEnvironment)
env).getServerName();
- }
@Override
public ServerMetadataCache getServerMetadataCache() {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
index 063020523e..f748e3d1a8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.replication;
-import static
org.apache.phoenix.replication.ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY;
import static
org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -106,7 +105,6 @@ public class ReplicationLogBaseTest {
// small value of replication round duration
conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
TEST_REPLICATION_ROUND_DURATION_SECONDS);
- conf.setDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY, 0.0);
overrideConf(conf);
// initialize the group store record
@@ -136,12 +134,16 @@ public class ReplicationLogBaseTest {
}
private ReplicationLogGroup createAndInitLogGroup() throws Exception {
- ReplicationLogGroup group =
- spy(new TestableLogGroup(conf, serverName, haGroupName,
haGroupStoreManager));
+ ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName,
haGroupName,
+ haGroupStoreManager, useAlignedRotation()));
group.init();
return group;
}
+  /**
+   * Chooses the initial delay of the rotation executor for logs created by this test's log group.
+   * The default, {@code false}, waits a full round before the first rotation so a test that starts
+   * near a round boundary never sees an early rotation; subclasses return {@code true} to use the
+   * production-aligned delay instead.
+   *
+   * @return whether logs should use the production-aligned rotation delay
+   */
+  protected boolean useAlignedRotation() {
+    return false;
+  }
+
protected static void waitForRotationTick(int roundDurationSeconds) throws
InterruptedException {
Thread.sleep((long) (roundDurationSeconds * 1000 * 1.25));
LOG.info("Waking up after waiting for rotation tick");
@@ -154,41 +156,48 @@ public class ReplicationLogBaseTest {
}
+ /**
+  * ReplicationLogGroup whose standby and fallback logs are {@link TestableLog} instances wrapped
+  * in spies. The {@code useAlignedRotation} flag is forwarded to every log it creates and selects
+  * the initial delay of that log's rotation executor.
+  */
static class TestableLogGroup extends ReplicationLogGroup {
+ // Forwarded unchanged to each TestableLog created below.
+ private final boolean useAlignedRotation;
public TestableLogGroup(Configuration conf, ServerName serverName, String
haGroupName,
- HAGroupStoreManager haGroupStoreManager) {
+ HAGroupStoreManager haGroupStoreManager, boolean useAlignedRotation) {
super(conf, serverName, haGroupName, haGroupStoreManager);
+ this.useAlignedRotation = useAlignedRotation;
}
+ // Standby log is built on the peer shard manager.
@Override
protected ReplicationLog createStandbyLog() throws IOException {
- return spy(new TestableLog(this, peerShardManager));
+ return spy(new TestableLog(this, peerShardManager, useAlignedRotation));
}
+ // Fallback log is built on the local shard manager.
@Override
protected ReplicationLog createFallbackLog() throws IOException {
- return spy(new TestableLog(this, localShardManager));
+ return spy(new TestableLog(this, localShardManager, useAlignedRotation));
}
}
/**
* Testable version of ReplicationLog that allows spying on the log.
Overrides
- * startRotationExecutor to always use a full round as initial delay so that
the rotation task
- * never fires unexpectedly when a test happens to start near a round
boundary.
+ * startRotationExecutor to use a configurable initial delay. By default
uses a full round
+ * duration so the rotation task never fires early when the test happens to
start near a round
+ * boundary. Set {@code useAlignedRotation} to true to use the
production-aligned delay.
*/
static class TestableLog extends ReplicationLog {
+ private final boolean useAlignedRotation;
- public TestableLog(ReplicationLogGroup logGroup,
- ReplicationShardDirectoryManager shardManager) {
+ public TestableLog(ReplicationLogGroup logGroup,
ReplicationShardDirectoryManager shardManager,
+ boolean useAlignedRotation) {
super(logGroup, shardManager);
+ this.useAlignedRotation = useAlignedRotation;
}
@Override
protected void startRotationExecutor() {
- // Use a full round as the initial delay so the rotation task never
fires early when the
- // test happens to start close to a round boundary (e.g. initialDelay of
852ms on a 60s round)
- super.startRotationExecutor(rotationTimeMs);
+ if (useAlignedRotation) {
+ super.startRotationExecutor();
+ } else {
+ super.startRotationExecutor(rotationTimeMs);
+ }
}
@Override
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
index 770956b70d..46efbf196e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
@@ -62,6 +62,11 @@ public class ReplicationLogDiscoveryForwarderTest extends ReplicationLogBaseTest
conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 20);
}
+  /**
+   * Opts this test into the production-aligned rotation delay instead of the base-class default
+   * of waiting a full round before the first rotation. NOTE(review): presumably needed so log
+   * rotations line up with the 20s round boundaries this test configures — confirm.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  protected boolean useAlignedRotation() {
+    return true;
+  }
+
@Before
public void setUp() throws IOException {
ReplicationMode mode = logGroup.getMode();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
index 6ed7d8723f..4af410e56e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java
@@ -1082,67 +1082,77 @@ public class ReplicationLogDiscoveryTest {
.when(discovery)
.processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(file3Prefix)));
- // Process in-progress directory
- discovery.processInProgressDirectory();
+ // Inject an advancing clock so that rename timestamps from markInProgress
are always
+ // strictly before the renameTimestampThreshold computed on the next loop
iteration
+ AtomicLong clock = new AtomicLong(EnvironmentEdgeManager.currentTime());
+ EnvironmentEdgeManager.injectEdge(clock::getAndIncrement);
- // Verify that markInProgress was called 7 times (5 initially + 2 for
retries)
- Mockito.verify(fileTracker,
Mockito.times(7)).markInProgress(Mockito.any(Path.class));
+ try {
+ // Process in-progress directory
+ discovery.processInProgressDirectory();
- // Verify that markInProgress was called for each expected file
- // Files 1 and 3 are called twice (initial attempt + retry), others once
- for (int i = 0; i < allInProgressFiles.size(); i++) {
- Path expectedFile = allInProgressFiles.get(i);
- String expectedPrefix = extractPrefix(expectedFile.getName());
- int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are
retried
- Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress(
- Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
- }
+ // Verify that markInProgress was called 7 times (5 initially + 2 for
retries)
+ Mockito.verify(fileTracker,
Mockito.times(7)).markInProgress(Mockito.any(Path.class));
- // Verify that processFile was called for each file in the directory (i.e.
5 + 2 times for
- // failed once that would succeed in next retry)
- Mockito.verify(discovery,
Mockito.times(7)).processFile(Mockito.any(Path.class));
+ // Verify that markInProgress was called for each expected file
+ // Files 1 and 3 are called twice (initial attempt + retry), others once
+ for (int i = 0; i < allInProgressFiles.size(); i++) {
+ Path expectedFile = allInProgressFiles.get(i);
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are
retried
+ Mockito.verify(fileTracker,
Mockito.times(expectedTimes)).markInProgress(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
- // Verify that processFile was called for each specific file (using prefix
matching)
- // Files 1 and 3 should be called twice (fail once, succeed on retry),
others once
- for (int i = 0; i < allInProgressFiles.size(); i++) {
- Path expectedFile = allInProgressFiles.get(i);
- String expectedPrefix = extractPrefix(expectedFile.getName());
- int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are
called twice (fail +
- // retry success)
- Mockito.verify(discovery, Mockito.times(expectedTimes))
- .processFile(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
- }
+ // Verify that processFile was called for each file in the directory (i.e. 5 + 2 times for
+ // the failed ones that succeed on the next retry)
+ Mockito.verify(discovery,
Mockito.times(7)).processFile(Mockito.any(Path.class));
- // Verify that markCompleted was called for each successfully processed
file
- Mockito.verify(fileTracker,
Mockito.times(5)).markCompleted(Mockito.any(Path.class));
+ // Verify that processFile was called for each specific file (using
prefix matching)
+ // Files 1 and 3 should be called twice (fail once, succeed on retry),
others once
+ for (int i = 0; i < allInProgressFiles.size(); i++) {
+ Path expectedFile = allInProgressFiles.get(i);
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are
called twice (fail +
+ // retry success)
+ Mockito.verify(discovery, Mockito.times(expectedTimes)).processFile(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
- // Verify that markCompleted was called for 2 intermittent failed
processed file
- Mockito.verify(fileTracker,
Mockito.times(2)).markFailed(Mockito.any(Path.class));
+ // Verify that markCompleted was called for each successfully processed
file
+ Mockito.verify(fileTracker,
Mockito.times(5)).markCompleted(Mockito.any(Path.class));
- // Verify that markFailed was called once ONLY for failed files
- String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName());
- Mockito.verify(fileTracker, Mockito.times(1))
- .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(failedPrefix1)));
- String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName());
- Mockito.verify(fileTracker, Mockito.times(1))
- .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(failedPrefix3)));
+ // Verify that markFailed was called for the 2 intermittently failing files
+ Mockito.verify(fileTracker,
Mockito.times(2)).markFailed(Mockito.any(Path.class));
- // Verify that markFailed was NOT called for files processed successfully
in first iteration
- String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName());
- Mockito.verify(fileTracker, Mockito.never())
- .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(successPrefix0)));
- String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName());
- Mockito.verify(fileTracker, Mockito.never())
- .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(successPrefix2)));
- String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName());
- Mockito.verify(fileTracker, Mockito.never())
- .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(successPrefix4)));
+ // Verify that markFailed was called once ONLY for failed files
+ String failedPrefix1 =
extractPrefix(allInProgressFiles.get(1).getName());
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(failedPrefix1)));
+ String failedPrefix3 =
extractPrefix(allInProgressFiles.get(3).getName());
+ Mockito.verify(fileTracker, Mockito.times(1))
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(failedPrefix3)));
- // Verify that markCompleted was called for each successfully processed
file with correct paths
- for (Path expectedFile : allInProgressFiles) {
- String expectedPrefix = extractPrefix(expectedFile.getName());
- Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
- Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ // Verify that markFailed was NOT called for files processed
successfully in first iteration
+ String successPrefix0 =
extractPrefix(allInProgressFiles.get(0).getName());
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(successPrefix0)));
+ String successPrefix2 =
extractPrefix(allInProgressFiles.get(2).getName());
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(successPrefix2)));
+ String successPrefix4 =
extractPrefix(allInProgressFiles.get(4).getName());
+ Mockito.verify(fileTracker, Mockito.never())
+ .markFailed(Mockito.argThat(path ->
extractPrefix(path.getName()).equals(successPrefix4)));
+
+ // Verify that markCompleted was called for each successfully processed
file with correct
+ // paths
+ for (Path expectedFile : allInProgressFiles) {
+ String expectedPrefix = extractPrefix(expectedFile.getName());
+ Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
+ Mockito.argThat(path ->
extractPrefix(path.getName()).equals(expectedPrefix)));
+ }
+ } finally {
+ EnvironmentEdgeManager.reset();
}
}