This is an automated email from the ASF dual-hosted git repository.

virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git


The following commit(s) were added to refs/heads/main by this push:
     new a67fed6  Phoenix RVC for Scan Pagination
a67fed6 is described below

commit a67fed693b16f4adc2333d9140ecc0aaf9d963c5
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Mar 5 11:57:31 2026 -0800

    Phoenix RVC for Scan Pagination
---
 .../apache/phoenix/ddb/service/QueryService.java   |   2 +-
 .../apache/phoenix/ddb/service/ScanService.java    | 168 ++++---------
 .../apache/phoenix/ddb/service/utils/DQLUtils.java |  57 +++--
 .../phoenix/ddb/service/utils/ScanConfig.java      |  32 +--
 .../phoenix/ddb/ScanExclusiveStartKeyIT.java       | 107 ++++----
 .../java/org/apache/phoenix/ddb/ScanIndex2IT.java  |   2 +
 .../java/org/apache/phoenix/ddb/ScanIndex3IT.java  |   2 +
 ...eyIT.java => ScanIndexExclusiveStartKeyIT.java} | 275 +++++++++++----------
 .../java/org/apache/phoenix/ddb/ScanIndexIT.java   |  98 ++++++++
 .../java/org/apache/phoenix/ddb/TestUtils.java     |  11 +-
 10 files changed, 396 insertions(+), 358 deletions(-)

diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
index fe16acc..c11a695 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
@@ -77,7 +77,7 @@ public class QueryService {
             boolean countOnly = ApiMetadata.SELECT_COUNT.equals(request.get(ApiMetadata.SELECT));
             return DQLUtils.executeStatementReturnResult(stmt,
                     getProjectionAttributes(request), useIndex, tablePKCols, indexPKCols, tableName,
-                    isSingleRowExpected, false, countOnly);
+                    isSingleRowExpected, countOnly);
         } catch (SQLException e) {
             throw new PhoenixServiceException(e);
         }
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
index 9a086c5..63fc4f0 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
@@ -24,7 +24,6 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.phoenix.ddb.ConnectionUtil;
@@ -90,8 +89,7 @@ public class ScanService {
         int effectiveLimit = getEffectiveLimit(request);
         boolean countOnly = 
ApiMetadata.SELECT_COUNT.equals(request.get(ApiMetadata.SELECT));
         
-        ScanConfig config = new ScanConfig(
-            determineScanType(exclusiveStartKey, useIndex, tablePKCols, 
indexPKCols),
+        ScanConfig config = new 
ScanConfig(determineScanType(exclusiveStartKey),
             useIndex, tablePKCols, indexPKCols, effectiveLimit, tableName, 
indexName, countOnly
         );
 
@@ -105,33 +103,20 @@ public class ScanService {
             config.setScanSegmentInfo(segmentInfo);
         }
 
-        // Execute based on scan type (same logic for both regular and segment 
scans)
-        switch (config.getType()) {
-            case NO_EXCLUSIVE_START_KEY:
-            case SINGLE_KEY_CONTINUATION:
-                return executeSingleQuery(connection, request, config);
-            case TWO_KEY_FIRST_QUERY:
-                return executeTwoKeyTableScan(connection, request, config);
-            default:
-                throw new IllegalStateException("Unsupported scan config type: 
" + config.getType());
-        }
+        PreparedStatement stmt = buildQuery(connection, request, config);
+        return DQLUtils.executeStatementReturnResult(stmt, 
getProjectionAttributes(request),
+                config.useIndex(), config.getTablePKCols(), 
config.getIndexPKCols(), config.getTableName(),
+                false, config.isCountOnly());
     }
 
     /**
      * Determine the appropriate scan type based on request parameters
      */
-    public static ScanType determineScanType(Map<String, Object> 
exclusiveStartKey,
-            boolean useIndex, List<PColumn> tablePKCols, List<PColumn> 
indexPKCols) {
+    public static ScanType determineScanType(Map<String, Object> 
exclusiveStartKey) {
         if (exclusiveStartKey == null || exclusiveStartKey.isEmpty()) {
             return ScanType.NO_EXCLUSIVE_START_KEY;
         }
-        
-        List<PColumn> relevantPKCols = useIndex ? indexPKCols : tablePKCols;
-        if (relevantPKCols.size() == 1) {
-            return ScanType.SINGLE_KEY_CONTINUATION;
-        } else {
-            return ScanType.TWO_KEY_FIRST_QUERY;
-        }
+        return ScanType.WITH_EXCLUSIVE_START_KEY;
     }
 
     /**
@@ -142,63 +127,6 @@ public class ScanService {
         return (requestLimit == null) ? MAX_SCAN_LIMIT : 
Math.min(requestLimit, MAX_SCAN_LIMIT);
     }
 
-    /**
-     * Execute a single query scan (for no pagination, single key, or original 
logic)
-     */
-    private static Map<String, Object> executeSingleQuery(Connection 
connection, Map<String, Object> request,
-                                                         ScanConfig config) 
throws SQLException {
-        PreparedStatement stmt = buildQuery(connection, request, config);
-        return DQLUtils.executeStatementReturnResult(stmt, 
getProjectionAttributes(request),
-                config.useIndex(), config.getTablePKCols(), 
config.getIndexPKCols(), config.getTableName(),
-                false, false, config.isCountOnly());
-    }
-
-    /**
-     * Execute two-key table scan using the two-query approach
-     */
-    private static Map<String, Object> executeTwoKeyTableScan(Connection 
connection, Map<String, Object> request,
-                                                            ScanConfig config) 
throws SQLException {
-        
-        // Execute first query: (pk1 = k1 AND pk2 > k2)
-        
-        PreparedStatement firstStmt = buildQuery(connection, request, config);
-        Map<String, Object> firstResult = 
DQLUtils.executeStatementReturnResult(firstStmt,
-                getProjectionAttributes(request), config.useIndex(), 
config.getTablePKCols(), config.getIndexPKCols(),
-                config.getTableName(), false, true, config.isCountOnly());
-        
-        List<Map<String, Object>> allItems = config.isCountOnly()
-                ? new ArrayList<>()
-                : new ArrayList<>((List<Map<String, Object>>) 
firstResult.get(ApiMetadata.ITEMS));
-        int totalCount = (Integer) firstResult.get(ApiMetadata.COUNT);
-        int totalScannedCount = (Integer) 
firstResult.get(ApiMetadata.SCANNED_COUNT);
-        Map<String, Object> lastEvaluatedKey = (Map<String, Object>) 
firstResult.get(ApiMetadata.LAST_EVALUATED_KEY);
-
-        // Execute second query if needed: (pk1 > k1)
-        if (totalCount < config.getLimit() && 
!(boolean)firstResult.get(DQLUtils.SIZE_LIMIT_REACHED)) {
-            int remainingLimit = config.getLimit() - totalCount;
-            ScanConfig secondConfig = 
config.cloneWithTypeAndLimit(ScanType.TWO_KEY_SECOND_QUERY, remainingLimit);
-            PreparedStatement secondStmt = buildQuery(connection, request, 
secondConfig);
-            Map<String, Object> secondResult = 
DQLUtils.executeStatementReturnResult(secondStmt,
-                    getProjectionAttributes(request), config.useIndex(), 
config.getTablePKCols(), config.getIndexPKCols(),
-                    config.getTableName(), false, false, config.isCountOnly());
-
-            if (!config.isCountOnly()) {
-                List<Map<String, Object>> secondItems = (List<Map<String, 
Object>>) secondResult.get(ApiMetadata.ITEMS);
-                allItems.addAll(secondItems);
-            }
-            totalCount += (Integer) secondResult.get(ApiMetadata.COUNT);
-            totalScannedCount += (Integer) 
secondResult.get(ApiMetadata.SCANNED_COUNT);
-            
-            // Use LastEvaluatedKey from second query if it returned items, 
otherwise from first query
-            Map<String, Object> secondLastKey = (Map<String, Object>) 
secondResult.get(ApiMetadata.LAST_EVALUATED_KEY);
-            if (secondLastKey != null) {
-                lastEvaluatedKey = secondLastKey;
-            }
-        }
-        
-        return buildScanResponse(allItems, totalCount, totalScannedCount, 
config.getTableName(), lastEvaluatedKey, config.isCountOnly());
-    }
-
     /**
      * Unified query builder that handles all scan types
      */
@@ -210,8 +138,8 @@ public class ScanService {
         // Add filter conditions
         boolean hasFilterCondition = addFilterConditionIfPresent(queryBuilder, 
request);
 
-        // Add key conditions
-        boolean hasKeyConditions = addKeyConditions(queryBuilder, config, 
hasFilterCondition);
+        // Add key conditions (RVC)
+        boolean hasKeyConditions = addRVC(queryBuilder, config, 
hasFilterCondition);
 
         // Add segment boundary conditions
         addSegmentBoundaryConditions(queryBuilder, config, hasFilterCondition 
|| hasKeyConditions);
@@ -261,12 +189,12 @@ public class ScanService {
     }
 
     /**
-     * Add key-based WHERE conditions based on scan type
+     * Add RVC clause if request has lastEvaluatedKey.
      *
-     * @return true if key conditions were added
+     * @return true if RVC clause was added
      */
-    private static boolean addKeyConditions(StringBuilder queryBuilder, 
ScanConfig config,
-            boolean hasPreviousConditions) {
+    private static boolean addRVC(StringBuilder queryBuilder, ScanConfig 
config,
+                                  boolean hasPreviousConditions) {
         if (config.getType() == ScanType.NO_EXCLUSIVE_START_KEY) {
             // No key conditions needed for simple scan
             return false;
@@ -277,22 +205,10 @@ public class ScanService {
         } else {
             queryBuilder.append(" WHERE ");
         }
-        
-        String partitionKeyName = CommonServiceUtils.getColumnExprFromPCol(
-                config.getPartitionKeyCol(), config.useIndex());
-        
-        switch (config.getType()) {
-            case SINGLE_KEY_CONTINUATION:
-            case TWO_KEY_SECOND_QUERY:
-                queryBuilder.append(partitionKeyName).append(" > ? ");
-                break;
-            case TWO_KEY_FIRST_QUERY:
-                String sortKeyName = CommonServiceUtils.getColumnExprFromPCol(
-                        config.getSortKeyCol(), config.useIndex());
-                queryBuilder.append("( ").append(partitionKeyName).append(" = 
? AND ")
-                           .append(sortKeyName).append(" > ? ) ");
-                break;
-        }
+
+        String rvcClause = DQLUtils.getRVCClauseForScan(" > ", 
config.getTablePKCols(),
+                config.useIndex(), config.getIndexPKCols());
+        queryBuilder.append(rvcClause);
         return true;
     }
 
@@ -304,13 +220,22 @@ public class ScanService {
     }
 
     private static void addOrderByClause(StringBuilder queryBuilder, 
ScanConfig config) {
-        String partitionKeyName = CommonServiceUtils.getColumnExprFromPCol(
-                config.getPartitionKeyCol(), config.useIndex());
-        queryBuilder.append(" ORDER BY ").append(partitionKeyName);
-        if (config.getSortKeyCol() != null) {
-            String sortKeyName = CommonServiceUtils.getColumnExprFromPCol(
-                    config.getSortKeyCol(), config.useIndex());
-            queryBuilder.append(", ").append(sortKeyName);
+        queryBuilder.append(" ORDER BY ");
+
+        if (config.useIndex()) {
+            for (int i = 0; i < config.getIndexPKCols().size(); i++) {
+                if (i > 0) queryBuilder.append(", ");
+                
queryBuilder.append(CommonServiceUtils.getColumnExprFromPCol(config.getIndexPKCols().get(i),
 true));
+            }
+            for (PColumn tablePkCol : config.getTablePKCols()) {
+                queryBuilder.append(", ");
+                
queryBuilder.append(CommonServiceUtils.getEscapedArgument(tablePkCol.getName().toString()));
+            }
+        } else {
+            for (int i = 0; i < config.getTablePKCols().size(); i++) {
+                if (i > 0) queryBuilder.append(", ");
+                
queryBuilder.append(CommonServiceUtils.getEscapedArgument(config.getTablePKCols().get(i).getName().toString()));
+            }
         }
     }
 
@@ -322,27 +247,20 @@ public class ScanService {
 
         int paramIndex = 1;
         if (config.getType() != ScanType.NO_EXCLUSIVE_START_KEY) {
-            // Set key condition parameters first
             Map<String, Object> exclusiveStartKey =
                     (Map<String, Object>) 
request.get(ApiMetadata.EXCLUSIVE_START_KEY);
-            String partitionKeyName = 
CommonServiceUtils.getColumnNameFromPCol(config.getPartitionKeyCol(), 
config.useIndex());
 
-            switch (config.getType()) {
-            case SINGLE_KEY_CONTINUATION:
-            case TWO_KEY_SECOND_QUERY:
-                DQLUtils.setKeyValueOnStatement(stmt, paramIndex++,
-                        (Map<String, Object>) 
exclusiveStartKey.get(partitionKeyName), false);
-                break;
-
-            case TWO_KEY_FIRST_QUERY:
-                String sortKeyName = 
CommonServiceUtils.getColumnNameFromPCol(config.getSortKeyCol(), 
config.useIndex());
-                // Set pk1 = ?
-                DQLUtils.setKeyValueOnStatement(stmt, paramIndex++,
-                        (Map<String, Object>) 
exclusiveStartKey.get(partitionKeyName), false);
-                // Set pk2 > ?
+            if (config.useIndex()) {
+                for (PColumn indexPkCol : config.getIndexPKCols()) {
+                    String keyName = 
CommonServiceUtils.getColumnNameFromPCol(indexPkCol, true);
+                    DQLUtils.setKeyValueOnStatement(stmt, paramIndex++,
+                            (Map<String, Object>) 
exclusiveStartKey.get(keyName), false);
+                }
+            }
+            for (PColumn tablePkCol : config.getTablePKCols()) {
+                String keyName = tablePkCol.getName().getString();
                 DQLUtils.setKeyValueOnStatement(stmt, paramIndex++,
-                        (Map<String, Object>) 
exclusiveStartKey.get(sortKeyName), false);
-                break;
+                        (Map<String, Object>) exclusiveStartKey.get(keyName), 
false);
             }
         }
 
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
index 5b70f33..de64a72 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
@@ -29,10 +29,10 @@ import java.util.Properties;
 public class DQLUtils {
 
     public static final String SIZE_LIMIT_REACHED = "SizeLimitReached";
-    private static final String ISK_HK_RVC = "(%s, %s) %s (?, ?)";
-    private static final String ISK_HK_SK_RVC = "(%s, %s, %s) %s (?, ?, ?)";
-    private static final String HK_SK_RVC = "(%s, %s) %s (?, ?)";
-    private static final String HK_RVC = "(%s) %s (?)";
+    private static final String RVC_1 = "(%s) %s (?)";
+    private static final String RVC_2 = "(%s, %s) %s (?, ?)";
+    private static final String RVC_3 = "(%s, %s, %s) %s (?, ?, ?)";
+    private static final String RVC_4 = "(%s, %s, %s, %s) %s (?, ?, ?, ?)";
 
     /**
      * Execute the given PreparedStatement, collect all returned items with 
projected attributes
@@ -41,12 +41,11 @@ public class DQLUtils {
     public static Map<String, Object> 
executeStatementReturnResult(PreparedStatement stmt,
             List<String> projectionAttributes, boolean useIndex,
             List<PColumn> tablePKCols, List<PColumn> indexPKCols, String 
tableName,
-            boolean isSingleRowExpected, boolean isScanFirstQuery, boolean 
countOnly) throws SQLException {
+            boolean isSingleRowExpected, boolean countOnly) throws 
SQLException {
         int count = 0;
         int bytesSize = 0;
         List<Map<String, Object>> items = new ArrayList<>();
         RawBsonDocument lastBsonDoc = null;
-        boolean sizeLimitReached = false;
         try (ResultSet rs = stmt.executeQuery()) {
             while (rs.next()) {
                 lastBsonDoc = (RawBsonDocument) rs.getObject(1);
@@ -57,7 +56,6 @@ public class DQLUtils {
                 bytesSize +=
                         (int) 
rs.unwrap(PhoenixResultSet.class).getCurrentRow().getSerializedSize();
                 if (bytesSize >= ApiMetadata.MAX_BYTES_SIZE) {
-                    sizeLimitReached = true;
                     break;
                 }
             }
@@ -73,9 +71,6 @@ public class DQLUtils {
             response.put(ApiMetadata.SCANNED_COUNT, countRowsScanned);
             response.put(ApiMetadata.CONSUMED_CAPACITY,
                     CommonServiceUtils.getConsumedCapacity(tableName));
-            if (isScanFirstQuery) {
-                response.put(SIZE_LIMIT_REACHED, sizeLimitReached);
-            }
             return response;
         } catch (SQLException e) {
             if (e.getMessage() != null && e.getMessage()
@@ -221,22 +216,56 @@ public class DQLUtils {
             // Index has only ihk
             if (tablePKCols.size() == 1) {
                 // (hk) > (?)
-                return String.format(HK_RVC, hk, op);
+                return String.format(RVC_1, hk, op);
             } else {
                 // (hk, sk) > (?, ?)
                 String sk = 
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
-                return String.format(HK_SK_RVC, hk, sk, op);
+                return String.format(RVC_2, hk, sk, op);
             }
         } else {
             // Index has ihk, isk
             String isk = 
CommonServiceUtils.getColumnExprFromPCol(indexPKCols.get(1), true);
             if (tablePKCols.size() == 1) {
                 // (isk, hk) > (?, ?)
-                return String.format(ISK_HK_RVC, isk, hk, op);
+                return String.format(RVC_2, isk, hk, op);
             } else {
                 // (isk, hk, sk) > (?, ?, ?)
                 String sk = 
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
-                return String.format(ISK_HK_SK_RVC, isk, hk, sk, op);
+                return String.format(RVC_3, isk, hk, sk, op);
+            }
+        }
+    }
+
+    public static String getRVCClauseForScan(String op, List<PColumn> 
tablePKCols,
+                                              boolean useIndex, List<PColumn> 
indexPKCols) {
+        String hk = 
CommonServiceUtils.getEscapedArgument(tablePKCols.get(0).getName().toString());
+        if (!useIndex) {
+            if (tablePKCols.size() == 1) {
+                return String.format(RVC_1, hk, op);
+            } else {
+                String sk = 
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+                return String.format(RVC_2, hk, sk, op);
+            }
+        }
+        String ihk = 
CommonServiceUtils.getColumnExprFromPCol(indexPKCols.get(0), true);
+        if (indexPKCols.size() == 1) {
+            if (tablePKCols.size() == 1) {
+                // (ihk, hk) > (?, ?)
+                return String.format(RVC_2, ihk, hk, op);
+            } else {
+                // (ihk, hk, sk) > (?, ?, ?)
+                String sk = 
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+                return String.format(RVC_3, ihk, hk, sk, op);
+            }
+        } else {
+            String isk = 
CommonServiceUtils.getColumnExprFromPCol(indexPKCols.get(1), true);
+            if (tablePKCols.size() == 1) {
+                // (ihk, isk, hk) > (?, ?, ?)
+                return String.format(RVC_3, ihk, isk, hk, op);
+            } else {
+                // (ihk, isk, hk, sk) > (?, ?, ?, ?)
+                String sk = 
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+                return String.format(RVC_4, ihk, isk, hk, sk, op);
             }
         }
     }
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ScanConfig.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ScanConfig.java
index baa434d..d9c8656 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ScanConfig.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ScanConfig.java
@@ -31,18 +31,14 @@ public class ScanConfig {
      * Enumeration of different scan types
      */
     public enum ScanType {
-        NO_EXCLUSIVE_START_KEY,           // Simple scan without pagination
-        SINGLE_KEY_CONTINUATION,          // pk > value (single key table)
-        TWO_KEY_FIRST_QUERY,             // pk1 = value AND pk2 > value (two 
key table, first query)
-        TWO_KEY_SECOND_QUERY             // pk1 > value (two key table, second 
query)
+        NO_EXCLUSIVE_START_KEY,    // first page
+        WITH_EXCLUSIVE_START_KEY   // subsequent page — apply RVC
     }
 
     private final ScanType type;
     private final boolean useIndex;
     private final List<PColumn> tablePKCols;
     private final List<PColumn> indexPKCols;
-    private final PColumn partitionKeyCol;
-    private final PColumn sortKeyCol;
     private final int limit;
     private final String tableName;
     private final String indexName;
@@ -61,22 +57,6 @@ public class ScanConfig {
         this.tableName = tableName;
         this.indexName = indexName;
         this.countOnly = countOnly;
-
-        List<PColumn> relevantPKCols = useIndex ? indexPKCols : tablePKCols;
-        this.partitionKeyCol = relevantPKCols.get(0);
-        this.sortKeyCol = (relevantPKCols.size() > 1) ? relevantPKCols.get(1) 
: null;
-    }
-
-    /**
-     * Creates a new ScanConfig with a different type and limit, keeping other 
properties
-     */
-    public ScanConfig cloneWithTypeAndLimit(ScanType newType, int newLimit) {
-        ScanConfig newConfig = new ScanConfig(newType, this.useIndex, 
this.tablePKCols,
-                this.indexPKCols, newLimit, this.tableName, this.indexName, 
this.countOnly);
-        if (this.isSegmentScan) {
-            newConfig.setScanSegmentInfo(this.scanSegmentInfo);
-        }
-        return newConfig;
     }
 
     public void setScanSegmentInfo(ScanSegmentInfo segmentInfo) {
@@ -104,14 +84,6 @@ public class ScanConfig {
         return indexPKCols;
     }
 
-    public PColumn getPartitionKeyCol() {
-        return partitionKeyCol;
-    }
-
-    public PColumn getSortKeyCol() {
-        return sortKeyCol;
-    }
-
     public int getLimit() {
         return limit;
     }
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
index 712b819..ffc853e 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -56,8 +57,7 @@ import org.apache.phoenix.util.ServerUtil;
 import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 
 /**
- * Parametrized test to verify getExclusiveStartKeyConditionForScan() logic in 
DQLUtils.
- * Tests scan pagination with different limits and different combinations of 
hash and sort key data types.
+ * Tests scan pagination on table with different limits and different 
combinations of hash and sort key data types.
  */
 @RunWith(Parameterized.class)
 public class ScanExclusiveStartKeyIT {
@@ -79,6 +79,7 @@ public class ScanExclusiveStartKeyIT {
     // Test parameters
     public int scanLimit;
     public KeyTypeConfig keyTypeConfig;
+    public boolean withFilter;
 
     // Configuration for different key type combinations
     public static class KeyTypeConfig {
@@ -102,7 +103,7 @@ public class ScanExclusiveStartKeyIT {
         }
     }
 
-    @Parameterized.Parameters(name = "limit_{0}_keyTypes_{1}")
+    @Parameterized.Parameters(name = "limit_{0}_keyTypes_{1}_filter_{2}")
     public static synchronized Collection<Object[]> data() {
         List<Object[]> parameters = new ArrayList<>();
         
@@ -120,19 +121,21 @@ public class ScanExclusiveStartKeyIT {
             new KeyTypeConfig("S_B", ScalarAttributeType.S, 
ScalarAttributeType.B)  // String + Binary
         };
         
-        // Create all combinations of limits and key types
         for (int limit : scanLimits) {
             for (KeyTypeConfig config : keyConfigs) {
-                parameters.add(new Object[]{limit, config});
+                for (boolean filter : new boolean[]{false, true}) {
+                    parameters.add(new Object[]{limit, config, filter});
+                }
             }
         }
         
         return parameters;
     }
 
-    public ScanExclusiveStartKeyIT(int scanLimit, KeyTypeConfig keyTypeConfig) 
{
+    public ScanExclusiveStartKeyIT(int scanLimit, KeyTypeConfig keyTypeConfig, 
boolean withFilter) {
         this.scanLimit = scanLimit;
         this.keyTypeConfig = keyTypeConfig;
+        this.withFilter = withFilter;
     }
 
     @BeforeClass
@@ -146,6 +149,7 @@ public class ScanExclusiveStartKeyIT {
         utility.startMiniCluster();
         String zkQuorum = "localhost:" + 
utility.getZkCluster().getClientPort();
         url = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(new PhoenixTestDriver());
 
         restServer = new RESTServer(utility.getConfiguration());
         restServer.run();
@@ -182,22 +186,19 @@ public class ScanExclusiveStartKeyIT {
     @Test(timeout = 120000)
     public void testScanExclusiveStartKeyPagination() {
         final String tableName = 
testName.getMethodName().replaceAll("[\\[\\]]", "_") + 
-                "_limit_" + scanLimit + "_" + keyTypeConfig.name;
+                "_limit_" + scanLimit + "_" + keyTypeConfig.name + "_filter_" 
+ withFilter;
         
-        // Create table with hash key and sort key of specified types
+        final int totalItems = 56;
+
         CreateTableRequest createTableRequest =
                 DDLTestUtils.getCreateTableRequest(tableName, 
keyTypeConfig.hashKeyName,
                         keyTypeConfig.hashKeyType, keyTypeConfig.sortKeyName, 
keyTypeConfig.sortKeyType);
         phoenixDBClientV2.createTable(createTableRequest);
         dynamoDbClient.createTable(createTableRequest);
 
-        // Insert 56 items: 7 hash keys with 8 sort keys each
-        List<Map<String, AttributeValue>> testItems = new ArrayList<>();
         for (int hashIndex = 0; hashIndex < 7; hashIndex++) {
             for (int sortIndex = 0; sortIndex < 8; sortIndex++) {
                 Map<String, AttributeValue> item = createTestItem(hashIndex, 
sortIndex, keyTypeConfig);
-                testItems.add(item);
-                
                 PutItemRequest putItemRequest = PutItemRequest.builder()
                         .tableName(tableName)
                         .item(item)
@@ -207,7 +208,6 @@ public class ScanExclusiveStartKeyIT {
             }
         }
 
-        // Perform paginated scan with the specified limit
         List<Map<String, AttributeValue>> phoenixItems = new ArrayList<>();
         List<Map<String, AttributeValue>> dynamoItems = new ArrayList<>();
         
@@ -217,60 +217,51 @@ public class ScanExclusiveStartKeyIT {
         int phoenixPaginationCount = 0;
         int dynamoPaginationCount = 0;
 
-        // Phoenix scan with pagination
-        do {
-            ScanRequest.Builder phoenixScanRequest = ScanRequest.builder()
-                    .tableName(tableName)
-                    .limit(scanLimit);
-            
-            if (phoenixLastKey != null && !phoenixLastKey.isEmpty()) {
-                phoenixScanRequest.exclusiveStartKey(phoenixLastKey);
-            }
-            
-            ScanResponse phoenixResult = 
phoenixDBClientV2.scan(phoenixScanRequest.build());
-            phoenixItems.addAll(phoenixResult.items());
-            phoenixLastKey = phoenixResult.lastEvaluatedKey();
-            phoenixPaginationCount++;
-            
-            LOGGER.info("Phoenix scan iteration {}, returned {} items, last 
key: {}", 
-                    phoenixPaginationCount, phoenixResult.count(), 
phoenixLastKey);
-                    
-        } while (phoenixLastKey != null && !phoenixLastKey.isEmpty());
-
-        // DynamoDB scan with pagination
         do {
             ScanRequest.Builder dynamoScanRequest = ScanRequest.builder()
                     .tableName(tableName)
                     .limit(scanLimit);
-            
+            applyFilter(dynamoScanRequest);
             if (dynamoLastKey != null && !dynamoLastKey.isEmpty()) {
                 dynamoScanRequest.exclusiveStartKey(dynamoLastKey);
             }
-            
             ScanResponse dynamoResult = 
dynamoDbClient.scan(dynamoScanRequest.build());
             dynamoItems.addAll(dynamoResult.items());
             dynamoLastKey = dynamoResult.lastEvaluatedKey();
             dynamoPaginationCount++;
-            
             LOGGER.info("DynamoDB scan iteration {}, returned {} items, last 
key: {}", 
                     dynamoPaginationCount, dynamoResult.count(), 
dynamoLastKey);
-                    
         } while (dynamoLastKey != null && !dynamoLastKey.isEmpty());
 
-        // Verify total number of items retrieved
-        Assert.assertEquals("Phoenix should return all 56 items", 56, 
phoenixItems.size());
-        Assert.assertEquals("DynamoDB should return all 56 items", 56, 
dynamoItems.size());
-        
-        // Verify that both clients returned the same number of pagination 
rounds
-        // Note: The exact pagination behavior might differ slightly, but both 
should complete
+        do {
+            ScanRequest.Builder phoenixScanRequest = ScanRequest.builder()
+                    .tableName(tableName)
+                    .limit(scanLimit);
+            applyFilter(phoenixScanRequest);
+            if (phoenixLastKey != null && !phoenixLastKey.isEmpty()) {
+                phoenixScanRequest.exclusiveStartKey(phoenixLastKey);
+            }
+            ScanResponse phoenixResult = 
phoenixDBClientV2.scan(phoenixScanRequest.build());
+            phoenixItems.addAll(phoenixResult.items());
+            phoenixLastKey = phoenixResult.lastEvaluatedKey();
+            phoenixPaginationCount++;
+            LOGGER.info("Phoenix scan iteration {}, returned {} items, last 
key: {}",
+                    phoenixPaginationCount, phoenixResult.count(), 
phoenixLastKey);
+        } while (phoenixLastKey != null && !phoenixLastKey.isEmpty());
+
+        Assert.assertEquals("Phoenix and DynamoDB should return same number of 
items",
+                dynamoItems.size(), phoenixItems.size());
+        Assert.assertFalse("Should return some items", phoenixItems.isEmpty());
+        if (withFilter) {
+            Assert.assertTrue("Filter should reduce the result set",
+                    phoenixItems.size() < totalItems);
+        } else {
+            Assert.assertEquals("Should return all items without filter",
+                    totalItems, phoenixItems.size());
+        }
+
         Assert.assertTrue("Phoenix pagination should complete", 
phoenixPaginationCount > 0);
         Assert.assertTrue("DynamoDB pagination should complete", 
dynamoPaginationCount > 0);
-        
-        // For limits smaller than 8, we should see multiple pagination rounds
-        if (scanLimit < 8) {
-            Assert.assertTrue("Should require multiple pagination rounds for 
limit " + scanLimit, 
-                    phoenixPaginationCount > 7); // At least 7 hash keys, so 
should need multiple rounds
-        }
 
         List<Map<String, AttributeValue>> sortedPhoenixItems =
                 TestUtils.sortItemsByPartitionAndSortKey(phoenixItems, 
keyTypeConfig.hashKeyName, keyTypeConfig.sortKeyName);
@@ -283,19 +274,29 @@ public class ScanExclusiveStartKeyIT {
     /**
      * Create a test item with the given partition key and sort key based on 
the key type configuration.
      */
+    private void applyFilter(ScanRequest.Builder sr) {
+        if (withFilter) {
+            Map<String, String> names = new HashMap<>();
+            names.put("#nf", "num_field");
+            Map<String, AttributeValue> values = new HashMap<>();
+            values.put(":nv", AttributeValue.builder().n("30").build());
+            sr.filterExpression("#nf > :nv")
+                    .expressionAttributeNames(names)
+                    .expressionAttributeValues(values);
+        }
+    }
+
     private Map<String, AttributeValue> createTestItem(int hashIndex, int sortIndex, KeyTypeConfig config) {
         Map<String, AttributeValue> item = new HashMap<>();
         
-        // Create hash key value based on type
         AttributeValue hashKeyValue = createAttributeValue(config.hashKeyType, 
hashIndex, "pk");
         item.put(config.hashKeyName, hashKeyValue);
         
-        // Create sort key value based on type
         AttributeValue sortKeyValue = createAttributeValue(config.sortKeyType, 
sortIndex, "sk");
         item.put(config.sortKeyName, sortKeyValue);
         
-        // Add a data field for verification
         item.put("data_field", AttributeValue.builder().s("data_" + hashIndex 
+ "_" + sortIndex).build());
+        item.put("num_field", AttributeValue.builder().n(String.valueOf(hashIndex * 10 + sortIndex)).build());
         
         return item;
     }
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex2IT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex2IT.java
index e22dfe3..131b8bb 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex2IT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex2IT.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -83,6 +84,7 @@ public class ScanIndex2IT {
         utility.startMiniCluster();
         String zkQuorum = "localhost:" + 
utility.getZkCluster().getClientPort();
         url = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(new PhoenixTestDriver());
 
         restServer = new RESTServer(utility.getConfiguration());
         restServer.run();
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
index 0dd7a78..6a9be8d 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.ddb.rest.RESTServer;
 import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ServerUtil;
 import org.junit.AfterClass;
@@ -79,6 +80,7 @@ public class ScanIndex3IT {
         utility.startMiniCluster();
         String zkQuorum = "localhost:" + 
utility.getZkCluster().getClientPort();
         url = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(new PhoenixTestDriver());
 
         restServer = new RESTServer(utility.getConfiguration());
         restServer.run();
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexExclusiveStartKeyIT.java
similarity index 52%
copy from phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
copy to phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexExclusiveStartKeyIT.java
index 712b819..26cf40d 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexExclusiveStartKeyIT.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -56,13 +57,20 @@ import org.apache.phoenix.util.ServerUtil;
 import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 
 /**
- * Parametrized test to verify getExclusiveStartKeyConditionForScan() logic in 
DQLUtils.
- * Tests scan pagination with different limits and different combinations of 
hash and sort key data types.
+ * Tests scan pagination on indexes with different limits and key type 
combinations.
+ * Items are inserted with deliberate ties on index keys to verify RVC cursor 
correctness
+ * when multiple rows share the same (ihk, isk) pair.
  */
 @RunWith(Parameterized.class)
-public class ScanExclusiveStartKeyIT {
+public class ScanIndexExclusiveStartKeyIT {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ScanExclusiveStartKeyIT.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ScanIndexExclusiveStartKeyIT.class);
+
+    private static final String INDEX_NAME = "test_gsi";
+    private static final String TABLE_HK = "partition_key";
+    private static final String TABLE_SK = "sort_key";
+    private static final String INDEX_HK = "idx_hk";
+    private static final String INDEX_SK = "idx_sk";
 
     private final DynamoDbClient dynamoDbClient =
             LocalDynamoDbTestBase.localDynamoDb().createV2Client();
@@ -75,25 +83,26 @@ public class ScanExclusiveStartKeyIT {
 
     @Rule
     public final TestName testName = new TestName();
-    
-    // Test parameters
+
     public int scanLimit;
     public KeyTypeConfig keyTypeConfig;
+    public boolean withFilter;
 
-    // Configuration for different key type combinations
     public static class KeyTypeConfig {
         public final String name;
-        public final ScalarAttributeType hashKeyType;
-        public final ScalarAttributeType sortKeyType;
-        public final String hashKeyName;
-        public final String sortKeyName;
+        public final ScalarAttributeType tableHKType;
+        public final ScalarAttributeType tableSKType;
+        public final ScalarAttributeType indexHKType;
+        public final ScalarAttributeType indexSKType;
 
-        public KeyTypeConfig(String name, ScalarAttributeType hashKeyType, 
ScalarAttributeType sortKeyType) {
+        public KeyTypeConfig(String name, ScalarAttributeType tableHKType,
+                ScalarAttributeType tableSKType, ScalarAttributeType 
indexHKType,
+                ScalarAttributeType indexSKType) {
             this.name = name;
-            this.hashKeyType = hashKeyType;
-            this.sortKeyType = sortKeyType;
-            this.hashKeyName = "partition_key";
-            this.sortKeyName = "sort_key";
+            this.tableHKType = tableHKType;
+            this.tableSKType = tableSKType;
+            this.indexHKType = indexHKType;
+            this.indexSKType = indexSKType;
         }
 
         @Override
@@ -102,37 +111,42 @@ public class ScanExclusiveStartKeyIT {
         }
     }
 
-    @Parameterized.Parameters(name = "limit_{0}_keyTypes_{1}")
+    @Parameterized.Parameters(name = "limit_{0}_keyTypes_{1}_filter_{2}")
     public static synchronized Collection<Object[]> data() {
         List<Object[]> parameters = new ArrayList<>();
-        
-        // Different scan limits to test
-        int[] scanLimits = {1, 2, 3, 4, 5, 6, 7, 8};
-        
-        // Different key type combinations
+
+        int[] scanLimits = {1, 2, 3, 5, 7, 10, 15};
+
         KeyTypeConfig[] keyConfigs = {
-            new KeyTypeConfig("S_N", ScalarAttributeType.S, 
ScalarAttributeType.N), // String + Number
-            new KeyTypeConfig("B_B", ScalarAttributeType.B, 
ScalarAttributeType.B), // Binary + Binary  
-            new KeyTypeConfig("B_N", ScalarAttributeType.B, 
ScalarAttributeType.N), // Binary + Number
-            new KeyTypeConfig("N_S", ScalarAttributeType.N, 
ScalarAttributeType.S), // Number + String
-            new KeyTypeConfig("N_N", ScalarAttributeType.N, 
ScalarAttributeType.N), // Number + Number
-            new KeyTypeConfig("S_S", ScalarAttributeType.S, 
ScalarAttributeType.S), // String + String
-            new KeyTypeConfig("S_B", ScalarAttributeType.S, 
ScalarAttributeType.B)  // String + Binary
+            new KeyTypeConfig("tSN_iSN", ScalarAttributeType.S, 
ScalarAttributeType.N,
+                    ScalarAttributeType.S, ScalarAttributeType.N),
+            new KeyTypeConfig("tSS_iNS", ScalarAttributeType.S, 
ScalarAttributeType.S,
+                    ScalarAttributeType.N, ScalarAttributeType.S),
+            new KeyTypeConfig("tNN_iSS", ScalarAttributeType.N, 
ScalarAttributeType.N,
+                    ScalarAttributeType.S, ScalarAttributeType.S),
+            new KeyTypeConfig("tSN_iNN", ScalarAttributeType.S, 
ScalarAttributeType.N,
+                    ScalarAttributeType.N, ScalarAttributeType.N),
+            new KeyTypeConfig("tBN_iSN", ScalarAttributeType.B, 
ScalarAttributeType.N,
+                    ScalarAttributeType.S, ScalarAttributeType.N),
+            new KeyTypeConfig("tNS_iSB", ScalarAttributeType.N, 
ScalarAttributeType.S,
+                    ScalarAttributeType.S, ScalarAttributeType.B),
         };
-        
-        // Create all combinations of limits and key types
+
         for (int limit : scanLimits) {
             for (KeyTypeConfig config : keyConfigs) {
-                parameters.add(new Object[]{limit, config});
+                for (boolean filter : new boolean[]{false, true}) {
+                    parameters.add(new Object[]{limit, config, filter});
+                }
             }
         }
-        
+
         return parameters;
     }
 
-    public ScanExclusiveStartKeyIT(int scanLimit, KeyTypeConfig keyTypeConfig) 
{
+    public ScanIndexExclusiveStartKeyIT(int scanLimit, KeyTypeConfig 
keyTypeConfig, boolean withFilter) {
         this.scanLimit = scanLimit;
         this.keyTypeConfig = keyTypeConfig;
+        this.withFilter = withFilter;
     }
 
     @BeforeClass
@@ -146,6 +160,7 @@ public class ScanExclusiveStartKeyIT {
         utility.startMiniCluster();
         String zkQuorum = "localhost:" + 
utility.getZkCluster().getClientPort();
         url = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(new PhoenixTestDriver());
 
         restServer = new RESTServer(utility.getConfiguration());
         restServer.run();
@@ -173,31 +188,34 @@ public class ScanExclusiveStartKeyIT {
     }
 
     /**
-     * Test getExclusiveStartKeyConditionForScan() logic by creating a table 
with hash and sort keys,
-     * inserting 56 items (7 hash keys with 8 sort keys each), and scanning 
with different limits.
-     * This tests both cases:
-     * 1. When last evaluated key has only hash key
-     * 2. When last evaluated key has both hash key and sort key
+     * Test scan pagination on an index where multiple items share the same (ihk, isk) pair.
+     * 56 items (7 table hash keys × 8 sort keys), with index keys cycled to produce ties:
+     *   ihk = hashIndex % 3  → 3 distinct values
+     *   isk = sortIndex % 4  → 4 distinct values
+     * This forces the RVC cursor to rely on the full (ihk, isk, hk, sk) tuple for correctness.
      */
     @Test(timeout = 120000)
-    public void testScanExclusiveStartKeyPagination() {
-        final String tableName = 
testName.getMethodName().replaceAll("[\\[\\]]", "_") + 
-                "_limit_" + scanLimit + "_" + keyTypeConfig.name;
-        
-        // Create table with hash key and sort key of specified types
+    public void testScanIndexExclusiveStartKeyPagination() {
+        final String tableName = 
testName.getMethodName().replaceAll("[\\[\\]]", "_") +
+                "_limit_" + scanLimit + "_" + keyTypeConfig.name + "_filter_" 
+ withFilter;
+
         CreateTableRequest createTableRequest =
-                DDLTestUtils.getCreateTableRequest(tableName, 
keyTypeConfig.hashKeyName,
-                        keyTypeConfig.hashKeyType, keyTypeConfig.sortKeyName, 
keyTypeConfig.sortKeyType);
+                DDLTestUtils.getCreateTableRequest(tableName, TABLE_HK,
+                        keyTypeConfig.tableHKType, TABLE_SK, 
keyTypeConfig.tableSKType);
+        createTableRequest = DDLTestUtils.addIndexToRequest(true, 
createTableRequest,
+                INDEX_NAME, INDEX_HK, keyTypeConfig.indexHKType,
+                INDEX_SK, keyTypeConfig.indexSKType);
+
         phoenixDBClientV2.createTable(createTableRequest);
         dynamoDbClient.createTable(createTableRequest);
 
-        // Insert 56 items: 7 hash keys with 8 sort keys each
-        List<Map<String, AttributeValue>> testItems = new ArrayList<>();
-        for (int hashIndex = 0; hashIndex < 7; hashIndex++) {
-            for (int sortIndex = 0; sortIndex < 8; sortIndex++) {
-                Map<String, AttributeValue> item = createTestItem(hashIndex, 
sortIndex, keyTypeConfig);
-                testItems.add(item);
-                
+        final int numHashKeys = 7;
+        final int numSortKeys = 8;
+        final int totalItems = numHashKeys * numSortKeys;
+
+        for (int hashIndex = 0; hashIndex < numHashKeys; hashIndex++) {
+            for (int sortIndex = 0; sortIndex < numSortKeys; sortIndex++) {
+                Map<String, AttributeValue> item = createTestItem(hashIndex, 
sortIndex);
                 PutItemRequest putItemRequest = PutItemRequest.builder()
                         .tableName(tableName)
                         .item(item)
@@ -207,115 +225,120 @@ public class ScanExclusiveStartKeyIT {
             }
         }
 
-        // Perform paginated scan with the specified limit
         List<Map<String, AttributeValue>> phoenixItems = new ArrayList<>();
         List<Map<String, AttributeValue>> dynamoItems = new ArrayList<>();
-        
+
         Map<String, AttributeValue> phoenixLastKey = null;
         Map<String, AttributeValue> dynamoLastKey = null;
-        
-        int phoenixPaginationCount = 0;
-        int dynamoPaginationCount = 0;
 
-        // Phoenix scan with pagination
+        int phoenixPageCount = 0;
+        int dynamoPageCount = 0;
+
         do {
-            ScanRequest.Builder phoenixScanRequest = ScanRequest.builder()
+            ScanRequest.Builder sr = ScanRequest.builder()
                     .tableName(tableName)
+                    .indexName(INDEX_NAME)
                     .limit(scanLimit);
-            
-            if (phoenixLastKey != null && !phoenixLastKey.isEmpty()) {
-                phoenixScanRequest.exclusiveStartKey(phoenixLastKey);
+            applyFilter(sr);
+            if (dynamoLastKey != null && !dynamoLastKey.isEmpty()) {
+                sr.exclusiveStartKey(dynamoLastKey);
             }
-            
-            ScanResponse phoenixResult = 
phoenixDBClientV2.scan(phoenixScanRequest.build());
-            phoenixItems.addAll(phoenixResult.items());
-            phoenixLastKey = phoenixResult.lastEvaluatedKey();
-            phoenixPaginationCount++;
-            
-            LOGGER.info("Phoenix scan iteration {}, returned {} items, last 
key: {}", 
-                    phoenixPaginationCount, phoenixResult.count(), 
phoenixLastKey);
-                    
-        } while (phoenixLastKey != null && !phoenixLastKey.isEmpty());
+            ScanResponse response = dynamoDbClient.scan(sr.build());
+            dynamoItems.addAll(response.items());
+            dynamoLastKey = response.lastEvaluatedKey();
+            dynamoPageCount++;
+            LOGGER.info("DynamoDB index scan page {}, returned {} items, lastKey: {}",
+                    dynamoPageCount, response.count(), dynamoLastKey);
+        } while (dynamoLastKey != null && !dynamoLastKey.isEmpty());
 
-        // DynamoDB scan with pagination
         do {
-            ScanRequest.Builder dynamoScanRequest = ScanRequest.builder()
+            ScanRequest.Builder sr = ScanRequest.builder()
                     .tableName(tableName)
+                    .indexName(INDEX_NAME)
                     .limit(scanLimit);
-            
-            if (dynamoLastKey != null && !dynamoLastKey.isEmpty()) {
-                dynamoScanRequest.exclusiveStartKey(dynamoLastKey);
+            applyFilter(sr);
+            if (phoenixLastKey != null && !phoenixLastKey.isEmpty()) {
+                sr.exclusiveStartKey(phoenixLastKey);
             }
-            
-            ScanResponse dynamoResult = 
dynamoDbClient.scan(dynamoScanRequest.build());
-            dynamoItems.addAll(dynamoResult.items());
-            dynamoLastKey = dynamoResult.lastEvaluatedKey();
-            dynamoPaginationCount++;
-            
-            LOGGER.info("DynamoDB scan iteration {}, returned {} items, last 
key: {}", 
-                    dynamoPaginationCount, dynamoResult.count(), 
dynamoLastKey);
-                    
-        } while (dynamoLastKey != null && !dynamoLastKey.isEmpty());
+            ScanResponse response = phoenixDBClientV2.scan(sr.build());
+            phoenixItems.addAll(response.items());
+            phoenixLastKey = response.lastEvaluatedKey();
+            phoenixPageCount++;
+            LOGGER.info("Phoenix index scan page {}, returned {} items, lastKey: {}",
+                    phoenixPageCount, response.count(), phoenixLastKey);
+        } while (phoenixLastKey != null && !phoenixLastKey.isEmpty());
+
+        Assert.assertEquals("Phoenix and DynamoDB should return same number of items",
+                dynamoItems.size(), phoenixItems.size());
+        Assert.assertFalse("Should return some items", phoenixItems.isEmpty());
+        if (withFilter) {
+            Assert.assertTrue("Filter should reduce the result set",
+                    phoenixItems.size() < totalItems);
+        } else {
+            Assert.assertEquals("Should return all items without filter",
+                    totalItems, phoenixItems.size());
+        }
+
+        Assert.assertTrue("Phoenix pagination should complete", phoenixPageCount > 0);
+        Assert.assertTrue("DynamoDB pagination should complete", dynamoPageCount > 0);
 
-        // Verify total number of items retrieved
-        Assert.assertEquals("Phoenix should return all 56 items", 56, 
phoenixItems.size());
-        Assert.assertEquals("DynamoDB should return all 56 items", 56, 
dynamoItems.size());
-        
-        // Verify that both clients returned the same number of pagination 
rounds
-        // Note: The exact pagination behavior might differ slightly, but both 
should complete
-        Assert.assertTrue("Phoenix pagination should complete", 
phoenixPaginationCount > 0);
-        Assert.assertTrue("DynamoDB pagination should complete", 
dynamoPaginationCount > 0);
-        
-        // For limits smaller than 8, we should see multiple pagination rounds
-        if (scanLimit < 8) {
-            Assert.assertTrue("Should require multiple pagination rounds for 
limit " + scanLimit, 
-                    phoenixPaginationCount > 7); // At least 7 hash keys, so 
should need multiple rounds
+        if (scanLimit < totalItems) {
+            Assert.assertTrue("Should require multiple pagination rounds for limit " + scanLimit,
+                    phoenixPageCount > 1);
         }
 
         List<Map<String, AttributeValue>> sortedPhoenixItems =
-                TestUtils.sortItemsByPartitionAndSortKey(phoenixItems, 
keyTypeConfig.hashKeyName, keyTypeConfig.sortKeyName);
+                TestUtils.sortItemsByPartitionAndSortKey(phoenixItems, 
TABLE_HK, TABLE_SK);
         List<Map<String, AttributeValue>> sortedDynamoItems =
-                TestUtils.sortItemsByPartitionAndSortKey(dynamoItems, 
keyTypeConfig.hashKeyName, keyTypeConfig.sortKeyName);
+                TestUtils.sortItemsByPartitionAndSortKey(dynamoItems, 
TABLE_HK, TABLE_SK);
         Assert.assertTrue("Phoenix and DynamoDB should return identical items 
when sorted",
                 ItemComparator.areItemsEqual(sortedPhoenixItems, 
sortedDynamoItems));
     }
 
-    /**
-     * Create a test item with the given partition key and sort key based on 
the key type configuration.
-     */
-    private Map<String, AttributeValue> createTestItem(int hashIndex, int 
sortIndex, KeyTypeConfig config) {
+    private void applyFilter(ScanRequest.Builder sr) {
+        if (withFilter) {
+            Map<String, String> names = new HashMap<>();
+            names.put("#sc", "score");
+            Map<String, AttributeValue> values = new HashMap<>();
+            values.put(":sv", AttributeValue.builder().n("300").build());
+            sr.filterExpression("#sc > :sv")
+                    .expressionAttributeNames(names)
+                    .expressionAttributeValues(values);
+        }
+    }
+
+    private Map<String, AttributeValue> createTestItem(int hashIndex, int sortIndex) {
         Map<String, AttributeValue> item = new HashMap<>();
-        
-        // Create hash key value based on type
-        AttributeValue hashKeyValue = createAttributeValue(config.hashKeyType, 
hashIndex, "pk");
-        item.put(config.hashKeyName, hashKeyValue);
-        
-        // Create sort key value based on type
-        AttributeValue sortKeyValue = createAttributeValue(config.sortKeyType, 
sortIndex, "sk");
-        item.put(config.sortKeyName, sortKeyValue);
-        
-        // Add a data field for verification
-        item.put("data_field", AttributeValue.builder().s("data_" + hashIndex 
+ "_" + sortIndex).build());
-        
+
+        item.put(TABLE_HK, createAttributeValue(keyTypeConfig.tableHKType, 
hashIndex, "pk"));
+        item.put(TABLE_SK, createAttributeValue(keyTypeConfig.tableSKType, 
sortIndex, "sk"));
+
+        // Cycle index keys to create ties: 3 distinct ihk values, 4 distinct isk values
+        int ihkVal = hashIndex % 3;
+        int iskVal = sortIndex % 4;
+        item.put(INDEX_HK, createAttributeValue(keyTypeConfig.indexHKType, 
ihkVal, "ihk"));
+        item.put(INDEX_SK, createAttributeValue(keyTypeConfig.indexSKType, 
iskVal, "isk"));
+
+        item.put("extra_data", AttributeValue.builder()
+                .s("data_" + hashIndex + "_" + sortIndex).build());
+        item.put("score", AttributeValue.builder()
+                .n(String.valueOf(hashIndex * 100 + sortIndex)).build());
+
         return item;
     }
 
-    /**
-     * Create an AttributeValue based on the specified type and index.
-     */
-    private AttributeValue createAttributeValue(ScalarAttributeType type, int 
index, String prefix) {
+    private AttributeValue createAttributeValue(ScalarAttributeType type, int 
index,
+            String prefix) {
         switch (type) {
             case S:
                 return AttributeValue.builder().s(prefix + index).build();
             case N:
                 return 
AttributeValue.builder().n(String.valueOf(index)).build();
             case B:
-                // Create binary data using the index as bytes
                 byte[] bytes = (prefix + index).getBytes();
                 return 
AttributeValue.builder().b(SdkBytes.fromByteArray(bytes)).build();
             default:
                 throw new IllegalArgumentException("Unsupported attribute 
type: " + type);
         }
     }
-
-} 
\ No newline at end of file
+}
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
index d3833ca..be9edbc 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -78,6 +79,7 @@ public class ScanIndexIT {
         utility.startMiniCluster();
         String zkQuorum = "localhost:" + 
utility.getZkCluster().getClientPort();
         url = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(new PhoenixTestDriver());
 
         restServer = new RESTServer(utility.getConfiguration());
         restServer.run();
@@ -478,6 +480,102 @@ public class ScanIndexIT {
         TestUtils.verifyItemsEqual(ddbItems, phoenixResult0.items(), "title", 
null);
     }
 
+    /**
+     * RVC_1: Paginated scan on a table with hash key only (no sort key, no 
index).
+     */
+    @Test(timeout = 120000)
+    public void testScanPaginationTableHKOnly() {
+        final String tableName = testName.getMethodName();
+        CreateTableRequest createTableRequest =
+                DDLTestUtils.getCreateTableRequest(tableName, "pk",
+                        ScalarAttributeType.S, null, null);
+        phoenixDBClientV2.createTable(createTableRequest);
+        dynamoDbClient.createTable(createTableRequest);
+
+        for (int i = 0; i < 6; i++) {
+            Map<String, AttributeValue> item = new HashMap<>();
+            item.put("pk", AttributeValue.builder().s("pk" + i).build());
+            item.put("data", AttributeValue.builder().s("val" + i).build());
+            PutItemRequest req = 
PutItemRequest.builder().tableName(tableName).item(item).build();
+            phoenixDBClientV2.putItem(req);
+            dynamoDbClient.putItem(req);
+        }
+
+        ScanRequest.Builder sr = 
ScanRequest.builder().tableName(tableName).limit(2);
+        TestUtils.compareScanOutputs(sr, phoenixDBClientV2, dynamoDbClient,
+                "pk", null, ScalarAttributeType.S, null);
+    }
+
+    /**
+     * RVC_2: Paginated scan on an ihk-only index over a hk-only table.
+     */
+    @Test(timeout = 120000)
+    public void testScanPaginationIHKOnlyIndex_TableHKOnly() throws 
SQLException {
+        final String tableName = testName.getMethodName();
+        final String indexName = "idx_" + tableName;
+        CreateTableRequest createTableRequest =
+                DDLTestUtils.getCreateTableRequest(tableName, "pk",
+                        ScalarAttributeType.S, null, null);
+        createTableRequest = DDLTestUtils.addIndexToRequest(true, 
createTableRequest,
+                indexName, "status", ScalarAttributeType.S, null, null);
+        phoenixDBClientV2.createTable(createTableRequest);
+        dynamoDbClient.createTable(createTableRequest);
+
+        String[] statuses = {"active", "inactive", "active", "pending", 
"inactive", "active"};
+        for (int i = 0; i < 6; i++) {
+            Map<String, AttributeValue> item = new HashMap<>();
+            item.put("pk", AttributeValue.builder().s("pk" + i).build());
+            item.put("status", 
AttributeValue.builder().s(statuses[i]).build());
+            item.put("data", AttributeValue.builder().s("val" + i).build());
+            PutItemRequest req = 
PutItemRequest.builder().tableName(tableName).item(item).build();
+            phoenixDBClientV2.putItem(req);
+            dynamoDbClient.putItem(req);
+        }
+
+        ScanRequest.Builder sr = ScanRequest.builder()
+                .tableName(tableName).indexName(indexName).limit(2);
+        TestUtils.compareScanOutputs(sr, phoenixDBClientV2, dynamoDbClient,
+                "pk", null, ScalarAttributeType.S, null);
+        sr.exclusiveStartKey(null);
+        TestUtils.validateIndexUsed(sr.build(), url, "FULL SCAN ");
+    }
+
+    /**
+     * RVC_3: Paginated scan on an ihk+isk index over a hk-only table.
+     */
+    @Test(timeout = 120000)
+    public void testScanPaginationIHKISKIndex_TableHKOnly() throws 
SQLException {
+        final String tableName = testName.getMethodName();
+        final String indexName = "idx_" + tableName;
+        CreateTableRequest createTableRequest =
+                DDLTestUtils.getCreateTableRequest(tableName, "pk",
+                        ScalarAttributeType.S, null, null);
+        createTableRequest = DDLTestUtils.addIndexToRequest(true, 
createTableRequest,
+                indexName, "status", ScalarAttributeType.S,
+                "timestamp", ScalarAttributeType.N);
+        phoenixDBClientV2.createTable(createTableRequest);
+        dynamoDbClient.createTable(createTableRequest);
+
+        String[] statuses = {"active", "inactive", "active", "pending", 
"inactive", "active"};
+        for (int i = 0; i < 6; i++) {
+            Map<String, AttributeValue> item = new HashMap<>();
+            item.put("pk", AttributeValue.builder().s("pk" + i).build());
+            item.put("status", 
AttributeValue.builder().s(statuses[i]).build());
+            item.put("timestamp", 
AttributeValue.builder().n(String.valueOf(1000 + (i % 3))).build());
+            item.put("data", AttributeValue.builder().s("val" + i).build());
+            PutItemRequest req = 
PutItemRequest.builder().tableName(tableName).item(item).build();
+            phoenixDBClientV2.putItem(req);
+            dynamoDbClient.putItem(req);
+        }
+
+        ScanRequest.Builder sr = ScanRequest.builder()
+                .tableName(tableName).indexName(indexName).limit(2);
+        TestUtils.compareScanOutputs(sr, phoenixDBClientV2, dynamoDbClient,
+                "pk", null, ScalarAttributeType.S, null);
+        sr.exclusiveStartKey(null);
+        TestUtils.validateIndexUsed(sr.build(), url, "FULL SCAN ");
+    }
+
     private static Map<String, AttributeValue> getItem1() {
         Map<String, AttributeValue> item = new HashMap<>();
         item.put("DataGrave-_-Obscure_Dream_.404", 
AttributeValue.builder().s("A").build());
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
index b884759..432556b 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
@@ -178,17 +178,10 @@ public class TestUtils {
         int effectiveLimit = 100;
         boolean countOnly = 
ApiMetadata.SELECT_COUNT.equals(request.get(ApiMetadata.SELECT));
 
-        ScanConfig config = new ScanConfig(
-                ScanService.determineScanType(exclusiveStartKey, useIndex, 
tablePKCols, indexPKCols),
+        ScanConfig config = new ScanConfig(ScanService.determineScanType(exclusiveStartKey),
                 useIndex, tablePKCols, indexPKCols, effectiveLimit, tableName, indexName, countOnly
         );
-
-        // For two-query scenarios, return the first query's PreparedStatement
-        if (config.getType() == ScanConfig.ScanType.TWO_KEY_FIRST_QUERY) {
-            return ScanService.buildQuery(connection, request, config);
-        } else {
-            return ScanService.buildQuery(connection, request, config);
-        }
+        return ScanService.buildQuery(connection, request, config);
     }
 
     /**

Reply via email to