This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 9c5e4d795a PHOENIX-7863 Add replication consistency point guard to 
compaction (#2489)
9c5e4d795a is described below

commit 9c5e4d795a58473ee1245a454f84b4d98c3a7e51
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Sat Jun 27 03:37:15 2026 +0530

    PHOENIX-7863 Add replication consistency point guard to compaction (#2489)
---
 .../phoenix/coprocessor/CompactionScanner.java     |  52 +++-
 .../reader/ReplicationLogDiscoveryReplay.java      |   2 +-
 .../reader/ReplicationLogReplayService.java        |  88 ++++++
 .../reader/CompactionReplicationGuardIT.java       | 303 +++++++++++++++++++++
 .../coprocessor/CompactionGuardFormulaTest.java    | 132 +++++++++
 .../reader/ReplicationConsistencyPointTest.java    |  85 ++++++
 6 files changed, 657 insertions(+), 5 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index f12dc77f72..cb8d890d1c 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -81,6 +81,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
 import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
 import org.apache.phoenix.schema.CompiledTTLExpression;
 import org.apache.phoenix.schema.ConditionalTTLExpression;
@@ -139,6 +140,7 @@ public class CompactionScanner implements InternalScanner {
   private final Store store;
   private final RegionCoprocessorEnvironment env;
   private long maxLookbackWindowStart;
+  private final long replicationConsistencyPoint;
   private final long maxLookbackInMillis;
   private int minVersion;
   private int maxVersion;
@@ -199,8 +201,19 @@ public class CompactionScanner implements InternalScanner {
     this.maxLookbackWindowStart = this.maxLookbackInMillis == 0
       ? compactionTime
       : compactionTime - (this.maxLookbackInMillis + 1);
-    ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
+    Configuration conf = env.getConfiguration();
     this.major = major && !forceMinorCompaction;
+    boolean replayEnabled =
+      
conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
+        ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED);
+    if (this.major && replayEnabled) {
+      this.replicationConsistencyPoint =
+        ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName, 
columnFamilyName);
+    } else {
+      this.replicationConsistencyPoint =
+        ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED;
+    }
+    ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
     this.minVersion = cfd.getMinVersions();
     this.maxVersion = cfd.getMaxVersions();
     this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : 
cfd.getKeepDeletedCells();
@@ -1631,6 +1644,33 @@ public class CompactionScanner implements 
InternalScanner {
     }
   }
 
+  /**
+   * Computes the effective max-lookback boundary for a row, capped by the 
replication consistency
+   * point. The consistency point represents an exclusive upper bound: everything with
+   * {@code ts < consistencyPoint} has been replayed. We subtract 1 so that cells at exactly
+   * {@code ts == consistencyPoint} satisfy the strict-greater retention check and are retained.
+   * @param ttlWindowStart              row TTL window start in millis since 
epoch
+   * @param maxLookbackWindowStart      store-level max-lookback window start 
in millis since epoch
+   * @param replicationConsistencyPoint exclusive upper bound of replayed 
timestamps;
+   *                                    CONSISTENCY_POINT_UNAVAILABLE (0) 
retains all,
+   *                                    CONSISTENCY_POINT_GUARD_DISABLED 
(Long.MAX_VALUE) means
+   *                                    guard is a no-op
+   * @return effective boundary for the strict-greater retention compare 
(millis since epoch)
+   */
+  public static long computeRowMaxLookbackWithGuard(long ttlWindowStart,
+    long maxLookbackWindowStart, long replicationConsistencyPoint) {
+    if (
+      replicationConsistencyPoint == 
ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE
+        || replicationConsistencyPoint
+            == ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED
+    ) {
+      return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart),
+        replicationConsistencyPoint);
+    }
+    return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart),
+      replicationConsistencyPoint - 1);
+  }
+
   /**
    * The context for a given row during compaction. A row may have multiple 
compaction row versions.
    * CompactionScanner uses the same row context for these versions.
@@ -1657,10 +1697,14 @@ public class CompactionScanner implements 
InternalScanner {
     private void setTTL(long ttlInSecs) {
       this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1);
       this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 : 
compactionTime - ttl;
-      this.maxLookbackWindowStartForRow = Math.max(ttlWindowStart, 
maxLookbackWindowStart);
+      this.maxLookbackWindowStartForRow = 
computeRowMaxLookbackWithGuard(ttlWindowStart,
+        maxLookbackWindowStart, replicationConsistencyPoint);
       if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d, 
maxLookbackWindowStart=%d)",
-          ttlWindowStart, maxLookbackWindowStart));
+        LOGGER.trace(String.format(
+          "RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d, "
+            + "replicationConsistencyPoint=%d, 
maxLookbackWindowStartForRow=%d)",
+          ttlWindowStart, maxLookbackWindowStart, replicationConsistencyPoint,
+          maxLookbackWindowStartForRow));
       }
     }
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index 4d5f886d51..ec7a1da535 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -103,7 +103,7 @@ public class ReplicationLogDiscoveryReplay extends 
ReplicationLogDiscovery {
    */
   public static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0;
 
-  private ReplicationRound lastRoundInSync;
+  private volatile ReplicationRound lastRoundInSync;
 
   // AtomicReference ensures listener updates are visible to replay thread
   private final AtomicReference<ReplicationReplayState> replicationReplayState 
=
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
index 24d40faac7..ac9ec10b4f 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
@@ -29,6 +29,9 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.apache.phoenix.thirdparty.com.google.common.base.Suppliers;
 import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -77,14 +80,51 @@ public class ReplicationLogReplayService {
    */
   public static final int 
DEFAULT_REPLICATION_REPLAY_SERVICE_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 30;
 
+  public static final long CONSISTENCY_POINT_UNAVAILABLE = 0L;
+  public static final long CONSISTENCY_POINT_GUARD_DISABLED = Long.MAX_VALUE;
+
+  public static final String CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY =
+    "phoenix.replication.compaction.guard.cache.ttl.seconds";
+  public static final long DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS = 30;
+
+  private static volatile long lastFallbackWarnTime = 0;
+  private static final long WARN_LOG_INTERVAL_MS = 60_000;
+
   private static volatile ReplicationLogReplayService instance;
 
   private final Configuration conf;
   private ScheduledExecutorService scheduler;
   private volatile boolean isRunning = false;
+  private final Supplier<Long> cachedConsistencyPoint;
 
   private ReplicationLogReplayService(final Configuration conf) {
     this.conf = conf;
+    long cacheTtl = conf.getLong(CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY,
+      DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS);
+    // Guava's memoizeWithExpiration does NOT cache exceptions — a thrown 
RuntimeException
+    // causes the next get() to re-invoke the supplier. We rely on this: 
transient failures
+    // (NN flap, SYSTEM.HA_GROUP unavailable) retry on the next compaction 
rather than
+    // caching a stale fallback for the full TTL.
+    this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> {
+      try {
+        return getConsistencyPoint();
+      } catch (IOException | SQLException e) {
+        throw new RuntimeException("Failed to fetch consistency point", e);
+      }
+    }, cacheTtl, TimeUnit.SECONDS);
+  }
+
+  private ReplicationLogReplayService(Configuration conf, long 
fixedConsistencyPoint) {
+    this.conf = conf;
+    this.cachedConsistencyPoint = () -> fixedConsistencyPoint;
+  }
+
+  private ReplicationLogReplayService(Configuration conf, Supplier<Long> 
supplier) {
+    this.conf = conf;
+    long cacheTtl = conf.getLong(CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY,
+      DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS);
+    this.cachedConsistencyPoint =
+      Suppliers.memoizeWithExpiration(supplier, cacheTtl, TimeUnit.SECONDS);
   }
 
   /**
@@ -105,6 +145,28 @@ public class ReplicationLogReplayService {
     return instance;
   }
 
+  @VisibleForTesting
+  public static void setConsistencyPointForTesting(Configuration conf, long 
fixedConsistencyPoint) {
+    synchronized (ReplicationLogReplayService.class) {
+      instance = new ReplicationLogReplayService(conf, fixedConsistencyPoint);
+    }
+  }
+
+  @VisibleForTesting
+  public static void setConsistencyPointSupplierForTesting(Configuration conf,
+    Supplier<Long> supplier) {
+    synchronized (ReplicationLogReplayService.class) {
+      instance = new ReplicationLogReplayService(conf, supplier);
+    }
+  }
+
+  @VisibleForTesting
+  public static void resetInstanceForTesting() {
+    synchronized (ReplicationLogReplayService.class) {
+      instance = null;
+    }
+  }
+
   /**
    * Starts the replication log replay service by initializing the scheduler 
and scheduling periodic
    * replay operations for each HA Group.
@@ -229,6 +291,32 @@ public class ReplicationLogReplayService {
     return consistencyPoint;
   }
 
+  /**
+   * Resolves the minimum replication consistency point across all HA groups. 
Uses a cached value
+   * with a configurable TTL (see {@link 
#CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY}) to avoid
+   * repeated RPCs during compaction bursts. Returns {@link 
#CONSISTENCY_POINT_UNAVAILABLE} on any
+   * failure (caller treats this as "retain all delete markers").
+   */
+  public static long resolveConsistencyPoint(Configuration conf, String 
tableName,
+    String columnFamilyName) {
+    try {
+      long consistencyPoint = getInstance(conf).cachedConsistencyPoint.get();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Replication guard: table={} store={} consistencyPoint={}", 
tableName,
+          columnFamilyName, consistencyPoint);
+      }
+      return consistencyPoint;
+    } catch (Exception e) {
+      long now = System.currentTimeMillis();
+      if (now - lastFallbackWarnTime > WARN_LOG_INTERVAL_MS) {
+        lastFallbackWarnTime = now;
+        LOG.warn("Replication guard: consistency point unavailable for 
table={} store={}."
+          + " Retaining all delete markers.", tableName, columnFamilyName, e);
+      }
+      return CONSISTENCY_POINT_UNAVAILABLE;
+    }
+  }
+
   /** Returns the list of HA groups on the cluster */
   protected List<String> getReplicationGroups() throws SQLException {
     return HAGroupStoreManager.getInstance(conf).getHAGroupNames();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java
new file mode 100644
index 0000000000..2279ab929f
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
+import static 
org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE;
+import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Integration tests for the replication consistency point compaction guard. 
Verifies that
+ * CompactionScanner retains delete markers with timestamps newer than the 
consistency point on
+ * clusters where replication replay is enabled.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class CompactionReplicationGuardIT extends BaseTest {
+
+  private static final int MAX_LOOKBACK_AGE = 15;
+  private static final int ROWS_POPULATED = 2;
+  private ManualEnvironmentEdge injectEdge;
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(5);
+    props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 
Integer.toString(MAX_LOOKBACK_AGE));
+    props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, 
Boolean.toString(true));
+    props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
+      Boolean.toString(true));
+    props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    EnvironmentEdgeManager.reset();
+    injectEdge = new ManualEnvironmentEdge();
+    injectEdge.setValue(System.currentTimeMillis());
+    EnvironmentEdgeManager.injectEdge(injectEdge);
+  }
+
+  @After
+  public synchronized void afterTest() throws Exception {
+    ReplicationLogReplayService.resetInstanceForTesting();
+    EnvironmentEdgeManager.reset();
+    boolean refCountLeaked = isAnyStoreRefCountLeaked();
+    assertFalse("refCount leaked", refCountLeaked);
+  }
+
+  /**
+   * Test 1: Guard retains delete markers that maxLookback would have purged. 
The consistency point
+   * is set BEFORE the delete timestamp, so the delete marker is newer than 
the consistency point
+   * and must be retained even after maxLookback window passes.
+   */
+  @Test(timeout = 120000L)
+  public void testGuardRetainsDeleteMarkersNewerThanConsistencyPoint() throws 
Exception {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      createTable(dataTableName);
+      TableName dataTable = TableName.valueOf(dataTableName);
+      populateTable(dataTableName);
+
+      injectEdge.incrementValue(1);
+      long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis();
+
+      // Delete a row
+      conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE 
id = 'a'");
+      conn.commit();
+      injectEdge.incrementValue(1);
+
+      // Set consistency point BEFORE the delete — meaning replay hasn't 
caught up to the delete
+      long consistencyPoint = beforeDeleteTime - 1;
+      injectMockConsistencyPoint(consistencyPoint);
+
+      // Advance time past maxLookback window — without guard, marker would be 
purged
+      injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+
+      flush(dataTable);
+      majorCompact(dataTable);
+
+      // Delete marker should be retained because its timestamp > 
consistencyPoint
+      assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+    }
+  }
+
+  /**
+   * Test 2: Both maxLookback and guard allow purge. The consistency point has 
advanced past the
+   * delete marker AND maxLookback window has passed — marker should be purged.
+   */
+  @Test(timeout = 120000L)
+  public void 
testDeleteMarkersPurgedWhenOlderThanBothConsistencyPointAndMaxLookback()
+    throws Exception {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      createTable(dataTableName);
+      TableName dataTable = TableName.valueOf(dataTableName);
+      populateTable(dataTableName);
+
+      injectEdge.incrementValue(1);
+
+      // Delete a row
+      conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE 
id = 'a'");
+      conn.commit();
+      injectEdge.incrementValue(1);
+
+      // Advance time past maxLookback
+      injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+
+      // Set consistency point to current time — replay is fully caught up
+      long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis();
+      injectMockConsistencyPoint(consistencyPoint);
+
+      flush(dataTable);
+      majorCompact(dataTable);
+
+      // Delete marker should be purged — both guard and maxLookback agree
+      assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1);
+    }
+  }
+
+  /**
+   * Test 3: MaxLookback retains even when guard wouldn't. Consistency point 
has advanced past the
+   * delete, but we're still within the maxLookback window — marker retained 
by maxLookback.
+   */
+  @Test(timeout = 120000L)
+  public void testMaxLookbackRetainsEvenWhenGuardAllowsPurge() throws 
Exception {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      createTable(dataTableName);
+      TableName dataTable = TableName.valueOf(dataTableName);
+      populateTable(dataTableName);
+
+      injectEdge.incrementValue(1);
+
+      // Delete a row
+      conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE 
id = 'a'");
+      conn.commit();
+      injectEdge.incrementValue(1);
+
+      // Set consistency point to current time — guard would allow purge
+      long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis();
+      injectMockConsistencyPoint(consistencyPoint);
+
+      // Do NOT advance past maxLookback — still within the window
+      injectEdge.incrementValue(1);
+
+      flush(dataTable);
+      majorCompact(dataTable);
+
+      // Delete marker retained because still within maxLookback window
+      assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+    }
+  }
+
+  /**
+   * Test 4: Guard fallback when consistency point unavailable — retains all 
delete markers. When
+   * the replay service throws an exception (e.g., not initialized), the guard 
falls back to
+   * retaining all markers to avoid data loss. The test injects CONSISTENCY_POINT_UNAVAILABLE.
+   */
+  @Test(timeout = 120000L)
+  public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() 
throws Exception {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      createTable(dataTableName);
+      TableName dataTable = TableName.valueOf(dataTableName);
+      populateTable(dataTableName);
+
+      injectEdge.incrementValue(1);
+
+      // Delete a row
+      conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE 
id = 'a'");
+      conn.commit();
+      injectEdge.incrementValue(1);
+
+      // Inject consistency point as UNAVAILABLE — simulating fallback when 
replay service fails
+      
ReplicationLogReplayService.setConsistencyPointForTesting(getUtility().getConfiguration(),
+        CONSISTENCY_POINT_UNAVAILABLE);
+
+      // Advance past maxLookback
+      injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+
+      flush(dataTable);
+      majorCompact(dataTable);
+
+      // Fallback retains all — delete marker NOT purged despite maxLookback 
passing
+      assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+    }
+  }
+
+  /**
+   * Test 5: Guard retains delete markers on a table with explicit TTL. The 
consistency point is set
+   * BEFORE the delete, time advances past both TTL and maxLookback, and the 
guard still retains the
+   * delete marker because its timestamp is newer than the consistency point.
+   */
+  @Test(timeout = 120000L)
+  public void testGuardRetainsDeleteMarkersWithExplicitTTL() throws Exception {
+    int ttlSeconds = 30;
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      createTableWithTTL(dataTableName, ttlSeconds);
+      TableName dataTable = TableName.valueOf(dataTableName);
+      populateTable(dataTableName);
+
+      injectEdge.incrementValue(1);
+      long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis();
+
+      // Delete a row
+      conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE 
id = 'a'");
+      conn.commit();
+      injectEdge.incrementValue(1);
+
+      // Set consistency point BEFORE the delete — replay hasn't caught up
+      long consistencyPoint = beforeDeleteTime - 1;
+      injectMockConsistencyPoint(consistencyPoint);
+
+      // Advance past both TTL and maxLookback — without guard, marker would 
be purged
+      injectEdge.incrementValue(ttlSeconds * 1000 + 1000);
+
+      flush(dataTable);
+      majorCompact(dataTable);
+
+      // Delete marker retained — guard caps purge at consistencyPoint
+      assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+    }
+  }
+
+  private void injectMockConsistencyPoint(long consistencyPoint) {
+    
ReplicationLogReplayService.setConsistencyPointForTesting(getUtility().getConfiguration(),
+      consistencyPoint);
+  }
+
+  private void flush(TableName table) throws IOException {
+    getUtility().getAdmin().flush(table);
+  }
+
+  private void majorCompact(TableName table) throws Exception {
+    TestUtil.majorCompact(getUtility(), table);
+  }
+
+  private void createTable(String tableName) throws SQLException {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute(
+        "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, 
val1 VARCHAR(10),"
+          + " val2 VARCHAR(10), val3 VARCHAR(10))");
+      conn.commit();
+    }
+  }
+
+  private void createTableWithTTL(String tableName, int ttlSeconds) throws 
SQLException {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement().execute(
+        "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, 
val1 VARCHAR(10),"
+          + " val2 VARCHAR(10), val3 VARCHAR(10)) TTL=" + ttlSeconds);
+      conn.commit();
+    }
+  }
+
+  private void populateTable(String tableName) throws SQLException {
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      conn.createStatement()
+        .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 
'abcd')");
+      conn.commit();
+      conn.createStatement()
+        .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 
'bcde')");
+      conn.commit();
+    }
+  }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java
new file mode 100644
index 0000000000..e13bb86d66
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static 
org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED;
+import static 
org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for CompactionScanner.computeRowMaxLookbackWithGuard formula. 
Covers scenarios
+ * including those unreachable via standard TTL floor enforcement to guard 
against future changes to
+ * the TTL computation.
+ */
+public class CompactionGuardFormulaTest {
+
+  @Test
+  public void testTtlDominatedGuardCaps() {
+    // ttlWindowStart > maxLookbackWindowStart > consistencyPoint
+    // Simulates conditional TTL or future removal of TTL floor enforcement
+    long consistencyPoint = 1000L;
+    long maxLookbackWindowStart = 2000L;
+    long ttlWindowStart = 3000L;
+
+    long result = 
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+      maxLookbackWindowStart, consistencyPoint);
+
+    // max(3000, 2000) = 3000, min(3000, 999) = 999 → guard caps at 
consistencyPoint - 1
+    assertEquals(consistencyPoint - 1, result);
+  }
+
+  @Test
+  public void testLookbackDominatedGuardCaps() {
+    // maxLookbackWindowStart > ttlWindowStart > consistencyPoint
+    long consistencyPoint = 1000L;
+    long ttlWindowStart = 2000L;
+    long maxLookbackWindowStart = 3000L;
+
+    long result = 
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+      maxLookbackWindowStart, consistencyPoint);
+
+    // max(2000, 3000) = 3000, min(3000, 999) = 999 → guard caps at 
consistencyPoint - 1
+    assertEquals(consistencyPoint - 1, result);
+  }
+
+  @Test
+  public void testConsistencyPointBeyondBoth_guardInactive() {
+    // consistencyPoint > max(ttlWindowStart, maxLookbackWindowStart)
+    long maxLookbackWindowStart = 2000L;
+    long ttlWindowStart = 1000L;
+    long consistencyPoint = 5000L;
+
+    long result = 
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+      maxLookbackWindowStart, consistencyPoint);
+
+    // max(1000, 2000) = 2000, min(2000, 4999) = 2000 → guard doesn't restrict
+    assertEquals(maxLookbackWindowStart, result);
+  }
+
+  @Test
+  public void testConsistencyPointZero_retainsAll() {
+    // consistencyPoint = UNAVAILABLE signals fallback — retain all delete 
markers
+    long maxLookbackWindowStart = 2000L;
+    long ttlWindowStart = 3000L;
+    long consistencyPoint = CONSISTENCY_POINT_UNAVAILABLE;
+
+    long result = 
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+      maxLookbackWindowStart, consistencyPoint);
+
+    assertEquals(CONSISTENCY_POINT_UNAVAILABLE, result);
+  }
+
+  @Test
+  public void testConsistencyPointMaxValue_guardDisabled() {
+    // GUARD_DISABLED used when replay is off — guard is effectively a no-op
+    long maxLookbackWindowStart = 2000L;
+    long ttlWindowStart = 3000L;
+    long consistencyPoint = CONSISTENCY_POINT_GUARD_DISABLED;
+
+    long result = 
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+      maxLookbackWindowStart, consistencyPoint);
+
+    // max(3000, 2000) = 3000, min(3000, MAX_VALUE) = 3000 → normal behavior
+    assertEquals(ttlWindowStart, result);
+  }
+
+  @Test
+  public void testBoundaryDeleteAtExactlyConsistencyPoint_isRetained() {
+    // A delete marker at ts == consistencyPoint has NOT been replayed 
(exclusive upper bound).
+    // The formula must produce a boundary below consistencyPoint so that the 
strict-greater
+    // retention check (ts > boundary) retains it.
+    long consistencyPoint = 5000L;
+    long maxLookbackWindowStart = 6000L;
+    long ttlWindowStart = 4000L;
+
+    long result = 
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+      maxLookbackWindowStart, consistencyPoint);
+
+    // Boundary = consistencyPoint - 1 = 4999; a cell at ts=5000 satisfies ts 
> 4999
+    assertEquals(consistencyPoint - 1, result);
+  }
+
+  @Test
+  public void testConsistencyPointBetweenInputs() {
+    // ttlWindowStart < consistencyPoint < maxLookbackWindowStart
+    long ttlWindowStart = 1000L;
+    long consistencyPoint = 2500L;
+    long maxLookbackWindowStart = 3000L;
+
+    long result = 
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+      maxLookbackWindowStart, consistencyPoint);
+
+    // max(1000, 3000) = 3000, min(3000, 2499) = 2499 → guard caps
+    assertEquals(consistencyPoint - 1, result);
+  }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java
new file mode 100644
index 0000000000..e74a63a41d
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static 
org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE;
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+/**
+ * Tests for ReplicationLogReplayService.resolveConsistencyPoint caching 
behavior.
+ */
+public class ReplicationConsistencyPointTest {
+
+  @Test
+  public void testCachedConsistencyPointAvoidsRepeatedFetches() {
+    Configuration conf = new Configuration(false);
+    AtomicInteger fetchCount = new AtomicInteger(0);
+    ReplicationLogReplayService.setConsistencyPointSupplierForTesting(conf, () 
-> {
+      fetchCount.incrementAndGet();
+      return 500000L;
+    });
+
+    try {
+      String table = "TEST_TABLE";
+      String cf = "0";
+
+      long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf, 
table, cf);
+      long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf, 
table, cf);
+      long result3 = ReplicationLogReplayService.resolveConsistencyPoint(conf, 
table, cf);
+
+      assertEquals(500000L, result1);
+      assertEquals(500000L, result2);
+      assertEquals(500000L, result3);
+      assertEquals(1, fetchCount.get());
+    } finally {
+      ReplicationLogReplayService.resetInstanceForTesting();
+    }
+  }
+
+  @Test
+  public void testTransientFailureNotCached_retriesOnNextCall() {
+    Configuration conf = new Configuration(false);
+    AtomicInteger fetchCount = new AtomicInteger(0);
+    ReplicationLogReplayService.setConsistencyPointSupplierForTesting(conf, () 
-> {
+      int attempt = fetchCount.incrementAndGet();
+      if (attempt == 1) {
+        throw new RuntimeException("Simulated transient failure");
+      }
+      return 700000L;
+    });
+
+    try {
+      String table = "TEST_TABLE";
+      String cf = "0";
+
+      long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf, 
table, cf);
+      assertEquals(CONSISTENCY_POINT_UNAVAILABLE, result1);
+
+      long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf, 
table, cf);
+      assertEquals(700000L, result2);
+
+      assertEquals(2, fetchCount.get());
+    } finally {
+      ReplicationLogReplayService.resetInstanceForTesting();
+    }
+  }
+}


Reply via email to