This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 3045165e41 PHOENIX-7755 Add support for Consistency Point calculation 
in Replication Log Replay on StandBy Cluster (#2356)
3045165e41 is described below

commit 3045165e41ff4a25548e09e12122fb8abf056be7
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Tue Feb 10 23:49:35 2026 +0530

    PHOENIX-7755 Add support for Consistency Point calculation in Replication 
Log Replay on StandBy Cluster (#2356)
---
 ...a => MetricsReplicationLogDiscoveryReplay.java} |  27 +-
 .../MetricsReplicationLogDiscoveryReplayImpl.java  |  13 +-
 .../reader/ReplicationLogDiscoveryReplay.java      |  84 +++++
 .../reader/ReplicationLogReplayService.java        |  27 +-
 .../ReplicationLogDiscoveryReplayTestIT.java       | 357 ++++++++++++++++++++-
 .../reader/ReplicationLogReplayServiceTestIT.java  | 245 ++++++++++++++
 6 files changed, 736 insertions(+), 17 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplay.java
similarity index 50%
copy from 
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
copy to 
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplay.java
index c7dcf4f6ae..f78b762146 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplay.java
@@ -17,18 +17,21 @@
  */
 package org.apache.phoenix.replication.metrics;
 
-/** Implementation of metrics source for ReplicationLogDiscoveryReplay 
operations. */
-public class MetricsReplicationLogDiscoveryReplayImpl extends 
MetricsReplicationLogDiscoveryImpl {
+/**
+ * Interface for metrics specific to ReplicationLogDiscoveryReplay operations. 
Extends the base
+ * MetricsReplicationLogDiscovery with replay-specific metrics.
+ */
+public interface MetricsReplicationLogDiscoveryReplay extends 
MetricsReplicationLogDiscovery {
 
-  private static final String METRICS_NAME = "ReplicationLogDiscoveryReplay";
-  private static final String METRICS_DESCRIPTION =
-    "Metrics about Replication Replay Log Discovery for an HA Group";
-  private static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + 
METRICS_NAME;
+  String CONSISTENCY_POINT = "consistencyPoint";
+  String CONSISTENCY_POINT_DESC =
+    "Consistency point timestamp in milliseconds for the HA Group during 
replay";
 
-  public MetricsReplicationLogDiscoveryReplayImpl(final String haGroupName) {
-    super(MetricsReplicationLogDiscoveryReplayImpl.METRICS_NAME,
-      MetricsReplicationLogDiscoveryReplayImpl.METRICS_DESCRIPTION,
-      MetricsReplicationLogDiscoveryImpl.METRICS_CONTEXT,
-      MetricsReplicationLogDiscoveryReplayImpl.METRICS_JMX_CONTEXT + 
",haGroup=" + haGroupName);
-  }
+  /**
+   * Updates the consistency point metric. The consistency point represents 
the timestamp up to
+   * which all mutations have been replayed and the data is consistent for 
failover or read
+   * operations.
+   * @param consistencyPointMs The consistency point timestamp in milliseconds
+   */
+  void updateConsistencyPoint(long consistencyPointMs);
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
index c7dcf4f6ae..16e65287d9 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogDiscoveryReplayImpl.java
@@ -17,18 +17,29 @@
  */
 package org.apache.phoenix.replication.metrics;
 
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
 /** Implementation of metrics source for ReplicationLogDiscoveryReplay 
operations. */
-public class MetricsReplicationLogDiscoveryReplayImpl extends 
MetricsReplicationLogDiscoveryImpl {
+public class MetricsReplicationLogDiscoveryReplayImpl extends 
MetricsReplicationLogDiscoveryImpl
+  implements MetricsReplicationLogDiscoveryReplay {
 
   private static final String METRICS_NAME = "ReplicationLogDiscoveryReplay";
   private static final String METRICS_DESCRIPTION =
     "Metrics about Replication Replay Log Discovery for an HA Group";
   private static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + 
METRICS_NAME;
 
+  private final MutableGaugeLong consistencyPoint;
+
   public MetricsReplicationLogDiscoveryReplayImpl(final String haGroupName) {
     super(MetricsReplicationLogDiscoveryReplayImpl.METRICS_NAME,
       MetricsReplicationLogDiscoveryReplayImpl.METRICS_DESCRIPTION,
       MetricsReplicationLogDiscoveryImpl.METRICS_CONTEXT,
       MetricsReplicationLogDiscoveryReplayImpl.METRICS_JMX_CONTEXT + 
",haGroup=" + haGroupName);
+    consistencyPoint = getMetricsRegistry().newGauge(CONSISTENCY_POINT, 
CONSISTENCY_POINT_DESC, 0L);
+  }
+
+  @Override
+  public void updateConsistencyPoint(long consistencyPointMs) {
+    consistencyPoint.set(consistencyPointMs);
   }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index b6ce96e62c..361a93489c 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.replication.ReplicationLogTracker;
 import org.apache.phoenix.replication.ReplicationRound;
 import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
 import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery;
+import 
org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplay;
 import 
org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -291,6 +292,18 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
   public void replay() throws IOException {
     LOG.info("Starting replay with lastRoundProcessed={}, lastRoundInSync={}", 
lastRoundProcessed,
       lastRoundInSync);
+
+    // Update consistency point metric at the start of replay
+    try {
+      long consistencyPoint = getConsistencyPoint();
+      LOG.debug("Consistency point for HAGroup: {} before starting the replay 
is {}.", haGroupName,
+        consistencyPoint);
+      getReplayMetrics().updateConsistencyPoint(consistencyPoint);
+    } catch (IOException exception) {
+      LOG.warn("Failed to get the consistency point for HA Group: {} at start 
of replay",
+        haGroupName, exception);
+    }
+
     Optional<ReplicationRound> optionalNextRound = getFirstRoundToProcess();
     LOG.info("Found first round to process as {} for haGroup: {}", 
optionalNextRound, haGroupName);
     while (optionalNextRound.isPresent()) {
@@ -341,6 +354,18 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
         default:
           throw new IllegalStateException("Unexpected state: " + currentState);
       }
+
+      // Update consistency point metric after processing each round
+      try {
+        long consistencyPoint = getConsistencyPoint();
+        LOG.debug("Consistency point for HAGroup: {} after processing round: 
{} is {}", haGroupName,
+          replicationRound, consistencyPoint);
+        getReplayMetrics().updateConsistencyPoint(consistencyPoint);
+      } catch (IOException exception) {
+        LOG.warn("Failed to get the consistency point for HA Group: {} after 
processing round: {}",
+          haGroupName, replicationRound, exception);
+      }
+
       optionalNextRound = getNextRoundToProcess();
     }
 
@@ -384,6 +409,14 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
     return new MetricsReplicationLogDiscoveryReplayImpl(haGroupName);
   }
 
+  /**
+   * Returns the replay-specific metrics interface.
+   * @return MetricsReplicationLogDiscoveryReplay instance
+   */
+  protected MetricsReplicationLogDiscoveryReplay getReplayMetrics() {
+    return (MetricsReplicationLogDiscoveryReplay) getMetrics();
+  }
+
   @Override
   public String getExecutorThreadNameFormat() {
     return EXECUTOR_THREAD_NAME_FORMAT;
@@ -520,4 +553,55 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
     DEGRADED, // degraded for writer
     SYNCED_RECOVERY // came back from degraded → standby, needs rewind
   }
+
+  /**
+   * Returns the consistency point timestamp based on the current replication 
replay state. The
+   * consistency point in a standby cluster is defined as the timestamp such 
that all mutations
+   * whose timestamp less than this consistency point timestamp have been 
replayed
+   * @return The consistency point timestamp in milliseconds
+   * @throws IOException if the consistency point cannot be determined based 
on current state
+   */
+  public long getConsistencyPoint() throws IOException {
+
+    ReplicationReplayState currentState = replicationReplayState.get();
+    long consistencyPoint = 0L;
+
+    switch (currentState) {
+      case SYNC:
+        // In SYNC state: prefer minimum timestamp from in-progress files (if 
any),
+        // otherwise use lastRoundInSync end time
+        Optional<Long> optionalMinTimestampInProgressTimestamp =
+          getMinTimestampFromInProgressFiles();
+        if (optionalMinTimestampInProgressTimestamp.isPresent()) {
+          // Use minimum timestamp from in-progress files as consistency point
+          consistencyPoint = optionalMinTimestampInProgressTimestamp.get();
+        } else if (lastRoundInSync != null) {
+          // Use lastRoundInSync end time if no in-progress files
+          // Since we are in sync mode, both lastRoundProcessed and 
lastRoundInSync would be same.
+          // However, using lastRoundInSync to be on safe side.
+          consistencyPoint = lastRoundInSync.getEndTime();
+        } else {
+          throw new IOException(
+            "Not able to derive consistency point because In Progress 
directory is empty and lastRoundInSync is not initialized.");
+        }
+        break;
+      case DEGRADED:
+      case SYNCED_RECOVERY:
+        // In DEGRADED or SYNCED_RECOVERY state: use lastRoundInSync end time
+        // (the last known sync point before degradation/recovery)
+        if (lastRoundInSync != null) {
+          consistencyPoint = lastRoundInSync.getEndTime();
+        } else {
+          throw new IOException(
+            "Not able to derive consistency point because lastRoundInSync is 
not initialized.");
+        }
+        break;
+      default:
+        // Invalid or uninitialized state
+        throw new IOException(
+          "Not able to derive consistency point for current state: " + 
currentState);
+    }
+
+    return consistencyPoint;
+  }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
index fe380dea85..24d40faac7 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -193,7 +194,7 @@ public class ReplicationLogReplayService {
     List<String> replicationGroups = getReplicationGroups();
     LOG.info("{} number of HA Groups found to start Replication Replay", 
replicationGroups.size());
     for (String replicationGroup : replicationGroups) {
-      ReplicationLogReplay.get(conf, replicationGroup).startReplay();
+      getReplicationLogReplay(replicationGroup).startReplay();
     }
   }
 
@@ -204,14 +205,36 @@ public class ReplicationLogReplayService {
     List<String> replicationGroups = getReplicationGroups();
     LOG.info("{} number of HA Groups found to stop Replication Replay", 
replicationGroups.size());
     for (String replicationGroup : replicationGroups) {
-      ReplicationLogReplay replicationLogReplay = 
ReplicationLogReplay.get(conf, replicationGroup);
+      ReplicationLogReplay replicationLogReplay = 
getReplicationLogReplay(replicationGroup);
       replicationLogReplay.stopReplay();
       replicationLogReplay.close();
     }
   }
 
+  /**
+   * Returns the minimum consistency point across all HA groups in the 
cluster. See
+   * {@link ReplicationLogDiscoveryReplay#getConsistencyPoint()} for 
definition of consistency point
+   * for a particular HA Group.
+   * @return The minimum consistency point timestamp in milliseconds across 
all HA groups
+   * @throws IOException  if there's an error retrieving consistency points 
from replication groups
+   * @throws SQLException if there's an error accessing HA group information
+   */
+  protected long getConsistencyPoint() throws IOException, SQLException {
+    long consistencyPoint = EnvironmentEdgeManager.currentTime();
+    List<String> replicationGroups = getReplicationGroups();
+    for (String replicationGroup : replicationGroups) {
+      consistencyPoint = Math.min(getReplicationLogReplay(replicationGroup)
+        .getReplicationReplayLogDiscovery().getConsistencyPoint(), 
consistencyPoint);
+    }
+    return consistencyPoint;
+  }
+
   /** Returns the list of HA groups on the cluster */
   protected List<String> getReplicationGroups() throws SQLException {
     return HAGroupStoreManager.getInstance(conf).getHAGroupNames();
   }
+
+  protected ReplicationLogReplay getReplicationLogReplay(final String 
haGroupName) {
+    return ReplicationLogReplay.get(conf, haGroupName);
+  }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
index 27d62e71ea..9eac8c9cd3 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java
@@ -22,11 +22,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.URI;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -48,8 +51,7 @@ import org.apache.phoenix.replication.ReplicationLogGroup;
 import org.apache.phoenix.replication.ReplicationLogTracker;
 import org.apache.phoenix.replication.ReplicationRound;
 import org.apache.phoenix.replication.ReplicationShardDirectoryManager;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker;
-import 
org.apache.phoenix.replication.metrics.MetricsReplicationLogTrackerReplayImpl;
+import org.apache.phoenix.replication.metrics.*;
 import org.apache.phoenix.util.HAGroupStoreTestUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.After;
@@ -591,6 +593,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       try {
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+        // Initialize the discovery
+        discovery.init();
+
         ReplicationRound initialRound =
           new ReplicationRound(initialEndTime - roundTimeMills, 
initialEndTime);
         discovery.setLastRoundProcessed(initialRound);
@@ -639,6 +645,11 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         // Verify triggerFailover was not called
         assertEquals("triggerFailover should not be called", 0,
           discovery.getTriggerFailoverCallCount());
+
+        // Verify consistency point: in SYNC state, should be 
lastRoundInSync.getEndTime()
+        long consistencyPoint = discovery.getConsistencyPoint();
+        assertEquals("Consistency point should match 
lastRoundInSync.getEndTime() in SYNC state",
+          expectedRound3.getEndTime(), consistencyPoint);
       } finally {
         EnvironmentEdgeManager.reset();
       }
@@ -678,6 +689,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       try {
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+        // Initialize the discovery
+        discovery.init();
+
         discovery.setLastRoundProcessed(
           new ReplicationRound(initialEndTime - roundTimeMills, 
initialEndTime));
         ReplicationRound lastRoundInSyncBeforeReplay =
@@ -724,6 +739,12 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         // Verify triggerFailover was not called
         assertEquals("triggerFailover should not be called", 0,
           discovery.getTriggerFailoverCallCount());
+
+        // Verify consistency point: in DEGRADED state, should be 
lastRoundInSync.getEndTime()
+        long consistencyPoint = discovery.getConsistencyPoint();
+        assertEquals(
+          "Consistency point should match lastRoundInSync.getEndTime() in 
DEGRADED state",
+          lastRoundInSyncBeforeReplay.getEndTime(), consistencyPoint);
       } finally {
         EnvironmentEdgeManager.reset();
       }
@@ -764,6 +785,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
 
+        // Initialize the discovery
+        discovery.init();
+
         // Set initial state: lastRoundProcessed is ahead, lastRoundInSync is 
behind
         ReplicationRound lastInSyncRound =
           new ReplicationRound(initialEndTime - roundTimeMills, 
initialEndTime);
@@ -842,6 +866,13 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         // Verify triggerFailover was not called
         assertEquals("triggerFailover should not be called", 0,
           discovery.getTriggerFailoverCallCount());
+
+        // Verify consistency point: after transition to SYNC, should be
+        // lastRoundInSync.getEndTime()
+        long consistencyPoint = discovery.getConsistencyPoint();
+        assertEquals(
+          "Consistency point should match lastRoundInSync.getEndTime() after 
SYNC transition",
+          expectedSixthRound.getEndTime(), consistencyPoint);
       } finally {
         EnvironmentEdgeManager.reset();
       }
@@ -881,6 +912,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       try {
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+        // Initialize the discovery
+        discovery.init();
+
         ReplicationRound initialRound =
           new ReplicationRound(initialEndTime - roundTimeMills, 
initialEndTime);
         discovery.setLastRoundProcessed(initialRound);
@@ -947,6 +982,12 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         // Verify triggerFailover was not called
         assertEquals("triggerFailover should not be called", 0,
           discovery.getTriggerFailoverCallCount());
+
+        // Verify consistency point: in DEGRADED state, should be 
lastRoundInSync.getEndTime()
+        long consistencyPoint = discovery.getConsistencyPoint();
+        assertEquals(
+          "Consistency point should match lastRoundInSync.getEndTime() in 
DEGRADED state",
+          expectedRound1.getEndTime(), consistencyPoint);
       } finally {
         EnvironmentEdgeManager.reset();
       }
@@ -986,6 +1027,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
 
+        // Initialize the discovery
+        discovery.init();
+
         ReplicationRound lastInSyncRound =
           new ReplicationRound(initialEndTime - roundTimeMills, 
initialEndTime);
         discovery.setLastRoundProcessed(new ReplicationRound(initialEndTime + 
roundTimeMills,
@@ -1140,6 +1184,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
             }
           };
 
+        // Initialize the discovery
+        discoveryWithTransitions.init();
+
         discoveryWithTransitions.setLastRoundProcessed(initialRound);
         discoveryWithTransitions.setLastRoundInSync(initialRound);
         discoveryWithTransitions
@@ -1217,6 +1264,13 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         // Verify triggerFailover was not called
         assertEquals("triggerFailover should not be called", 0,
           discoveryWithTransitions.getTriggerFailoverCallCount());
+
+        // Verify consistency point: after transition to SYNC, should be
+        // lastRoundInSync.getEndTime()
+        long consistencyPoint = discoveryWithTransitions.getConsistencyPoint();
+        assertEquals(
+          "Consistency point should match lastRoundInSync.getEndTime() after 
SYNC transition",
+          expectedLastRound.getEndTime(), consistencyPoint);
       } finally {
         EnvironmentEdgeManager.reset();
       }
@@ -1254,6 +1308,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       try {
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+        // Initialize the discovery
+        discovery.init();
+
         ReplicationRound initialRound =
           new ReplicationRound(initialEndTime - roundTimeMills, 
initialEndTime);
         discovery.setLastRoundProcessed(initialRound);
@@ -1288,6 +1346,11 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         // Verify triggerFailover was not called
         assertEquals("triggerFailover should not be called", 0,
           discovery.getTriggerFailoverCallCount());
+
+        // Verify consistency point: in SYNC state, should be 
lastRoundInSync.getEndTime()
+        long consistencyPoint = discovery.getConsistencyPoint();
+        assertEquals("Consistency point should match 
lastRoundInSync.getEndTime() in SYNC state",
+          lastRoundInSyncBeforeReplay.getEndTime(), consistencyPoint);
       } finally {
         EnvironmentEdgeManager.reset();
       }
@@ -1399,6 +1462,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
 
+        // Initialize the discovery
+        discovery.init();
+
         // Set lastRoundInSync with start time 0 (the new case being tested)
         ReplicationRound lastRoundInSyncWithZeroStart = new 
ReplicationRound(0L, roundTimeMills);
         discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
@@ -1604,6 +1670,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
             }
           };
 
+        // Initialize the discovery
+        discovery.init();
+
         // Set lastRoundInSync with start time 0
         ReplicationRound lastRoundInSyncWithZeroStart = new 
ReplicationRound(0L, roundTimeMills);
         discovery.setLastRoundInSync(lastRoundInSyncWithZeroStart);
@@ -1718,6 +1787,10 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
       try {
         TestableReplicationLogDiscoveryReplay discovery =
           new TestableReplicationLogDiscoveryReplay(fileTracker, mockRecord);
+
+        // Initialize the discovery
+        discovery.init();
+
         ReplicationRound initialRound =
           new ReplicationRound(initialEndTime - roundTimeMills, 
initialEndTime);
         discovery.setLastRoundProcessed(initialRound);
@@ -1776,6 +1849,11 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
         assertFalse("failoverPending should be set to false after failover is 
triggered",
           discovery.getFailoverPending());
 
+        // Verify consistency point: in SYNC state, should be 
lastRoundInSync.getEndTime()
+        long consistencyPoint = discovery.getConsistencyPoint();
+        assertEquals("Consistency point should match 
lastRoundInSync.getEndTime() in SYNC state",
+          expectedRound3.getEndTime(), consistencyPoint);
+
         // TODO: Ensure cluster state is updated to ACTIVE_IN_SYNC once 
failover is triggered.
       } finally {
         EnvironmentEdgeManager.reset();
@@ -2041,6 +2119,115 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
     }
   }
 
+  /**
+   * Tests getConsistencyPoint method in SYNC state with in-progress files 
present. Should return
+   * the minimum timestamp from in-progress files.
+   */
+  @Test
+  public void testGetConsistencyPointSyncStateWithInProgressFiles() throws 
IOException {
+    // Create ReplicationLogTracker
+    ReplicationLogTracker tracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri));
+    HAGroupStoreRecord haGroupStoreRecord =
+      new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+        HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+        peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay discovery =
+      new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+    // Set state to SYNC
+    
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+    // Create mock in-progress files
+    // Note: getFileTimestamp() parses the filename, so we don't need to mock 
it
+    Path file1 = new Path("/test/1704153660000_rs2.plog");
+    Path file2 = new Path("/test/1704153600000_rs1.plog");
+    Path file3 = new Path("/test/1704153720000_rs3.plog");
+    List<Path> inProgressFiles = Arrays.asList(file1, file2, file3);
+
+    // Mock fileTracker to return in-progress files
+    doReturn(inProgressFiles).when(tracker).getInProgressFiles();
+
+    // Call getConsistencyPoint
+    long consistencyPoint = discovery.getConsistencyPoint();
+
+    // Should return the minimum timestamp from in-progress files
+    assertEquals("Should return minimum timestamp from in-progress files", 
1704153600000L,
+      consistencyPoint);
+  }
+
+  /**
+   * Tests getConsistencyPoint method in SYNC state without in-progress files 
but with
+   * lastRoundInSync set. Should return lastRoundInSync.getEndTime().
+   */
+  @Test
+  public void 
testGetConsistencyPointSyncStateWithoutInProgressFilesWithLastRoundInSync()
+    throws IOException {
+    // Create ReplicationLogTracker
+    ReplicationLogTracker tracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri));
+    HAGroupStoreRecord haGroupStoreRecord =
+      new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+        HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+        peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay discovery =
+      new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+    // Set state to SYNC
+    
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+    // Mock empty in-progress files
+    doReturn(Collections.emptyList()).when(tracker).getInProgressFiles();
+
+    // Set lastRoundInSync
+    long endTime = 1704153600000L;
+    long roundTimeMills = discovery.getRoundTimeMills();
+    ReplicationRound lastRoundInSync = new ReplicationRound(endTime - 
roundTimeMills, endTime);
+    discovery.setLastRoundInSync(lastRoundInSync);
+
+    // Call getConsistencyPoint
+    long consistencyPoint = discovery.getConsistencyPoint();
+
+    // Should return lastRoundInSync.getEndTime()
+    assertEquals("Should return lastRoundInSync.getEndTime()", endTime, 
consistencyPoint);
+  }
+
+  /**
+   * Tests getConsistencyPoint method in SYNC state without in-progress files 
and without
+   * lastRoundInSync. Should throw IOException.
+   */
+  @Test
+  public void 
testGetConsistencyPointSyncStateWithoutInProgressFilesWithoutLastRoundInSync()
+    throws IOException {
+    // Create ReplicationLogTracker
+    ReplicationLogTracker tracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri));
+    HAGroupStoreRecord haGroupStoreRecord =
+      new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+        HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+        peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay discovery =
+      new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+    // Set state to SYNC
+    
discovery.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNC);
+
+    // Mock empty in-progress files
+    doReturn(Collections.emptyList()).when(tracker).getInProgressFiles();
+
+    // Don't set lastRoundInSync (should be null)
+
+    // Call getConsistencyPoint - should throw IOException
+    try {
+      discovery.getConsistencyPoint();
+      fail("Should throw IOException when in-progress files are empty and 
lastRoundInSync is null");
+    } catch (IOException e) {
+      assertEquals("Error message should match",
+        "Not able to derive consistency point because In Progress directory is 
empty and lastRoundInSync is not initialized.",
+        e.getMessage());
+    }
+  }
+
   /**
    * Tests triggerFailover when
    * HAGroupStoreManager.getInstance(conf).setHAGroupStatusToSync(haGroupName) 
throws
@@ -2111,6 +2298,168 @@ public class ReplicationLogDiscoveryReplayTestIT 
extends BaseTest {
     }
   }
 
+  /**
+   * Tests getConsistencyPoint method in DEGRADED state with lastRoundInSync 
set. Should return
+   * lastRoundInSync.getEndTime().
+   */
+  @Test
+  public void testGetConsistencyPointDegradedStateWithLastRoundInSync() throws 
IOException {
+    // Create ReplicationLogTracker
+    ReplicationLogTracker tracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri));
+    HAGroupStoreRecord haGroupStoreRecord =
+      new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+        HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+        peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay discovery =
+      new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+    // Set state to DEGRADED
+    discovery
+      
.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED);
+
+    // Set lastRoundInSync
+    long endTime = 1704153600000L;
+    long roundTimeMills = discovery.getRoundTimeMills();
+    ReplicationRound lastRoundInSync = new ReplicationRound(endTime - 
roundTimeMills, endTime);
+    discovery.setLastRoundInSync(lastRoundInSync);
+
+    // Call getConsistencyPoint
+    long consistencyPoint = discovery.getConsistencyPoint();
+
+    // Should return lastRoundInSync.getEndTime()
+    assertEquals("Should return lastRoundInSync.getEndTime()", endTime, 
consistencyPoint);
+  }
+
+  /**
+   * Tests getConsistencyPoint method in DEGRADED state without 
lastRoundInSync. Should throw
+   * IOException.
+   */
+  @Test
+  public void testGetConsistencyPointDegradedStateWithoutLastRoundInSync() 
throws IOException {
+    // Create ReplicationLogTracker
+    ReplicationLogTracker tracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri));
+    HAGroupStoreRecord haGroupStoreRecord =
+      new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+        HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+        peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay discovery =
+      new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+    // Set state to DEGRADED
+    discovery
+      
.setReplicationReplayState(ReplicationLogDiscoveryReplay.ReplicationReplayState.DEGRADED);
+
+    // Don't set lastRoundInSync (should be null)
+
+    // Call getConsistencyPoint - should throw IOException
+    try {
+      discovery.getConsistencyPoint();
+      fail("Should throw IOException when lastRoundInSync is null in DEGRADED 
state");
+    } catch (IOException e) {
+      assertEquals("Error message should match",
+        "Not able to derive consistency point because lastRoundInSync is not 
initialized.",
+        e.getMessage());
+    }
+  }
+
+  /**
+   * Tests getConsistencyPoint method in SYNCED_RECOVERY state with 
lastRoundInSync set. Should
+   * return lastRoundInSync.getEndTime().
+   */
+  @Test
+  public void testGetConsistencyPointSyncedRecoveryStateWithLastRoundInSync() 
throws IOException {
+    // Create ReplicationLogTracker
+    ReplicationLogTracker tracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri));
+    HAGroupStoreRecord haGroupStoreRecord =
+      new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+        HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+        peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay discovery =
+      new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+    // Set state to SYNCED_RECOVERY
+    discovery.setReplicationReplayState(
+      ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY);
+
+    // Set lastRoundInSync
+    long endTime = 1704153600000L;
+    long roundTimeMills = discovery.getRoundTimeMills();
+    ReplicationRound lastRoundInSync = new ReplicationRound(endTime - 
roundTimeMills, endTime);
+    discovery.setLastRoundInSync(lastRoundInSync);
+
+    // Call getConsistencyPoint
+    long consistencyPoint = discovery.getConsistencyPoint();
+
+    // Should return lastRoundInSync.getEndTime()
+    assertEquals("Should return lastRoundInSync.getEndTime()", endTime, 
consistencyPoint);
+  }
+
+  /**
+   * Tests getConsistencyPoint method in SYNCED_RECOVERY state without 
lastRoundInSync. Should throw
+   * IOException.
+   */
+  @Test
+  public void 
testGetConsistencyPointSyncedRecoveryStateWithoutLastRoundInSync()
+    throws IOException {
+    // Create ReplicationLogTracker
+    ReplicationLogTracker tracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri));
+    HAGroupStoreRecord haGroupStoreRecord =
+      new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+        HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+        peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay discovery =
+      new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+    // Set state to SYNCED_RECOVERY
+    discovery.setReplicationReplayState(
+      ReplicationLogDiscoveryReplay.ReplicationReplayState.SYNCED_RECOVERY);
+
+    // Don't set lastRoundInSync (should be null)
+
+    // Call getConsistencyPoint - should throw IOException
+    try {
+      discovery.getConsistencyPoint();
+      fail("Should throw IOException when lastRoundInSync is null in 
SYNCED_RECOVERY state");
+    } catch (IOException e) {
+      assertEquals("Error message should match",
+        "Not able to derive consistency point because lastRoundInSync is not 
initialized.",
+        e.getMessage());
+    }
+  }
+
+  /**
+   * Tests getConsistencyPoint method in NOT_INITIALIZED state. Should throw 
IOException.
+   */
+  @Test
+  public void testGetConsistencyPointNotInitializedState() throws IOException {
+    // Create ReplicationLogTracker
+    ReplicationLogTracker tracker =
+      Mockito.spy(createReplicationLogTracker(config, haGroupName, localFs, 
standbyUri));
+    HAGroupStoreRecord haGroupStoreRecord =
+      new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
haGroupName,
+        HAGroupStoreRecord.HAGroupState.STANDBY, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+        peerZkUrl, zkUrl, peerZkUrl, 0L);
+    TestableReplicationLogDiscoveryReplay discovery =
+      new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord);
+
+    // Set state to NOT_INITIALIZED
+    discovery.setReplicationReplayState(
+      ReplicationLogDiscoveryReplay.ReplicationReplayState.NOT_INITIALIZED);
+
+    // Call getConsistencyPoint - should throw IOException
+    try {
+      discovery.getConsistencyPoint();
+      fail("Should throw IOException when state is NOT_INITIALIZED");
+    } catch (IOException e) {
+      assertEquals("Error message should match",
+        "Not able to derive consistency point for current state: 
NOT_INITIALIZED", e.getMessage());
+    }
+  }
+
   /**
    * Testable implementation of ReplicationLogDiscoveryReplay for unit 
testing. Provides dependency
    * injection for HAGroupStoreRecord, tracks processed rounds, and supports 
simulating state
@@ -2195,5 +2544,9 @@ public class ReplicationLogDiscoveryReplayTestIT extends 
BaseTest {
     public List<ReplicationRound> getProcessedRounds() {
       return new java.util.ArrayList<>(processedRounds);
     }
+
+    public long getRoundTimeMills() {
+      return roundTimeMills;
+    }
   }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayServiceTestIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayServiceTestIT.java
new file mode 100644
index 0000000000..97a6c49d2a
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogReplayServiceTestIT.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.ClusterRoleRecord;
+import org.apache.phoenix.jdbc.HABaseIT;
+import org.apache.phoenix.jdbc.HAGroupStoreClient;
+import org.apache.phoenix.jdbc.PhoenixHAAdmin;
+import org.apache.phoenix.util.HAGroupStoreTestUtil;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogReplayServiceTestIT extends HABaseIT {
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  private String zkUrl;
+  private String peerZkUrl;
+  private FileSystem localFs;
+  private URI standbyUri;
+  private PhoenixHAAdmin haAdmin;
+  private PhoenixHAAdmin peerHaAdmin;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    CLUSTERS.start();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    zkUrl = getLocalZkUrl(conf1);
+    peerZkUrl = CLUSTERS.getZkUrl2();
+    localFs = FileSystem.getLocal(conf1);
+    standbyUri = testFolder.getRoot().toURI();
+    haAdmin = new PhoenixHAAdmin(zkUrl, conf1, 
ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE);
+    peerHaAdmin = new PhoenixHAAdmin(peerZkUrl, conf2, 
ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE);
+    cleanupHAGroupState();
+
+    // Set the required configuration for ReplicationLogReplay
+    conf1.set(ReplicationLogReplay.REPLICATION_LOG_REPLAY_HDFS_URL_KEY, 
standbyUri.toString());
+    // Enable replication replay service
+    
conf1.setBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
 true);
+
+  }
+
+  /**
+   * Tests getConsistencyPoint method of ReplicationLogReplayService with 
multiple HA groups.
+   * Verifies that it returns the minimum consistency point across all HA 
groups.
+   */
+  @Test
+  public void testGetConsistencyPointMultipleGroups() throws IOException, 
SQLException {
+    final String haGroupName1 = testName.getMethodName() + "_1";
+    final String haGroupName2 = testName.getMethodName() + "_2";
+
+    // Insert HAGroupStoreRecords into the system table
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName1, zkUrl, 
peerZkUrl,
+      CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+      ClusterRoleRecord.ClusterRole.ACTIVE, 
ClusterRoleRecord.ClusterRole.STANDBY, null);
+
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName2, zkUrl, 
peerZkUrl,
+      CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+      ClusterRoleRecord.ClusterRole.ACTIVE, 
ClusterRoleRecord.ClusterRole.STANDBY, null);
+
+    // Set up consistency points for both groups
+    long consistencyPoint1 = 1704153600000L; // 2024-01-02 00:00:00
+    long consistencyPoint2 = 1704240000000L; // 2024-01-03 00:00:00
+
+    // Create testable replays with mocked consistency points
+    TestableReplicationLogReplay testableReplay1 =
+      new TestableReplicationLogReplay(conf1, haGroupName1);
+    testableReplay1.setConsistencyPoint(consistencyPoint1);
+
+    TestableReplicationLogReplay testableReplay2 =
+      new TestableReplicationLogReplay(conf1, haGroupName2);
+    testableReplay2.setConsistencyPoint(consistencyPoint2);
+
+    // Create a spy on ReplicationLogReplayService and mock 
getReplicationLogReplay
+    ReplicationLogReplayService service = 
ReplicationLogReplayService.getInstance(conf1);
+    ReplicationLogReplayService serviceSpy = Mockito.spy(service);
+
+    
Mockito.doReturn(testableReplay1).when(serviceSpy).getReplicationLogReplay(haGroupName1);
+    
Mockito.doReturn(testableReplay2).when(serviceSpy).getReplicationLogReplay(haGroupName2);
+
+    // Call getConsistencyPoint
+    long consistencyPoint = serviceSpy.getConsistencyPoint();
+
+    // Should return the minimum consistency point across all groups
+    // Since consistencyPoint1 < consistencyPoint2, consistencyPoint should be 
consistencyPoint1
+    assertEquals("Consistency point should be the minimum across all HA 
groups", consistencyPoint1,
+      consistencyPoint);
+  }
+
+  /**
+   * Tests getConsistencyPoint method of ReplicationLogReplayService with a 
single HA group.
+   * Verifies that it returns the consistency point for that group.
+   */
+  @Test
+  public void testGetConsistencyPointSingleGroup() throws IOException, 
SQLException {
+    final String haGroupName = testName.getMethodName();
+
+    // Insert HAGroupStoreRecord into the system table
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, zkUrl, 
peerZkUrl,
+      CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+      ClusterRoleRecord.ClusterRole.ACTIVE, 
ClusterRoleRecord.ClusterRole.STANDBY, null);
+
+    // Set up consistency point for the group
+    long consistencyPoint = 1704153600000L; // 2024-01-02 00:00:00
+
+    // Create testable replay with mocked consistency point
+    TestableReplicationLogReplay testableReplay =
+      new TestableReplicationLogReplay(conf1, haGroupName);
+    testableReplay.setConsistencyPoint(consistencyPoint);
+
+    // Create a spy on ReplicationLogReplayService and mock 
getReplicationLogReplay
+    ReplicationLogReplayService service = 
ReplicationLogReplayService.getInstance(conf1);
+    ReplicationLogReplayService serviceSpy = Mockito.spy(service);
+
+    
Mockito.doReturn(testableReplay).when(serviceSpy).getReplicationLogReplay(haGroupName);
+
+    // Call getConsistencyPoint
+    long result = serviceSpy.getConsistencyPoint();
+
+    // Should return the consistency point for the single group
+    // Since we start with currentTime and compare with Math.min, it should 
return the group's
+    // consistency point
+    assertEquals("Consistency point should match the single HA group's 
consistency point",
+      consistencyPoint, result);
+  }
+
+  /**
+   * Tests getConsistencyPoint method when no HA groups exist. Should return 
currentTime (initial
+   * value) when there are no groups.
+   */
+  @Test
+  public void testGetConsistencyPointWithNoGroups() throws IOException, 
SQLException {
+    // Ensure no HA groups exist
+    cleanupHAGroupState();
+
+    // Mock current time
+    long mockedCurrentTime = 1704153600000L; // 2024-01-02 00:00:00
+    EnvironmentEdge edge = () -> mockedCurrentTime;
+    EnvironmentEdgeManager.injectEdge(edge);
+
+    try {
+      // Get ReplicationLogReplayService instance
+      ReplicationLogReplayService service = 
ReplicationLogReplayService.getInstance(conf1);
+
+      // Call getConsistencyPoint
+      long consistencyPoint = service.getConsistencyPoint();
+
+      // Should return mocked currentTime when no groups exist (since it's 
initialized with
+      // currentTime)
+      assertEquals("Consistency point should equal mocked currentTime when no 
HA groups exist",
+        mockedCurrentTime, consistencyPoint);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  private void cleanupHAGroupState() throws SQLException {
+    // Clean up existing HAGroupStoreRecords
+    try {
+      List<String> haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl);
+      for (String haGroupName : haGroupNames) {
+        haAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+        
peerHaAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+      }
+
+    } catch (Exception e) {
+      // Ignore cleanup errors
+    }
+    // Remove any existing entries in the system table
+    HAGroupStoreTestUtil.deleteAllHAGroupRecordsInSystemTable(zkUrl);
+  }
+
+  /**
+   * Testable implementation of ReplicationLogReplay for testing. Allows 
mocking the
+   * getConsistencyPoint method via getReplicationReplayLogDiscovery().
+   */
+  private static class TestableReplicationLogReplay extends 
ReplicationLogReplay {
+    private ReplicationLogDiscoveryReplay mockDiscovery;
+
+    public TestableReplicationLogReplay(org.apache.hadoop.conf.Configuration 
conf,
+      String haGroupName) {
+      super(conf, haGroupName);
+      // Create a mock discovery that we can configure
+      mockDiscovery = Mockito.mock(ReplicationLogDiscoveryReplay.class);
+    }
+
+    public void setConsistencyPoint(long consistencyPoint) {
+      try {
+        when(mockDiscovery.getConsistencyPoint()).thenReturn(consistencyPoint);
+      } catch (IOException e) {
+        // This shouldn't happen during mock setup, but handle it just in case
+        throw new RuntimeException("Failed to set consistency point", e);
+      }
+    }
+
+    @Override
+    protected ReplicationLogDiscoveryReplay getReplicationReplayLogDiscovery() 
{
+      return mockDiscovery;
+    }
+  }
+}

Reply via email to