This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 3045165e41 PHOENIX-7755 Add support for Consistency Point calculation
in Replication Log Replay on StandBy Cluster (#2356)
3045165e41 is described below
commit 3045165e41ff4a25548e09e12122fb8abf056be7
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Tue Feb 10 23:49:35 2026 +0530
PHOENIX-7755 Add support for Consistency Point calculation in Replication
Log Replay on StandBy Cluster (#2356)
---
...a => MetricsReplicationLogDiscoveryReplay.java} | 27 +-
.../MetricsReplicationLogDiscoveryReplayImpl.java | 13 +-
.../reader/ReplicationLogDiscoveryReplay.java | 84 +++++
.../reader/ReplicationLogReplayService.java | 27 +-
.../ReplicationLogDiscoveryReplayTestIT.java | 357 ++++++++++++++++++++-
.../reader/ReplicationLogReplayServiceTestIT.java | 245 ++++++++++++++
6 files changed, 736 insertions(+), 17 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplay.java
similarity index 50%
copy from
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
copy to
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplay.java
index c7dcf4f6ae..f78b762146 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplay.java
@@ -17,18 +17,21 @@
*/
package org.apache.phoenix.replication.metrics;
-/** Implementation of metrics source for ReplicationLogDiscoveryReplay
operations. */
-public class MetricsReplicationLogDiscoveryReplayImpl extends
MetricsReplicationLogDiscoveryImpl {
+/**
+ * Interface for metrics specific to ReplicationLogDiscoveryReplay operations.
Extends the base
+ * MetricsReplicationLogDiscovery with replay-specific metrics.
+ */
+public interface MetricsReplicationLogDiscoveryReplay extends
MetricsReplicationLogDiscovery {
- private static final String METRICS_NAME = "ReplicationLogDiscoveryReplay";
- private static final String METRICS_DESCRIPTION =
- "Metrics about Replication Replay Log Discovery for an HA Group";
- private static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" +
METRICS_NAME;
+ String CONSISTENCY_POINT = "consistencyPoint";
+ String CONSISTENCY_POINT_DESC =
+ "Consistency point timestamp in milliseconds for the HA Group during
replay";
- public MetricsReplicationLogDiscoveryReplayImpl(final String haGroupName) {
- super(MetricsReplicationLogDiscoveryReplayImpl.METRICS_NAME,
- MetricsReplicationLogDiscoveryReplayImpl.METRICS_DESCRIPTION,
- MetricsReplicationLogDiscoveryImpl.METRICS_CONTEXT,
- MetricsReplicationLogDiscoveryReplayImpl.METRICS_JMX_CONTEXT +
",haGroup=" + haGroupName);
- }
+ /**
+ * Updates the consistency point metric. The consistency point represents
the timestamp up to
+ * which all mutations have been replayed and the data is consistent for
failover or read
+ * operations.
+ * @param consistencyPointMs The consistency point timestamp in milliseconds
+ */
+ void updateConsistencyPoint(long consistencyPointMs);
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
index c7dcf4f6ae..16e65287d9 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
@@ -17,18 +17,29 @@
*/
package org.apache.phoenix.replication.metrics;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
/** Implementation of metrics source for ReplicationLogDiscoveryReplay
operations. */
-public class MetricsReplicationLogDiscoveryReplayImpl extends
MetricsReplicationLogDiscoveryImpl {
+public class MetricsReplicationLogDiscoveryReplayImpl extends
MetricsReplicationLogDiscoveryImpl
+ implements MetricsReplicationLogDiscoveryReplay {
private static final String METRICS_NAME = "ReplicationLogDiscoveryReplay";
private static final String METRICS_DESCRIPTION =
"Metrics about Replication Replay Log Discovery for an HA Group";
private static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" +
METRICS_NAME;
+ private final MutableGaugeLong consistencyPoint;
+
public MetricsReplicationLogDiscoveryReplayImpl(final String haGroupName) {
super(MetricsReplicationLogDiscoveryReplayImpl.METRICS_NAME,
MetricsReplicationLogDiscoveryReplayImpl.METRICS_DESCRIPTION,
MetricsReplicationLogDiscoveryImpl.METRICS_CONTEXT,
MetricsReplicationLogDiscoveryReplayImpl.METRICS_JMX_CONTEXT +
",haGroup=" + haGroupName);
+ consistencyPoint = getMetricsRegistry().newGauge(CONSISTENCY_POINT,
CONSISTENCY_POINT_DESC, 0L);
+ }
+
+ @Override
+ public void updateConsistencyPoint(long consistencyPointMs) {
+ consistencyPoint.set(consistencyPointMs);
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index b6ce96e62c..361a93489c 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.replication.ReplicationLogTracker;
import org.apache.phoenix.replication.ReplicationRound;
import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery;
+import
org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplay;
import
org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -291,6 +292,18 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
public void replay() throws IOException {
LOG.info("Starting replay with lastRoundProcessed={}, lastRoundInSync={}",
lastRoundProcessed,
lastRoundInSync);
+
+ // Update consistency point metric at the start of replay
+ try {
+ long consistencyPoint = getConsistencyPoint();
+ LOG.debug("Consistency point for HAGroup: {} before starting the replay
is {}.", haGroupName,
+ consistencyPoint);
+ getReplayMetrics().updateConsistencyPoint(consistencyPoint);
+ } catch (IOException exception) {
+ LOG.warn("Failed to get the consistency point for HA Group: {} at start
of replay",
+ haGroupName, exception);
+ }
+
Optional<ReplicationRound> optionalNextRound = getFirstRoundToProcess();
LOG.info("Found first round to process as {} for haGroup: {}",
optionalNextRound, haGroupName);
while (optionalNextRound.isPresent()) {
@@ -341,6 +354,18 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
default:
throw new IllegalStateException("Unexpected state: " + currentState);
}
+
+ // Update consistency point metric after processing each round
+ try {
+ long consistencyPoint = getConsistencyPoint();
+ LOG.debug("Consistency point for HAGroup: {} after processing round:
{} is {}", haGroupName,
+ replicationRound, consistencyPoint);
+ getReplayMetrics().updateConsistencyPoint(consistencyPoint);
+ } catch (IOException exception) {
+ LOG.warn("Failed to get the consistency point for HA Group: {} after
processing round: {}",
+ haGroupName, replicationRound, exception);
+ }
+
optionalNextRound = getNextRoundToProcess();
}
@@ -384,6 +409,14 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
return new MetricsReplicationLogDiscoveryReplayImpl(haGroupName);
}
+ /**
+ * Returns the replay-specific metrics interface.
+ * @return MetricsReplicationLogDiscoveryReplay instance
+ */
+ protected MetricsReplicationLogDiscoveryReplay getReplayMetrics() {
+ return (MetricsReplicationLogDiscoveryReplay) getMetrics();
+ }
+
@Override
public String getExecutorThreadNameFormat() {
return EXECUTOR_THREAD_NAME_FORMAT;
@@ -520,4 +553,55 @@ public class ReplicationLogDiscoveryReplay extends
ReplicationLogDiscovery {
DEGRADED, // degraded for writer
SYNCED_RECOVERY // came back from degraded → standby, needs rewind
}
+
+ /**
+ * Returns the consistency point timestamp based on the current replication
replay state. The
+ * consistency point in a standby cluster is defined as the timestamp such
that all mutations
+ * whose timestamp is less than this consistency point timestamp have been
replayed.
+ * @return The consistency point timestamp in milliseconds
+ * @throws IOException if the consistency point cannot be determined based
on current state
+ */
+ public long getConsistencyPoint() throws IOException {
+
+ ReplicationReplayState currentState = replicationReplayState.get();
+ long consistencyPoint = 0L;
+
+ switch (currentState) {
+ case SYNC:
+ // In SYNC state: prefer minimum timestamp from in-progress files (if
any),
+ // otherwise use lastRoundInSync end time
+ Optional<Long> optionalMinTimestampInProgressTimestamp =
+ getMinTimestampFromInProgressFiles();
+ if (optionalMinTimestampInProgressTimestamp.isPresent()) {
+ // Use minimum timestamp from in-progress files as consistency point
+ consistencyPoint = optionalMinTimestampInProgressTimestamp.get();
+ } else if (lastRoundInSync != null) {
+ // Use lastRoundInSync end time if no in-progress files
+ // Since we are in sync mode, both lastRoundProcessed and
lastRoundInSync would be the same.
+ // However, use lastRoundInSync to be on the safe side.
+ consistencyPoint = lastRoundInSync.getEndTime();
+ } else {
+ throw new IOException(
+ "Not able to derive consistency point because In Progress
directory is empty and lastRoundInSync is not initialized.");
+ }
+ break;
+ case DEGRADED:
+ case SYNCED_RECOVERY:
+ // In DEGRADED or SYNCED_RECOVERY state: use lastRoundInSync end time
+ // (the last known sync point before degradation/recovery)
+ if (lastRoundInSync != null) {
+ consistencyPoint = lastRoundInSync.getEndTime();
+ } else {
+ throw new IOException(
+ "Not able to derive consistency point because lastRoundInSync is
not initialized.");
+ }
+ break;
+ default:
+ // Invalid or uninitialized state
+ throw new IOException(
+ "Not able to derive consistency point for current state: " +
currentState);
+ }
+
+ return consistencyPoint;
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
index fe380dea85..24d40faac7 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -193,7 +194,7 @@ public class ReplicationLogReplayService {
List<String> replicationGroups = getReplicationGroups();
LOG.info("{} number of HA Groups found to start Replication Replay",
replicationGroups.size());
for (String replicationGroup : replicationGroups) {
- ReplicationLogReplay.get(conf, replicationGroup).startReplay();
+ getReplicationLogReplay(replicationGroup).startReplay();
}
}
@@ -204,14 +205,36 @@ public class ReplicationLogReplayService {
List<String> replicationGroups = getReplicationGroups();
LOG.info("{} number of HA Groups found to stop Replication Replay",
replicationGroups.size());
for (String replicationGroup : replicationGroups) {
- ReplicationLogReplay replicationLogReplay =
ReplicationLogReplay.get(conf, replicationGroup);
+ ReplicationLogReplay replicationLogReplay =
getReplicationLogReplay(replicationGroup);
replicationLogReplay.stopReplay();
replicationLogReplay.close();
}
}
+ /**
+ * Returns the minimum consistency point across all HA groups in the
cluster. See
+ * {@link ReplicationLogDiscoveryReplay#getConsistencyPoint()} for
definition of consistency point
+ * for a particular HA Group.
+ * @return The minimum consistency point timestamp in milliseconds across
all HA groups
+ * @throws IOException if there's an error retrieving consistency points
from replication groups
+ * @throws SQLException if there's an error accessing HA group information
+ */
+ protected long getConsistencyPoint() throws IOException, SQLException {
+ long consistencyPoint = EnvironmentEdgeManager.currentTime();
+ List<String> replicationGroups = getReplicationGroups();
+ for (String replicationGroup : replicationGroups) {
+ consistencyPoint = Math.min(getReplicationLogReplay(replicationGroup)
+ .getReplicationReplayLogDiscovery().getConsistencyPoint(),
consistencyPoint);
+ }
+ return consistencyPoint;
+ }
+
/** Returns the list of HA groups on the cluster */
protected List<String> getReplicationGroups() throws SQLException {
return HAGroupStoreManager.getInstance(conf).getHAGroupNames();
}
+
+ protected ReplicationLogReplay getReplicationLogReplay(final String
haGroupName) {
+ return ReplicationLogReplay.get(conf, haGroupName);
+ }
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index 27d62e71ea..9eac8c9cd3 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -22,11 +22,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URI;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -48,8 +51,7 @@ import org.apache.phoenix.replication.ReplicationLogGroup;
import org.apache.phoenix.replication.ReplicationLogTracker;
import org.apache.phoenix.replication.ReplicationRound;
import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker;
-import
org.apache.phoenix.replication.metrics.MetricsReplicationLogTrackerReplayImpl;
+import org.apache.phoenix.replication.metrics.*;
import org.apache.phoenix.util.HAGroupStoreTestUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.After;
@@ -591,6 +593,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
try {
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+ // Initialize the discovery
+ discovery.init();
+
ReplicationRound initialRound =
new ReplicationRound(initialEndTime - roundTimeMills,
initialEndTime);
discovery.setLastRoundProcessed(initialRound);
@@ -639,6 +645,11 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Verify triggerFailover was not called
assertEquals("triggerFailover should not be called", 0,
discovery.getTriggerFailoverCallCount());
+
+ // Verify consistency point: in SYNC state, should be
lastRoundInSync.getEndTime()
+ long consistencyPoint = discovery.getConsistencyPoint();
+ assertEquals("Consistency point should match
lastRoundInSync.getEndTime() in SYNC state",
+ expectedRound3.getEndTime(), consistencyPoint);
} finally {
EnvironmentEdgeManager.reset();
}
@@ -678,6 +689,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
try {
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+ // Initialize the discovery
+ discovery.init();
+
discovery.setLastRoundProcessed(
new ReplicationRound(initialEndTime - roundTimeMills,
initialEndTime));
ReplicationRound lastRoundInSyncBeforeReplay =
@@ -724,6 +739,12 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Verify triggerFailover was not called
assertEquals("triggerFailover should not be called", 0,
discovery.getTriggerFailoverCallCount());
+
+ // Verify consistency point: in DEGRADED state, should be
lastRoundInSync.getEndTime()
+ long consistencyPoint = discovery.getConsistencyPoint();
+ assertEquals(
+ "Consistency point should match lastRoundInSync.getEndTime() in
DEGRADED state",
+ lastRoundInSyncBeforeReplay.getEndTime(), consistencyPoint);
} finally {
EnvironmentEdgeManager.reset();
}
@@ -764,6 +785,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+ // Initialize the discovery
+ discovery.init();
+
// Set initial state: lastRoundProcessed is ahead, lastRoundInSync is
behind
ReplicationRound lastInSyncRound =
new ReplicationRound(initialEndTime - roundTimeMills,
initialEndTime);
@@ -842,6 +866,13 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Verify triggerFailover was not called
assertEquals("triggerFailover should not be called", 0,
discovery.getTriggerFailoverCallCount());
+
+ // Verify consistency point: after transition to SYNC, should be
+ // lastRoundInSync.getEndTime()
+ long consistencyPoint = discovery.getConsistencyPoint();
+ assertEquals(
+ "Consistency point should match lastRoundInSync.getEndTime() after
SYNC transition",
+ expectedSixthRound.getEndTime(), consistencyPoint);
} finally {
EnvironmentEdgeManager.reset();
}
@@ -881,6 +912,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
try {
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+ // Initialize the discovery
+ discovery.init();
+
ReplicationRound initialRound =
new ReplicationRound(initialEndTime - roundTimeMills,
initialEndTime);
discovery.setLastRoundProcessed(initialRound);
@@ -947,6 +982,12 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Verify triggerFailover was not called
assertEquals("triggerFailover should not be called", 0,
discovery.getTriggerFailoverCallCount());
+
+ // Verify consistency point: in DEGRADED state, should be
lastRoundInSync.getEndTime()
+ long consistencyPoint = discovery.getConsistencyPoint();
+ assertEquals(
+ "Consistency point should match lastRoundInSync.getEndTime() in
DEGRADED state",
+ expectedRound1.getEndTime(), consistencyPoint);
} finally {
EnvironmentEdgeManager.reset();
}
@@ -986,6 +1027,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+ // Initialize the discovery
+ discovery.init();
+
ReplicationRound lastInSyncRound =
new ReplicationRound(initialEndTime - roundTimeMills,
initialEndTime);
discovery.setLastRoundProcessed(new ReplicationRound(initialEndTime +
roundTimeMills,
@@ -1140,6 +1184,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
}
};
+ // Initialize the discovery
+ discoveryWithTransitions.init();
+
discoveryWithTransitions.setLastRoundProcessed(initialRound);
discoveryWithTransitions.setLastRoundInSync(initialRound);
discoveryWithTransitions
@@ -1217,6 +1264,13 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Verify triggerFailover was not called
assertEquals("triggerFailover should not be called", 0,
discoveryWithTransitions.getTriggerFailoverCallCount());
+
+ // Verify consistency point: after transition to SYNC, should be
+ // lastRoundInSync.getEndTime()
+ long consistencyPoint = discoveryWithTransitions.getConsistencyPoint();
+ assertEquals(
+ "Consistency point should match lastRoundInSync.getEndTime() after
SYNC transition",
+ expectedLastRound.getEndTime(), consistencyPoint);
} finally {
EnvironmentEdgeManager.reset();
}
@@ -1254,6 +1308,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
try {
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+ // Initialize the discovery
+ discovery.init();
+
ReplicationRound initialRound =
new ReplicationRound(initialEndTime - roundTimeMills,
initialEndTime);
discovery.setLastRoundProcessed(initialRound);
@@ -1288,6 +1346,11 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
// Verify triggerFailover was not called
assertEquals("triggerFailover should not be called", 0,
discovery.getTriggerFailoverCallCount());
+
+ // Verify consistency point: in SYNC state, should be
lastRoundInSync.getEndTime()
+ long consistencyPoint = discovery.getConsistencyPoint();
+ assertEquals("Consistency point should match
lastRoundInSync.getEndTime() in SYNC state",
+ lastRoundInSyncBeforeReplay.getEndTime(), consistencyPoint);
} finally {
EnvironmentEdgeManager.reset();
}
@@ -1399,6 +1462,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+ // Initialize the discovery
+ discovery.init();
+
// Set lastRoundInSync with start time 0 (the new case being tested)
ReplicationRound lastRoundInSyncWithZeroStart = new
ReplicationRound(0L, roundTimeMills);
discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
@@ -1604,6 +1670,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
}
};
+ // Initialize the discovery
+ discovery.init();
+
// Set lastRoundInSync with start time 0
ReplicationRound lastRoundInSyncWithZeroStart = new
ReplicationRound(0L, roundTimeMills);
discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
@@ -1718,6 +1787,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
try {
TestableReplicationLogDiscoveryReplay discovery =
new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+ // Initialize the discovery
+ discovery.init();
+
ReplicationRound initialRound =
new ReplicationRound(initialEndTime - roundTimeMills,
initialEndTime);
discovery.setLastRoundProcessed(initialRound);
@@ -1776,6 +1849,11 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
assertFalse("failoverPending should be set to false after failover is
triggered",
discovery.getFailoverPending());
+ // Verify consistency point: in SYNC state, should be
lastRoundInSync.getEndTime()
+ long consistencyPoint = discovery.getConsistencyPoint();
+ assertEquals("Consistency point should match
lastRoundInSync.getEndTime() in SYNC state",
+ expectedRound3.getEndTime(), consistencyPoint);
+
// TODO: Ensure cluster state is updated to ACTIVE_IN_SYNC once
failover is triggered.
} finally {
EnvironmentEdgeManager.reset();
@@ -2041,6 +2119,115 @@ public class ReplicationLogDiscoveryReplayTestIT
extends BaseTest {
}
}
+ /**
+ * Tests getConsistencyPoint method in SYNC state with in-progress files
present. Should return
+ * the minimum timestamp from in-progress files.
+ */
+ @Test
+ public void testGetConsistencyPointSyncStateWithInProgressFiles() throws
IOException {
+ // Create ReplicationLogTracker
+ ReplicationLogTracker tracker =
+ Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs,
standbyUri));
+ HAGroupStoreRecord haGroupStoreRecord =
+ new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
HighAvailabilityPolicy.FAILOVER.toString(),
+ peerZkUrl, zkUrl, peerZkUrl, 0L);
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+ // Set state to SYNC
+
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+ // Create mock in-progress files
+ // Note: getFileTimestamp() parses the filename, so we don't need to mock
it
+ Path file1 = new Path("/test/1704153660000_rs2.plog");
+ Path file2 = new Path("/test/1704153600000_rs1.plog");
+ Path file3 = new Path("/test/1704153720000_rs3.plog");
+ List<Path> inProgressFiles = Arrays.asList(file1, file2, file3);
+
+ // Mock fileTracker to return in-progress files
+ doReturn(inProgressFiles).when(tracker).getInProgressFiles();
+
+ // Call getConsistencyPoint
+ long consistencyPoint = discovery.getConsistencyPoint();
+
+ // Should return the minimum timestamp from in-progress files
+ assertEquals("Should return minimum timestamp from in-progress files",
1704153600000L,
+ consistencyPoint);
+ }
+
+ /**
+ * Tests getConsistencyPoint method in SYNC state without in-progress files
but with
+ * lastRoundInSync set. Should return lastRoundInSync.getEndTime().
+ */
+ @Test
+ public void
testGetConsistencyPointSyncStateWithoutInProgressFilesWithLastRoundInSync()
+ throws IOException {
+ // Create ReplicationLogTracker
+ ReplicationLogTracker tracker =
+ Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs,
standbyUri));
+ HAGroupStoreRecord haGroupStoreRecord =
+ new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
HighAvailabilityPolicy.FAILOVER.toString(),
+ peerZkUrl, zkUrl, peerZkUrl, 0L);
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+ // Set state to SYNC
+
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+ // Mock empty in-progress files
+ doReturn(Collections.emptyList()).when(tracker).getInProgressFiles();
+
+ // Set lastRoundInSync
+ long endTime = 1704153600000L;
+ long roundTimeMills = discovery.getRoundTimeMills();
+ ReplicationRound lastRoundInSync = new ReplicationRound(endTime -
roundTimeMills, endTime);
+ discovery.setLastRoundInSync(lastRoundInSync);
+
+ // Call getConsistencyPoint
+ long consistencyPoint = discovery.getConsistencyPoint();
+
+ // Should return lastRoundInSync.getEndTime()
+ assertEquals("Should return lastRoundInSync.getEndTime()", endTime,
consistencyPoint);
+ }
+
+ /**
+ * Tests getConsistencyPoint method in SYNC state without in-progress files
and without
+ * lastRoundInSync. Should throw IOException.
+ */
+ @Test
+ public void
testGetConsistencyPointSyncStateWithoutInProgressFilesWithoutLastRoundInSync()
+ throws IOException {
+ // Create ReplicationLogTracker
+ ReplicationLogTracker tracker =
+ Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs,
standbyUri));
+ HAGroupStoreRecord haGroupStoreRecord =
+ new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
HighAvailabilityPolicy.FAILOVER.toString(),
+ peerZkUrl, zkUrl, peerZkUrl, 0L);
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+ // Set state to SYNC
+
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+ // Mock empty in-progress files
+ doReturn(Collections.emptyList()).when(tracker).getInProgressFiles();
+
+ // Don't set lastRoundInSync (should be null)
+
+ // Call getConsistencyPoint - should throw IOException
+ try {
+ discovery.getConsistencyPoint();
+ fail("Should throw IOException when in-progress files are empty and
lastRoundInSync is null");
+ } catch (IOException e) {
+ assertEquals("Error message should match",
+ "Not able to derive consistency point because In Progress directory is
empty and lastRoundInSync is not initialized.",
+ e.getMessage());
+ }
+ }
+
/**
* Tests triggerFailover when
* HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName)
throws
@@ -2111,6 +2298,168 @@ public class ReplicationLogDiscoveryReplayTestIT
extends BaseTest {
}
}
+  /**
+   * Verifies that {@code getConsistencyPoint()} returns the end time of {@code lastRoundInSync}
+   * when the replay state is DEGRADED and a last in-sync round has been set.
+   */
+  @Test
+  public void testGetConsistencyPointDegradedStateWithLastRoundInSync() throws IOException {
+    ReplicationLogTracker logTracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri));
+    HAGroupStoreRecord standbyRecord = new HAGroupStoreRecord(
+      HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
+      HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(),
+      peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay replay =
+      new TestableReplicationLogDiscoveryReplay(logTracker, standbyRecord);
+
+    ReplicationLogDiscoveryReplay.ReplicationReplayState degraded =
+      ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED;
+    replay.setReplicationReplayState(degraded);
+
+    // Build a round exactly one round-length long that ends at a fixed timestamp.
+    final long roundEnd = 1704153600000L;
+    final long roundStart = roundEnd - replay.getRoundTimeMills();
+    replay.setLastRoundInSync(new ReplicationRound(roundStart, roundEnd));
+
+    // The consistency point must be the end of that round.
+    long actual = replay.getConsistencyPoint();
+    assertEquals("Should return lastRoundInSync.getEndTime()", roundEnd, actual);
+  }
+
+ /**
+ * Tests getConsistencyPoint method in DEGRADED state without
lastRoundInSync. Should throw
+ * IOException.
+ */
+ @Test
+ public void testGetConsistencyPointDegradedStateWithoutLastRoundInSync()
throws IOException {
+ // Create ReplicationLogTracker
+ ReplicationLogTracker tracker =
+ Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs,
standbyUri));
+ HAGroupStoreRecord haGroupStoreRecord =
+ new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
HighAvailabilityPolicy.FAILOVER.toString(),
+ peerZkUrl, zkUrl, peerZkUrl, 0L);
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+ // Set state to DEGRADED
+ discovery
+
.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED);
+
+ // Don't set lastRoundInSync (should be null)
+
+ // Call getConsistencyPoint - should throw IOException
+ try {
+ discovery.getConsistencyPoint();
+ fail("Should throw IOException when lastRoundInSync is null in DEGRADED
state");
+ } catch (IOException e) {
+ assertEquals("Error message should match",
+ "Not able to derive consistency point because lastRoundInSync is not
initialized.",
+ e.getMessage());
+ }
+ }
+
+  /**
+   * Verifies that {@code getConsistencyPoint()} returns the end time of {@code lastRoundInSync}
+   * when the replay state is SYNCED_RECOVERY and a last in-sync round has been set.
+   */
+  @Test
+  public void testGetConsistencyPointSyncedRecoveryStateWithLastRoundInSync() throws IOException {
+    ReplicationLogTracker logTracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, standbyUri));
+    HAGroupStoreRecord standbyRecord = new HAGroupStoreRecord(
+      HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
+      HAGroupStoreRecord.HAGroupState.STANDBY, 0L, HighAvailabilityPolicy.FAILOVER.toString(),
+      peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay replay =
+      new TestableReplicationLogDiscoveryReplay(logTracker, standbyRecord);
+
+    ReplicationLogDiscoveryReplay.ReplicationReplayState syncedRecovery =
+      ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY;
+    replay.setReplicationReplayState(syncedRecovery);
+
+    // Build a round exactly one round-length long that ends at a fixed timestamp.
+    final long roundEnd = 1704153600000L;
+    final long roundStart = roundEnd - replay.getRoundTimeMills();
+    replay.setLastRoundInSync(new ReplicationRound(roundStart, roundEnd));
+
+    // The consistency point must be the end of that round.
+    long actual = replay.getConsistencyPoint();
+    assertEquals("Should return lastRoundInSync.getEndTime()", roundEnd, actual);
+  }
+
+ /**
+ * Tests getConsistencyPoint method in SYNCED_RECOVERY state without
lastRoundInSync. Should throw
+ * IOException.
+ */
+ @Test
+ public void
testGetConsistencyPointSyncedRecoveryStateWithoutLastRoundInSync()
+ throws IOException {
+ // Create ReplicationLogTracker
+ ReplicationLogTracker tracker =
+ Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs,
standbyUri));
+ HAGroupStoreRecord haGroupStoreRecord =
+ new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
HighAvailabilityPolicy.FAILOVER.toString(),
+ peerZkUrl, zkUrl, peerZkUrl, 0L);
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+ // Set state to SYNCED_RECOVERY
+ discovery.setReplicationReplayState(
+ ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY);
+
+ // Don't set lastRoundInSync (should be null)
+
+ // Call getConsistencyPoint - should throw IOException
+ try {
+ discovery.getConsistencyPoint();
+ fail("Should throw IOException when lastRoundInSync is null in
SYNCED_RECOVERY state");
+ } catch (IOException e) {
+ assertEquals("Error message should match",
+ "Not able to derive consistency point because lastRoundInSync is not
initialized.",
+ e.getMessage());
+ }
+ }
+
+ /**
+ * Tests getConsistencyPoint method in NOT_INITIALIZED state. Should throw
IOException.
+ */
+ @Test
+ public void testGetConsistencyPointNotInitializedState() throws IOException {
+ // Create ReplicationLogTracker
+ ReplicationLogTracker tracker =
+ Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs,
standbyUri));
+ HAGroupStoreRecord haGroupStoreRecord =
+ new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
haGroupName,
+ HAGroupStoreRecord.HAGroupState.STANDBY, 0L,
HighAvailabilityPolicy.FAILOVER.toString(),
+ peerZkUrl, zkUrl, peerZkUrl, 0L);
+ TestableReplicationLogDiscoveryReplay discovery =
+ new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+ // Set state to NOT_INITIALIZED
+ discovery.setReplicationReplayState(
+ ReplicationLogDiscoveryReplay.ReplicationReplayState.NOT_INITIALIZED);
+
+ // Call getConsistencyPoint - should throw IOException
+ try {
+ discovery.getConsistencyPoint();
+ fail("Should throw IOException when state is NOT_INITIALIZED");
+ } catch (IOException e) {
+ assertEquals("Error message should match",
+ "Not able to derive consistency point for current state:
NOT_INITIALIZED", e.getMessage());
+ }
+ }
+
/**
* Testable implementation of ReplicationLogDiscoveryReplay for unit
testing. Provides dependency
* injection for HAGroupStoreRecord, tracks processed rounds, and supports
simulating state
@@ -2195,5 +2544,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends
BaseTest {
/** Returns a new {@code ArrayList} holding a copy of {@code processedRounds}. */
public List<ReplicationRound> getProcessedRounds() {
return new java.util.ArrayList<>(processedRounds);
}
+
+ /**
+ * Returns the current {@code roundTimeMills} value. The consistency-point tests use it to
+ * derive a round's start time from its end time.
+ */
+ public long getRoundTimeMills() {
+ return roundTimeMills;
+ }
}
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayServiceTestIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayServiceTestIT.java
new file mode 100644
index 0000000000..97a6c49d2a
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayServiceTestIT.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HABaseIT;
+import org.apache.phoenix.jdbc.HAGroupStoreClient;
+import org.apache.phoenix.jdbc.PhoenixHAAdmin;
+import org.apache.phoenix.util.HAGroupStoreTestUtil;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+/**
+ * Integration tests for {@code ReplicationLogReplayService.getConsistencyPoint()}, covering the
+ * multi-group, single-group and no-group cases. Per-group replays are replaced by
+ * {@code TestableReplicationLogReplay} instances whose discovery object is a Mockito mock, so
+ * each group's consistency point can be dictated by the test.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogReplayServiceTestIT extends HABaseIT {
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  private String zkUrl;
+  private String peerZkUrl;
+  private FileSystem localFs;
+  private URI standbyUri;
+  private PhoenixHAAdmin haAdmin;
+  private PhoenixHAAdmin peerHaAdmin;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /** Starts the shared clusters once for the whole class. */
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    CLUSTERS.start();
+  }
+
+  /**
+   * Resolves the ZK URLs, local file system and standby URI, creates one HA admin per cluster,
+   * clears any leftover HA group state, and then enables replay on {@code conf1}.
+   */
+  @Before
+  public void setUp() throws Exception {
+    zkUrl = getLocalZkUrl(conf1);
+    peerZkUrl = CLUSTERS.getZkUrl2();
+    localFs = FileSystem.getLocal(conf1);
+    standbyUri = testFolder.getRoot().toURI();
+    haAdmin = new PhoenixHAAdmin(zkUrl, conf1, ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE);
+    peerHaAdmin = new PhoenixHAAdmin(peerZkUrl, conf2, ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE);
+    cleanupHAGroupState();
+
+    // Point the replay at the temporary standby directory and switch the replay service on.
+    final String standbyLocation = standbyUri.toString();
+    conf1.set(ReplicationLogReplay.REPLICATION_LOG_REPLAY_HDFS_URL_KEY, standbyLocation);
+    conf1.setBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, true);
+  }
+
+  /**
+   * With two HA groups reporting different consistency points, the service is expected to return
+   * the smaller of the two.
+   */
+  @Test
+  public void testGetConsistencyPointMultipleGroups() throws IOException, SQLException {
+    final String firstGroup = testName.getMethodName() + "_1";
+    final String secondGroup = testName.getMethodName() + "_2";
+
+    registerHAGroup(firstGroup);
+    registerHAGroup(secondGroup);
+
+    final long earlierPoint = 1704153600000L; // 2024-01-02 00:00:00
+    final long laterPoint = 1704240000000L; // 2024-01-03 00:00:00
+
+    // One testable replay per group, each reporting its own fixed consistency point.
+    TestableReplicationLogReplay firstReplay = new TestableReplicationLogReplay(conf1, firstGroup);
+    firstReplay.setConsistencyPoint(earlierPoint);
+    TestableReplicationLogReplay secondReplay =
+      new TestableReplicationLogReplay(conf1, secondGroup);
+    secondReplay.setConsistencyPoint(laterPoint);
+
+    // Spy on the service so that the per-group replay lookups hand back the testable replays.
+    ReplicationLogReplayService spiedService =
+      Mockito.spy(ReplicationLogReplayService.getInstance(conf1));
+    Mockito.doReturn(firstReplay).when(spiedService).getReplicationLogReplay(firstGroup);
+    Mockito.doReturn(secondReplay).when(spiedService).getReplicationLogReplay(secondGroup);
+
+    long actual = spiedService.getConsistencyPoint();
+
+    // earlierPoint < laterPoint, so the minimum across groups is earlierPoint.
+    assertEquals("Consistency point should be the minimum across all HA groups", earlierPoint,
+      actual);
+  }
+
+  /**
+   * With a single HA group registered, the service is expected to return that group's consistency
+   * point unchanged.
+   */
+  @Test
+  public void testGetConsistencyPointSingleGroup() throws IOException, SQLException {
+    final String groupName = testName.getMethodName();
+    registerHAGroup(groupName);
+
+    final long expectedPoint = 1704153600000L; // 2024-01-02 00:00:00
+
+    TestableReplicationLogReplay groupReplay = new TestableReplicationLogReplay(conf1, groupName);
+    groupReplay.setConsistencyPoint(expectedPoint);
+
+    // Spy on the service so that the replay lookup for this group hands back the testable replay.
+    ReplicationLogReplayService spiedService =
+      Mockito.spy(ReplicationLogReplayService.getInstance(conf1));
+    Mockito.doReturn(groupReplay).when(spiedService).getReplicationLogReplay(groupName);
+
+    long actual = spiedService.getConsistencyPoint();
+
+    // Only one group contributes, so its value is expected back as-is.
+    assertEquals("Consistency point should match the single HA group's consistency point",
+      expectedPoint, actual);
+  }
+
+  /**
+   * With no HA groups present, the service is expected to return the current time, which this
+   * test pins by injecting a fixed {@code EnvironmentEdge}.
+   */
+  @Test
+  public void testGetConsistencyPointWithNoGroups() throws IOException, SQLException {
+    // Make sure nothing is left over from earlier tests.
+    cleanupHAGroupState();
+
+    final long frozenNow = 1704153600000L; // 2024-01-02 00:00:00
+    EnvironmentEdge frozenClock = () -> frozenNow;
+    EnvironmentEdgeManager.injectEdge(frozenClock);
+
+    try {
+      long actual = ReplicationLogReplayService.getInstance(conf1).getConsistencyPoint();
+
+      assertEquals("Consistency point should equal mocked currentTime when no HA groups exist",
+        frozenNow, actual);
+    } finally {
+      // Always restore the real clock so later tests are unaffected.
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  /**
+   * Upserts a system-table record for {@code haGroupName} with cluster 1 as ACTIVE and cluster 2
+   * as STANDBY.
+   */
+  private void registerHAGroup(String haGroupName) throws IOException, SQLException {
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, peerZkUrl,
+      CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+      ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null);
+  }
+
+  /**
+   * Removes HA group state left behind by earlier tests: first the per-group paths through both
+   * admins' Curator clients (best effort), then every HA group row in the system table.
+   */
+  private void cleanupHAGroupState() throws SQLException {
+    try {
+      for (String groupName : HAGroupStoreClient.getHAGroupNames(zkUrl)) {
+        String groupPath = toPath(groupName);
+        haAdmin.getCurator().delete().quietly().forPath(groupPath);
+        peerHaAdmin.getCurator().delete().quietly().forPath(groupPath);
+      }
+    } catch (Exception ignored) {
+      // Best-effort cleanup: a failure here must not stop the system-table cleanup below.
+    }
+    HAGroupStoreTestUtil.deleteAllHAGroupRecordsInSystemTable(zkUrl);
+  }
+
+  /**
+   * {@code ReplicationLogReplay} variant for tests. It hands out a Mockito mock from
+   * {@code getReplicationReplayLogDiscovery()}, which lets a test dictate the value returned by
+   * the discovery's {@code getConsistencyPoint()}.
+   */
+  private static class TestableReplicationLogReplay extends ReplicationLogReplay {
+    private ReplicationLogDiscoveryReplay mockDiscovery;
+
+    public TestableReplicationLogReplay(org.apache.hadoop.conf.Configuration conf,
+      String haGroupName) {
+      super(conf, haGroupName);
+      this.mockDiscovery = Mockito.mock(ReplicationLogDiscoveryReplay.class);
+    }
+
+    /** Stubs the mocked discovery so that it reports {@code consistencyPoint}. */
+    public void setConsistencyPoint(long consistencyPoint) {
+      try {
+        Mockito.doReturn(consistencyPoint).when(mockDiscovery).getConsistencyPoint();
+      } catch (IOException e) {
+        // Not expected while stubbing a mock; the signature still forces us to handle it.
+        throw new RuntimeException("Failed to set consistency point", e);
+      }
+    }
+
+    @Override
+    protected ReplicationLogDiscoveryReplay getReplicationReplayLogDiscovery() {
+      return mockDiscovery;
+    }
+  }
+}