This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
new 9c5e4d795a PHOENIX-7863 Add replication consistency point guard to compaction (#2489)
9c5e4d795a is described below
commit 9c5e4d795a58473ee1245a454f84b4d98c3a7e51
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Sat Jun 27 03:37:15 2026 +0530
PHOENIX-7863 Add replication consistency point guard to compaction (#2489)
---
.../phoenix/coprocessor/CompactionScanner.java | 52 +++-
.../reader/ReplicationLogDiscoveryReplay.java | 2 +-
.../reader/ReplicationLogReplayService.java | 88 ++++++
.../reader/CompactionReplicationGuardIT.java | 303 +++++++++++++++++++++
.../coprocessor/CompactionGuardFormulaTest.java | 132 +++++++++
.../reader/ReplicationConsistencyPointTest.java | 85 ++++++
6 files changed, 657 insertions(+), 5 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index f12dc77f72..cb8d890d1c 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -81,6 +81,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
import org.apache.phoenix.schema.CompiledTTLExpression;
import org.apache.phoenix.schema.ConditionalTTLExpression;
@@ -139,6 +140,7 @@ public class CompactionScanner implements InternalScanner {
private final Store store;
private final RegionCoprocessorEnvironment env;
private long maxLookbackWindowStart;
+ private final long replicationConsistencyPoint;
private final long maxLookbackInMillis;
private int minVersion;
private int maxVersion;
@@ -199,8 +201,19 @@ public class CompactionScanner implements InternalScanner {
this.maxLookbackWindowStart = this.maxLookbackInMillis == 0
? compactionTime
: compactionTime - (this.maxLookbackInMillis + 1);
- ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
+ Configuration conf = env.getConfiguration();
this.major = major && !forceMinorCompaction;
+ boolean replayEnabled =
+
conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
+ ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED);
+ if (this.major && replayEnabled) {
+ this.replicationConsistencyPoint =
+ ReplicationLogReplayService.resolveConsistencyPoint(conf, tableName,
columnFamilyName);
+ } else {
+ this.replicationConsistencyPoint =
+ ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED;
+ }
+ ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
this.minVersion = cfd.getMinVersions();
this.maxVersion = cfd.getMaxVersions();
this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL :
cfd.getKeepDeletedCells();
@@ -1631,6 +1644,33 @@ public class CompactionScanner implements InternalScanner {
}
}
+ /**
+ * Computes the effective max-lookback boundary for a row, capped by the
replication consistency
+ * point. The consistency point represents an exclusive upper bound:
everything with ts <
+ * consistencyPoint has been replayed. We subtract 1 so that cells at
exactly ts ==
+ * consistencyPoint satisfy the strict-greater retention check and are
retained.
+ * @param ttlWindowStart row TTL window start in millis since
epoch
+ * @param maxLookbackWindowStart store-level max-lookback window start
in millis since epoch
+ * @param replicationConsistencyPoint exclusive upper bound of replayed
timestamps;
+ * CONSISTENCY_POINT_UNAVAILABLE (0)
retains all,
+ * CONSISTENCY_POINT_GUARD_DISABLED
(Long.MAX_VALUE) means
+ * guard is a no-op
+ * @return effective boundary for the strict-greater retention compare
(millis since epoch)
+ */
+ public static long computeRowMaxLookbackWithGuard(long ttlWindowStart,
+ long maxLookbackWindowStart, long replicationConsistencyPoint) {
+ if (
+ replicationConsistencyPoint ==
ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE
+ || replicationConsistencyPoint
+ == ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED
+ ) {
+ return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart),
+ replicationConsistencyPoint);
+ }
+ return Math.min(Math.max(ttlWindowStart, maxLookbackWindowStart),
+ replicationConsistencyPoint - 1);
+ }
+
/**
* The context for a given row during compaction. A row may have multiple
compaction row versions.
* CompactionScanner uses the same row context for these versions.
@@ -1657,10 +1697,14 @@ public class CompactionScanner implements InternalScanner {
private void setTTL(long ttlInSecs) {
this.ttl = Math.max(ttlInSecs * 1000, maxLookbackInMillis + 1);
this.ttlWindowStart = ttlInSecs == HConstants.FOREVER ? 1 :
compactionTime - ttl;
- this.maxLookbackWindowStartForRow = Math.max(ttlWindowStart,
maxLookbackWindowStart);
+ this.maxLookbackWindowStartForRow =
computeRowMaxLookbackWithGuard(ttlWindowStart,
+ maxLookbackWindowStart, replicationConsistencyPoint);
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace(String.format("RowContext:- (ttlWindowStart=%d,
maxLookbackWindowStart=%d)",
- ttlWindowStart, maxLookbackWindowStart));
+ LOGGER.trace(String.format(
+ "RowContext:- (ttlWindowStart=%d, maxLookbackWindowStart=%d, "
+ + "replicationConsistencyPoint=%d,
maxLookbackWindowStartForRow=%d)",
+ ttlWindowStart, maxLookbackWindowStart, replicationConsistencyPoint,
+ maxLookbackWindowStartForRow));
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
index 4d5f886d51..ec7a1da535 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java
@@ -103,7 +103,7 @@ public class ReplicationLogDiscoveryReplay extends ReplicationLogDiscovery {
*/
public static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0;
- private ReplicationRound lastRoundInSync;
+ private volatile ReplicationRound lastRoundInSync;
// AtomicReference ensures listener updates are visible to replay thread
private final AtomicReference<ReplicationReplayState> replicationReplayState
=
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
index 24d40faac7..ac9ec10b4f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java
@@ -29,6 +29,9 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Supplier;
+import org.apache.phoenix.thirdparty.com.google.common.base.Suppliers;
import
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -77,14 +80,51 @@ public class ReplicationLogReplayService {
*/
public static final int
DEFAULT_REPLICATION_REPLAY_SERVICE_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 30;
+ public static final long CONSISTENCY_POINT_UNAVAILABLE = 0L;
+ public static final long CONSISTENCY_POINT_GUARD_DISABLED = Long.MAX_VALUE;
+
+ public static final String CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY =
+ "phoenix.replication.compaction.guard.cache.ttl.seconds";
+ public static final long DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS = 30;
+
+ private static volatile long lastFallbackWarnTime = 0;
+ private static final long WARN_LOG_INTERVAL_MS = 60_000;
+
private static volatile ReplicationLogReplayService instance;
private final Configuration conf;
private ScheduledExecutorService scheduler;
private volatile boolean isRunning = false;
+ private final Supplier<Long> cachedConsistencyPoint;
private ReplicationLogReplayService(final Configuration conf) {
this.conf = conf;
+ long cacheTtl = conf.getLong(CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY,
+ DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS);
+ // Guava's memoizeWithExpiration does NOT cache exceptions — a thrown
RuntimeException
+ // causes the next get() to re-invoke the supplier. We rely on this:
transient failures
+ // (NN flap, SYSTEM.HA_GROUP unavailable) retry on the next compaction
rather than
+ // caching a stale fallback for the full TTL.
+ this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> {
+ try {
+ return getConsistencyPoint();
+ } catch (IOException | SQLException e) {
+ throw new RuntimeException("Failed to fetch consistency point", e);
+ }
+ }, cacheTtl, TimeUnit.SECONDS);
+ }
+
+ private ReplicationLogReplayService(Configuration conf, long
fixedConsistencyPoint) {
+ this.conf = conf;
+ this.cachedConsistencyPoint = () -> fixedConsistencyPoint;
+ }
+
+ private ReplicationLogReplayService(Configuration conf, Supplier<Long>
supplier) {
+ this.conf = conf;
+ long cacheTtl = conf.getLong(CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY,
+ DEFAULT_CONSISTENCY_POINT_CACHE_TTL_SECONDS);
+ this.cachedConsistencyPoint =
+ Suppliers.memoizeWithExpiration(supplier, cacheTtl, TimeUnit.SECONDS);
}
/**
@@ -105,6 +145,28 @@ public class ReplicationLogReplayService {
return instance;
}
+ @VisibleForTesting
+ public static void setConsistencyPointForTesting(Configuration conf, long
fixedConsistencyPoint) {
+ synchronized (ReplicationLogReplayService.class) {
+ instance = new ReplicationLogReplayService(conf, fixedConsistencyPoint);
+ }
+ }
+
+ @VisibleForTesting
+ public static void setConsistencyPointSupplierForTesting(Configuration conf,
+ Supplier<Long> supplier) {
+ synchronized (ReplicationLogReplayService.class) {
+ instance = new ReplicationLogReplayService(conf, supplier);
+ }
+ }
+
+ @VisibleForTesting
+ public static void resetInstanceForTesting() {
+ synchronized (ReplicationLogReplayService.class) {
+ instance = null;
+ }
+ }
+
/**
* Starts the replication log replay service by initializing the scheduler
and scheduling periodic
* replay operations for each HA Group.
@@ -229,6 +291,32 @@ public class ReplicationLogReplayService {
return consistencyPoint;
}
+ /**
+ * Resolves the minimum replication consistency point across all HA groups.
Uses a cached value
+ * with a configurable TTL (see {@link
#CONSISTENCY_POINT_CACHE_TTL_SECONDS_KEY}) to avoid
+ * repeated RPCs during compaction bursts. Returns {@link
#CONSISTENCY_POINT_UNAVAILABLE} on any
+ * failure (caller treats this as "retain all delete markers").
+ */
+ public static long resolveConsistencyPoint(Configuration conf, String
tableName,
+ String columnFamilyName) {
+ try {
+ long consistencyPoint = getInstance(conf).cachedConsistencyPoint.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replication guard: table={} store={} consistencyPoint={}",
tableName,
+ columnFamilyName, consistencyPoint);
+ }
+ return consistencyPoint;
+ } catch (Exception e) {
+ long now = System.currentTimeMillis();
+ if (now - lastFallbackWarnTime > WARN_LOG_INTERVAL_MS) {
+ lastFallbackWarnTime = now;
+ LOG.warn("Replication guard: consistency point unavailable for
table={} store={}."
+ + " Retaining all delete markers.", tableName, columnFamilyName, e);
+ }
+ return CONSISTENCY_POINT_UNAVAILABLE;
+ }
+ }
+
/** Returns the list of HA groups on the cluster */
protected List<String> getReplicationGroups() throws SQLException {
return HAGroupStoreManager.getInstance(conf).getHAGroupNames();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java
new file mode 100644
index 0000000000..2279ab929f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
+import static
org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE;
+import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Integration tests for the replication consistency point compaction guard.
Verifies that
+ * CompactionScanner retains delete markers with timestamps newer than the
consistency point on
+ * clusters where replication replay is enabled.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class CompactionReplicationGuardIT extends BaseTest {
+
+ private static final int MAX_LOOKBACK_AGE = 15;
+ private static final int ROWS_POPULATED = 2;
+ private ManualEnvironmentEdge injectEdge;
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(5);
+ props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
Integer.toString(MAX_LOOKBACK_AGE));
+ props.put(QueryServices.PHOENIX_COMPACTION_ENABLED,
Boolean.toString(true));
+ props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
+ Boolean.toString(true));
+ props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ EnvironmentEdgeManager.reset();
+ injectEdge = new ManualEnvironmentEdge();
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ }
+
+ @After
+ public synchronized void afterTest() throws Exception {
+ ReplicationLogReplayService.resetInstanceForTesting();
+ EnvironmentEdgeManager.reset();
+ boolean refCountLeaked = isAnyStoreRefCountLeaked();
+ assertFalse("refCount leaked", refCountLeaked);
+ }
+
+ /**
+ * Test 1: Guard retains delete markers that maxLookback would have purged.
The consistency point
+ * is set BEFORE the delete timestamp, so the delete marker is newer than
the consistency point
+ * and must be retained even after maxLookback window passes.
+ */
+ @Test(timeout = 120000L)
+ public void testGuardRetainsDeleteMarkersNewerThanConsistencyPoint() throws
Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ createTable(dataTableName);
+ TableName dataTable = TableName.valueOf(dataTableName);
+ populateTable(dataTableName);
+
+ injectEdge.incrementValue(1);
+ long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis();
+
+ // Delete a row
+ conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE
id = 'a'");
+ conn.commit();
+ injectEdge.incrementValue(1);
+
+ // Set consistency point BEFORE the delete — meaning replay hasn't
caught up to the delete
+ long consistencyPoint = beforeDeleteTime - 1;
+ injectMockConsistencyPoint(consistencyPoint);
+
+ // Advance time past maxLookback window — without guard, marker would be
purged
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+
+ flush(dataTable);
+ majorCompact(dataTable);
+
+ // Delete marker should be retained because its timestamp >
consistencyPoint
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+ }
+ }
+
+ /**
+ * Test 2: Both maxLookback and guard allow purge. The consistency point has
advanced past the
+ * delete marker AND maxLookback window has passed — marker should be purged.
+ */
+ @Test(timeout = 120000L)
+ public void
testDeleteMarkersPurgedWhenOlderThanBothConsistencyPointAndMaxLookback()
+ throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ createTable(dataTableName);
+ TableName dataTable = TableName.valueOf(dataTableName);
+ populateTable(dataTableName);
+
+ injectEdge.incrementValue(1);
+
+ // Delete a row
+ conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE
id = 'a'");
+ conn.commit();
+ injectEdge.incrementValue(1);
+
+ // Advance time past maxLookback
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+
+ // Set consistency point to current time — replay is fully caught up
+ long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis();
+ injectMockConsistencyPoint(consistencyPoint);
+
+ flush(dataTable);
+ majorCompact(dataTable);
+
+ // Delete marker should be purged — both guard and maxLookback agree
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1);
+ }
+ }
+
+ /**
+ * Test 3: MaxLookback retains even when guard wouldn't. Consistency point
has advanced past the
+ * delete, but we're still within the maxLookback window — marker retained
by maxLookback.
+ */
+ @Test(timeout = 120000L)
+ public void testMaxLookbackRetainsEvenWhenGuardAllowsPurge() throws
Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ createTable(dataTableName);
+ TableName dataTable = TableName.valueOf(dataTableName);
+ populateTable(dataTableName);
+
+ injectEdge.incrementValue(1);
+
+ // Delete a row
+ conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE
id = 'a'");
+ conn.commit();
+ injectEdge.incrementValue(1);
+
+ // Set consistency point to current time — guard would allow purge
+ long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis();
+ injectMockConsistencyPoint(consistencyPoint);
+
+ // Do NOT advance past maxLookback — still within the window
+ injectEdge.incrementValue(1);
+
+ flush(dataTable);
+ majorCompact(dataTable);
+
+ // Delete marker retained because still within maxLookback window
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+ }
+ }
+
+ /**
+ * Test 4: Guard fallback when consistency point unavailable — retains all
delete markers. When
+ * the replay service throws an exception (e.g., not initialized), the guard
falls back to
+ * retaining all markers to avoid data loss.
+ */
+ @Test(timeout = 120000L)
+ public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable()
throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ createTable(dataTableName);
+ TableName dataTable = TableName.valueOf(dataTableName);
+ populateTable(dataTableName);
+
+ injectEdge.incrementValue(1);
+
+ // Delete a row
+ conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE
id = 'a'");
+ conn.commit();
+ injectEdge.incrementValue(1);
+
+ // Inject consistency point as UNAVAILABLE — simulating fallback when
replay service fails
+
ReplicationLogReplayService.setConsistencyPointForTesting(getUtility().getConfiguration(),
+ CONSISTENCY_POINT_UNAVAILABLE);
+
+ // Advance past maxLookback
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+
+ flush(dataTable);
+ majorCompact(dataTable);
+
+ // Fallback retains all — delete marker NOT purged despite maxLookback
passing
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+ }
+ }
+
+ /**
+ * Test 5: Guard retains delete markers on a table with explicit TTL. The
consistency point is set
+ * BEFORE the delete, time advances past both TTL and maxLookback, and the
guard still retains the
+ * delete marker because its timestamp is newer than the consistency point.
+ */
+ @Test(timeout = 120000L)
+ public void testGuardRetainsDeleteMarkersWithExplicitTTL() throws Exception {
+ int ttlSeconds = 30;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ createTableWithTTL(dataTableName, ttlSeconds);
+ TableName dataTable = TableName.valueOf(dataTableName);
+ populateTable(dataTableName);
+
+ injectEdge.incrementValue(1);
+ long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis();
+
+ // Delete a row
+ conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE
id = 'a'");
+ conn.commit();
+ injectEdge.incrementValue(1);
+
+ // Set consistency point BEFORE the delete — replay hasn't caught up
+ long consistencyPoint = beforeDeleteTime - 1;
+ injectMockConsistencyPoint(consistencyPoint);
+
+ // Advance past both TTL and maxLookback — without guard, marker would
be purged
+ injectEdge.incrementValue(ttlSeconds * 1000 + 1000);
+
+ flush(dataTable);
+ majorCompact(dataTable);
+
+ // Delete marker retained — guard caps purge at consistencyPoint
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+ }
+ }
+
+ private void injectMockConsistencyPoint(long consistencyPoint) {
+
ReplicationLogReplayService.setConsistencyPointForTesting(getUtility().getConfiguration(),
+ consistencyPoint);
+ }
+
+ private void flush(TableName table) throws IOException {
+ getUtility().getAdmin().flush(table);
+ }
+
+ private void majorCompact(TableName table) throws Exception {
+ TestUtil.majorCompact(getUtility(), table);
+ }
+
+ private void createTable(String tableName) throws SQLException {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY,
val1 VARCHAR(10),"
+ + " val2 VARCHAR(10), val3 VARCHAR(10))");
+ conn.commit();
+ }
+ }
+
+ private void createTableWithTTL(String tableName, int ttlSeconds) throws
SQLException {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY,
val1 VARCHAR(10),"
+ + " val2 VARCHAR(10), val3 VARCHAR(10)) TTL=" + ttlSeconds);
+ conn.commit();
+ }
+ }
+
+ private void populateTable(String tableName) throws SQLException {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc',
'abcd')");
+ conn.commit();
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd',
'bcde')");
+ conn.commit();
+ }
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java
new file mode 100644
index 0000000000..e13bb86d66
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionGuardFormulaTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static
org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_GUARD_DISABLED;
+import static
org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for CompactionScanner.computeRowMaxLookbackWithGuard formula.
Covers scenarios
+ * including those unreachable via standard TTL floor enforcement to guard
against future changes to
+ * the TTL computation.
+ */
+public class CompactionGuardFormulaTest {
+
+ @Test
+ public void testTtlDominatedGuardCaps() {
+ // ttlWindowStart > maxLookbackWindowStart > consistencyPoint
+ // Simulates conditional TTL or future removal of TTL floor enforcement
+ long consistencyPoint = 1000L;
+ long maxLookbackWindowStart = 2000L;
+ long ttlWindowStart = 3000L;
+
+ long result =
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+ maxLookbackWindowStart, consistencyPoint);
+
+ // max(3000, 2000) = 3000, min(3000, 999) = 999 → guard caps at
consistencyPoint - 1
+ assertEquals(consistencyPoint - 1, result);
+ }
+
+ @Test
+ public void testLookbackDominatedGuardCaps() {
+ // maxLookbackWindowStart > ttlWindowStart > consistencyPoint
+ long consistencyPoint = 1000L;
+ long ttlWindowStart = 2000L;
+ long maxLookbackWindowStart = 3000L;
+
+ long result =
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+ maxLookbackWindowStart, consistencyPoint);
+
+ // max(2000, 3000) = 3000, min(3000, 999) = 999 → guard caps at
consistencyPoint - 1
+ assertEquals(consistencyPoint - 1, result);
+ }
+
+ @Test
+ public void testConsistencyPointBeyondBoth_guardInactive() {
+ // consistencyPoint > max(ttlWindowStart, maxLookbackWindowStart)
+ long maxLookbackWindowStart = 2000L;
+ long ttlWindowStart = 1000L;
+ long consistencyPoint = 5000L;
+
+ long result =
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+ maxLookbackWindowStart, consistencyPoint);
+
+ // max(1000, 2000) = 2000, min(2000, 4999) = 2000 → guard doesn't restrict
+ assertEquals(maxLookbackWindowStart, result);
+ }
+
+ @Test
+ public void testConsistencyPointZero_retainsAll() {
+ // consistencyPoint = UNAVAILABLE signals fallback — retain all delete
markers
+ long maxLookbackWindowStart = 2000L;
+ long ttlWindowStart = 3000L;
+ long consistencyPoint = CONSISTENCY_POINT_UNAVAILABLE;
+
+ long result =
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+ maxLookbackWindowStart, consistencyPoint);
+
+ assertEquals(CONSISTENCY_POINT_UNAVAILABLE, result);
+ }
+
+ @Test
+ public void testConsistencyPointMaxValue_guardDisabled() {
+ // GUARD_DISABLED used when replay is off — guard is effectively a no-op
+ long maxLookbackWindowStart = 2000L;
+ long ttlWindowStart = 3000L;
+ long consistencyPoint = CONSISTENCY_POINT_GUARD_DISABLED;
+
+ long result =
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+ maxLookbackWindowStart, consistencyPoint);
+
+ // max(3000, 2000) = 3000, min(3000, MAX_VALUE) = 3000 → normal behavior
+ assertEquals(ttlWindowStart, result);
+ }
+
+ @Test
+ public void testBoundaryDeleteAtExactlyConsistencyPoint_isRetained() {
+ // A delete marker at ts == consistencyPoint has NOT been replayed
(exclusive upper bound).
+ // The formula must produce a boundary below consistencyPoint so that the
strict-greater
+ // retention check (ts > boundary) retains it.
+ long consistencyPoint = 5000L;
+ long maxLookbackWindowStart = 6000L;
+ long ttlWindowStart = 4000L;
+
+ long result =
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+ maxLookbackWindowStart, consistencyPoint);
+
+ // Boundary = consistencyPoint - 1 = 4999; a cell at ts=5000 satisfies ts
> 4999
+ assertEquals(consistencyPoint - 1, result);
+ }
+
+ @Test
+ public void testConsistencyPointBetweenInputs() {
+ // ttlWindowStart < consistencyPoint < maxLookbackWindowStart
+ long ttlWindowStart = 1000L;
+ long consistencyPoint = 2500L;
+ long maxLookbackWindowStart = 3000L;
+
+ long result =
CompactionScanner.computeRowMaxLookbackWithGuard(ttlWindowStart,
+ maxLookbackWindowStart, consistencyPoint);
+
+ // max(1000, 3000) = 3000, min(3000, 2499) = 2499 → guard caps
+ assertEquals(consistencyPoint - 1, result);
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java
new file mode 100644
index 0000000000..e74a63a41d
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationConsistencyPointTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static
org.apache.phoenix.replication.reader.ReplicationLogReplayService.CONSISTENCY_POINT_UNAVAILABLE;
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+/**
+ * Tests for ReplicationLogReplayService.resolveConsistencyPoint caching
behavior.
+ */
+public class ReplicationConsistencyPointTest {
+
+ @Test
+ public void testCachedConsistencyPointAvoidsRepeatedFetches() {
+ Configuration conf = new Configuration(false);
+ AtomicInteger fetchCount = new AtomicInteger(0);
+ ReplicationLogReplayService.setConsistencyPointSupplierForTesting(conf, ()
-> {
+ fetchCount.incrementAndGet();
+ return 500000L;
+ });
+
+ try {
+ String table = "TEST_TABLE";
+ String cf = "0";
+
+ long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf,
table, cf);
+ long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf,
table, cf);
+ long result3 = ReplicationLogReplayService.resolveConsistencyPoint(conf,
table, cf);
+
+ assertEquals(500000L, result1);
+ assertEquals(500000L, result2);
+ assertEquals(500000L, result3);
+ assertEquals(1, fetchCount.get());
+ } finally {
+ ReplicationLogReplayService.resetInstanceForTesting();
+ }
+ }
+
+ @Test
+ public void testTransientFailureNotCached_retriesOnNextCall() {
+ Configuration conf = new Configuration(false);
+ AtomicInteger fetchCount = new AtomicInteger(0);
+ ReplicationLogReplayService.setConsistencyPointSupplierForTesting(conf, ()
-> {
+ int attempt = fetchCount.incrementAndGet();
+ if (attempt == 1) {
+ throw new RuntimeException("Simulated transient failure");
+ }
+ return 700000L;
+ });
+
+ try {
+ String table = "TEST_TABLE";
+ String cf = "0";
+
+ long result1 = ReplicationLogReplayService.resolveConsistencyPoint(conf,
table, cf);
+ assertEquals(CONSISTENCY_POINT_UNAVAILABLE, result1);
+
+ long result2 = ReplicationLogReplayService.resolveConsistencyPoint(conf,
table, cf);
+ assertEquals(700000L, result2);
+
+ assertEquals(2, fetchCount.get());
+ } finally {
+ ReplicationLogReplayService.resetInstanceForTesting();
+ }
+ }
+}