This is an automated email from the ASF dual-hosted git repository.
virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 6804822 Phoenix RVC for Scan Pagination
6804822 is described below
commit 68048222e50f15447b4b06c96c51797598c19d77
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Mar 5 11:57:31 2026 -0800
Phoenix RVC for Scan Pagination
---
.../apache/phoenix/ddb/service/QueryService.java | 2 +-
.../apache/phoenix/ddb/service/ScanService.java | 168 ++++---------
.../apache/phoenix/ddb/service/utils/DQLUtils.java | 57 +++--
.../phoenix/ddb/service/utils/ScanConfig.java | 32 +--
.../phoenix/ddb/ScanExclusiveStartKeyIT.java | 107 ++++----
.../java/org/apache/phoenix/ddb/ScanIndex2IT.java | 2 +
.../java/org/apache/phoenix/ddb/ScanIndex3IT.java | 2 +
...eyIT.java => ScanIndexExclusiveStartKeyIT.java} | 275 +++++++++++----------
.../java/org/apache/phoenix/ddb/ScanIndexIT.java | 98 ++++++++
.../java/org/apache/phoenix/ddb/TestUtils.java | 11 +-
10 files changed, 396 insertions(+), 358 deletions(-)
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
index fe16acc..c11a695 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
@@ -77,7 +77,7 @@ public class QueryService {
boolean countOnly =
ApiMetadata.SELECT_COUNT.equals(request.get(ApiMetadata.SELECT));
return DQLUtils.executeStatementReturnResult(stmt,
getProjectionAttributes(request), useIndex, tablePKCols,
indexPKCols, tableName,
- isSingleRowExpected, false, countOnly);
+ isSingleRowExpected, countOnly);
} catch (SQLException e) {
throw new PhoenixServiceException(e);
}
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
index 9a086c5..63fc4f0 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
@@ -24,7 +24,6 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.ArrayList;
import java.util.HashMap;
import org.apache.phoenix.ddb.ConnectionUtil;
@@ -90,8 +89,7 @@ public class ScanService {
int effectiveLimit = getEffectiveLimit(request);
boolean countOnly =
ApiMetadata.SELECT_COUNT.equals(request.get(ApiMetadata.SELECT));
- ScanConfig config = new ScanConfig(
- determineScanType(exclusiveStartKey, useIndex, tablePKCols,
indexPKCols),
+ ScanConfig config = new
ScanConfig(determineScanType(exclusiveStartKey),
useIndex, tablePKCols, indexPKCols, effectiveLimit, tableName,
indexName, countOnly
);
@@ -105,33 +103,20 @@ public class ScanService {
config.setScanSegmentInfo(segmentInfo);
}
- // Execute based on scan type (same logic for both regular and segment
scans)
- switch (config.getType()) {
- case NO_EXCLUSIVE_START_KEY:
- case SINGLE_KEY_CONTINUATION:
- return executeSingleQuery(connection, request, config);
- case TWO_KEY_FIRST_QUERY:
- return executeTwoKeyTableScan(connection, request, config);
- default:
- throw new IllegalStateException("Unsupported scan config type:
" + config.getType());
- }
+ PreparedStatement stmt = buildQuery(connection, request, config);
+ return DQLUtils.executeStatementReturnResult(stmt,
getProjectionAttributes(request),
+ config.useIndex(), config.getTablePKCols(),
config.getIndexPKCols(), config.getTableName(),
+ false, config.isCountOnly());
}
/**
* Determine the appropriate scan type based on request parameters
*/
- public static ScanType determineScanType(Map<String, Object>
exclusiveStartKey,
- boolean useIndex, List<PColumn> tablePKCols, List<PColumn>
indexPKCols) {
+ public static ScanType determineScanType(Map<String, Object>
exclusiveStartKey) {
if (exclusiveStartKey == null || exclusiveStartKey.isEmpty()) {
return ScanType.NO_EXCLUSIVE_START_KEY;
}
-
- List<PColumn> relevantPKCols = useIndex ? indexPKCols : tablePKCols;
- if (relevantPKCols.size() == 1) {
- return ScanType.SINGLE_KEY_CONTINUATION;
- } else {
- return ScanType.TWO_KEY_FIRST_QUERY;
- }
+ return ScanType.WITH_EXCLUSIVE_START_KEY;
}
/**
@@ -142,63 +127,6 @@ public class ScanService {
return (requestLimit == null) ? MAX_SCAN_LIMIT :
Math.min(requestLimit, MAX_SCAN_LIMIT);
}
- /**
- * Execute a single query scan (for no pagination, single key, or original
logic)
- */
- private static Map<String, Object> executeSingleQuery(Connection
connection, Map<String, Object> request,
- ScanConfig config)
throws SQLException {
- PreparedStatement stmt = buildQuery(connection, request, config);
- return DQLUtils.executeStatementReturnResult(stmt,
getProjectionAttributes(request),
- config.useIndex(), config.getTablePKCols(),
config.getIndexPKCols(), config.getTableName(),
- false, false, config.isCountOnly());
- }
-
- /**
- * Execute two-key table scan using the two-query approach
- */
- private static Map<String, Object> executeTwoKeyTableScan(Connection
connection, Map<String, Object> request,
- ScanConfig config)
throws SQLException {
-
- // Execute first query: (pk1 = k1 AND pk2 > k2)
-
- PreparedStatement firstStmt = buildQuery(connection, request, config);
- Map<String, Object> firstResult =
DQLUtils.executeStatementReturnResult(firstStmt,
- getProjectionAttributes(request), config.useIndex(),
config.getTablePKCols(), config.getIndexPKCols(),
- config.getTableName(), false, true, config.isCountOnly());
-
- List<Map<String, Object>> allItems = config.isCountOnly()
- ? new ArrayList<>()
- : new ArrayList<>((List<Map<String, Object>>)
firstResult.get(ApiMetadata.ITEMS));
- int totalCount = (Integer) firstResult.get(ApiMetadata.COUNT);
- int totalScannedCount = (Integer)
firstResult.get(ApiMetadata.SCANNED_COUNT);
- Map<String, Object> lastEvaluatedKey = (Map<String, Object>)
firstResult.get(ApiMetadata.LAST_EVALUATED_KEY);
-
- // Execute second query if needed: (pk1 > k1)
- if (totalCount < config.getLimit() &&
!(boolean)firstResult.get(DQLUtils.SIZE_LIMIT_REACHED)) {
- int remainingLimit = config.getLimit() - totalCount;
- ScanConfig secondConfig =
config.cloneWithTypeAndLimit(ScanType.TWO_KEY_SECOND_QUERY, remainingLimit);
- PreparedStatement secondStmt = buildQuery(connection, request,
secondConfig);
- Map<String, Object> secondResult =
DQLUtils.executeStatementReturnResult(secondStmt,
- getProjectionAttributes(request), config.useIndex(),
config.getTablePKCols(), config.getIndexPKCols(),
- config.getTableName(), false, false, config.isCountOnly());
-
- if (!config.isCountOnly()) {
- List<Map<String, Object>> secondItems = (List<Map<String,
Object>>) secondResult.get(ApiMetadata.ITEMS);
- allItems.addAll(secondItems);
- }
- totalCount += (Integer) secondResult.get(ApiMetadata.COUNT);
- totalScannedCount += (Integer)
secondResult.get(ApiMetadata.SCANNED_COUNT);
-
- // Use LastEvaluatedKey from second query if it returned items,
otherwise from first query
- Map<String, Object> secondLastKey = (Map<String, Object>)
secondResult.get(ApiMetadata.LAST_EVALUATED_KEY);
- if (secondLastKey != null) {
- lastEvaluatedKey = secondLastKey;
- }
- }
-
- return buildScanResponse(allItems, totalCount, totalScannedCount,
config.getTableName(), lastEvaluatedKey, config.isCountOnly());
- }
-
/**
* Unified query builder that handles all scan types
*/
@@ -210,8 +138,8 @@ public class ScanService {
// Add filter conditions
boolean hasFilterCondition = addFilterConditionIfPresent(queryBuilder,
request);
- // Add key conditions
- boolean hasKeyConditions = addKeyConditions(queryBuilder, config,
hasFilterCondition);
+ // Add key conditions (RVC)
+ boolean hasKeyConditions = addRVC(queryBuilder, config,
hasFilterCondition);
// Add segment boundary conditions
addSegmentBoundaryConditions(queryBuilder, config, hasFilterCondition
|| hasKeyConditions);
@@ -261,12 +189,12 @@ public class ScanService {
}
/**
- * Add key-based WHERE conditions based on scan type
+ * Add RVC clause if request has lastEvaluatedKey.
*
- * @return true if key conditions were added
+ * @return true if RVC clause was added
*/
- private static boolean addKeyConditions(StringBuilder queryBuilder,
ScanConfig config,
- boolean hasPreviousConditions) {
+ private static boolean addRVC(StringBuilder queryBuilder, ScanConfig
config,
+ boolean hasPreviousConditions) {
if (config.getType() == ScanType.NO_EXCLUSIVE_START_KEY) {
// No key conditions needed for simple scan
return false;
@@ -277,22 +205,10 @@ public class ScanService {
} else {
queryBuilder.append(" WHERE ");
}
-
- String partitionKeyName = CommonServiceUtils.getColumnExprFromPCol(
- config.getPartitionKeyCol(), config.useIndex());
-
- switch (config.getType()) {
- case SINGLE_KEY_CONTINUATION:
- case TWO_KEY_SECOND_QUERY:
- queryBuilder.append(partitionKeyName).append(" > ? ");
- break;
- case TWO_KEY_FIRST_QUERY:
- String sortKeyName = CommonServiceUtils.getColumnExprFromPCol(
- config.getSortKeyCol(), config.useIndex());
- queryBuilder.append("( ").append(partitionKeyName).append(" =
? AND ")
- .append(sortKeyName).append(" > ? ) ");
- break;
- }
+
+ String rvcClause = DQLUtils.getRVCClauseForScan(" > ",
config.getTablePKCols(),
+ config.useIndex(), config.getIndexPKCols());
+ queryBuilder.append(rvcClause);
return true;
}
@@ -304,13 +220,22 @@ public class ScanService {
}
private static void addOrderByClause(StringBuilder queryBuilder,
ScanConfig config) {
- String partitionKeyName = CommonServiceUtils.getColumnExprFromPCol(
- config.getPartitionKeyCol(), config.useIndex());
- queryBuilder.append(" ORDER BY ").append(partitionKeyName);
- if (config.getSortKeyCol() != null) {
- String sortKeyName = CommonServiceUtils.getColumnExprFromPCol(
- config.getSortKeyCol(), config.useIndex());
- queryBuilder.append(", ").append(sortKeyName);
+ queryBuilder.append(" ORDER BY ");
+
+ if (config.useIndex()) {
+ for (int i = 0; i < config.getIndexPKCols().size(); i++) {
+ if (i > 0) queryBuilder.append(", ");
+
queryBuilder.append(CommonServiceUtils.getColumnExprFromPCol(config.getIndexPKCols().get(i),
true));
+ }
+ for (PColumn tablePkCol : config.getTablePKCols()) {
+ queryBuilder.append(", ");
+
queryBuilder.append(CommonServiceUtils.getEscapedArgument(tablePkCol.getName().toString()));
+ }
+ } else {
+ for (int i = 0; i < config.getTablePKCols().size(); i++) {
+ if (i > 0) queryBuilder.append(", ");
+
queryBuilder.append(CommonServiceUtils.getEscapedArgument(config.getTablePKCols().get(i).getName().toString()));
+ }
}
}
@@ -322,27 +247,20 @@ public class ScanService {
int paramIndex = 1;
if (config.getType() != ScanType.NO_EXCLUSIVE_START_KEY) {
- // Set key condition parameters first
Map<String, Object> exclusiveStartKey =
(Map<String, Object>)
request.get(ApiMetadata.EXCLUSIVE_START_KEY);
- String partitionKeyName =
CommonServiceUtils.getColumnNameFromPCol(config.getPartitionKeyCol(),
config.useIndex());
- switch (config.getType()) {
- case SINGLE_KEY_CONTINUATION:
- case TWO_KEY_SECOND_QUERY:
- DQLUtils.setKeyValueOnStatement(stmt, paramIndex++,
- (Map<String, Object>)
exclusiveStartKey.get(partitionKeyName), false);
- break;
-
- case TWO_KEY_FIRST_QUERY:
- String sortKeyName =
CommonServiceUtils.getColumnNameFromPCol(config.getSortKeyCol(),
config.useIndex());
- // Set pk1 = ?
- DQLUtils.setKeyValueOnStatement(stmt, paramIndex++,
- (Map<String, Object>)
exclusiveStartKey.get(partitionKeyName), false);
- // Set pk2 > ?
+ if (config.useIndex()) {
+ for (PColumn indexPkCol : config.getIndexPKCols()) {
+ String keyName =
CommonServiceUtils.getColumnNameFromPCol(indexPkCol, true);
+ DQLUtils.setKeyValueOnStatement(stmt, paramIndex++,
+ (Map<String, Object>)
exclusiveStartKey.get(keyName), false);
+ }
+ }
+ for (PColumn tablePkCol : config.getTablePKCols()) {
+ String keyName = tablePkCol.getName().getString();
DQLUtils.setKeyValueOnStatement(stmt, paramIndex++,
- (Map<String, Object>)
exclusiveStartKey.get(sortKeyName), false);
- break;
+ (Map<String, Object>) exclusiveStartKey.get(keyName),
false);
}
}
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
index 5b70f33..de64a72 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
@@ -29,10 +29,10 @@ import java.util.Properties;
public class DQLUtils {
public static final String SIZE_LIMIT_REACHED = "SizeLimitReached";
- private static final String ISK_HK_RVC = "(%s, %s) %s (?, ?)";
- private static final String ISK_HK_SK_RVC = "(%s, %s, %s) %s (?, ?, ?)";
- private static final String HK_SK_RVC = "(%s, %s) %s (?, ?)";
- private static final String HK_RVC = "(%s) %s (?)";
+ private static final String RVC_1 = "(%s) %s (?)";
+ private static final String RVC_2 = "(%s, %s) %s (?, ?)";
+ private static final String RVC_3 = "(%s, %s, %s) %s (?, ?, ?)";
+ private static final String RVC_4 = "(%s, %s, %s, %s) %s (?, ?, ?, ?)";
/**
* Execute the given PreparedStatement, collect all returned items with
projected attributes
@@ -41,12 +41,11 @@ public class DQLUtils {
public static Map<String, Object>
executeStatementReturnResult(PreparedStatement stmt,
List<String> projectionAttributes, boolean useIndex,
List<PColumn> tablePKCols, List<PColumn> indexPKCols, String
tableName,
- boolean isSingleRowExpected, boolean isScanFirstQuery, boolean
countOnly) throws SQLException {
+ boolean isSingleRowExpected, boolean countOnly) throws
SQLException {
int count = 0;
int bytesSize = 0;
List<Map<String, Object>> items = new ArrayList<>();
RawBsonDocument lastBsonDoc = null;
- boolean sizeLimitReached = false;
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
lastBsonDoc = (RawBsonDocument) rs.getObject(1);
@@ -57,7 +56,6 @@ public class DQLUtils {
bytesSize +=
(int)
rs.unwrap(PhoenixResultSet.class).getCurrentRow().getSerializedSize();
if (bytesSize >= ApiMetadata.MAX_BYTES_SIZE) {
- sizeLimitReached = true;
break;
}
}
@@ -73,9 +71,6 @@ public class DQLUtils {
response.put(ApiMetadata.SCANNED_COUNT, countRowsScanned);
response.put(ApiMetadata.CONSUMED_CAPACITY,
CommonServiceUtils.getConsumedCapacity(tableName));
- if (isScanFirstQuery) {
- response.put(SIZE_LIMIT_REACHED, sizeLimitReached);
- }
return response;
} catch (SQLException e) {
if (e.getMessage() != null && e.getMessage()
@@ -221,22 +216,56 @@ public class DQLUtils {
// Index has only ihk
if (tablePKCols.size() == 1) {
// (hk) > (?)
- return String.format(HK_RVC, hk, op);
+ return String.format(RVC_1, hk, op);
} else {
// (hk, sk) > (?, ?)
String sk =
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
- return String.format(HK_SK_RVC, hk, sk, op);
+ return String.format(RVC_2, hk, sk, op);
}
} else {
// Index has ihk, isk
String isk =
CommonServiceUtils.getColumnExprFromPCol(indexPKCols.get(1), true);
if (tablePKCols.size() == 1) {
// (isk, hk) > (?, ?)
- return String.format(ISK_HK_RVC, isk, hk, op);
+ return String.format(RVC_2, isk, hk, op);
} else {
// (isk, hk, sk) > (?, ?, ?)
String sk =
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
- return String.format(ISK_HK_SK_RVC, isk, hk, sk, op);
+ return String.format(RVC_3, isk, hk, sk, op);
+ }
+ }
+ }
+
+ public static String getRVCClauseForScan(String op, List<PColumn>
tablePKCols,
+ boolean useIndex, List<PColumn>
indexPKCols) {
+ String hk =
CommonServiceUtils.getEscapedArgument(tablePKCols.get(0).getName().toString());
+ if (!useIndex) {
+ if (tablePKCols.size() == 1) {
+ return String.format(RVC_1, hk, op);
+ } else {
+ String sk =
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+ return String.format(RVC_2, hk, sk, op);
+ }
+ }
+ String ihk =
CommonServiceUtils.getColumnExprFromPCol(indexPKCols.get(0), true);
+ if (indexPKCols.size() == 1) {
+ if (tablePKCols.size() == 1) {
+ // (ihk, hk) > (?, ?)
+ return String.format(RVC_2, ihk, hk, op);
+ } else {
+ // (ihk, hk, sk) > (?, ?, ?)
+ String sk =
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+ return String.format(RVC_3, ihk, hk, sk, op);
+ }
+ } else {
+ String isk =
CommonServiceUtils.getColumnExprFromPCol(indexPKCols.get(1), true);
+ if (tablePKCols.size() == 1) {
+ // (ihk, isk, hk) > (?, ?, ?)
+ return String.format(RVC_3, ihk, isk, hk, op);
+ } else {
+ // (ihk, isk, hk, sk) > (?, ?, ?, ?)
+ String sk =
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+ return String.format(RVC_4, ihk, isk, hk, sk, op);
}
}
}
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ScanConfig.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ScanConfig.java
index baa434d..d9c8656 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ScanConfig.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ScanConfig.java
@@ -31,18 +31,14 @@ public class ScanConfig {
* Enumeration of different scan types
*/
public enum ScanType {
- NO_EXCLUSIVE_START_KEY, // Simple scan without pagination
- SINGLE_KEY_CONTINUATION, // pk > value (single key table)
- TWO_KEY_FIRST_QUERY, // pk1 = value AND pk2 > value (two
key table, first query)
- TWO_KEY_SECOND_QUERY // pk1 > value (two key table, second
query)
+ NO_EXCLUSIVE_START_KEY, // first page
+ WITH_EXCLUSIVE_START_KEY // subsequent page — apply RVC
}
private final ScanType type;
private final boolean useIndex;
private final List<PColumn> tablePKCols;
private final List<PColumn> indexPKCols;
- private final PColumn partitionKeyCol;
- private final PColumn sortKeyCol;
private final int limit;
private final String tableName;
private final String indexName;
@@ -61,22 +57,6 @@ public class ScanConfig {
this.tableName = tableName;
this.indexName = indexName;
this.countOnly = countOnly;
-
- List<PColumn> relevantPKCols = useIndex ? indexPKCols : tablePKCols;
- this.partitionKeyCol = relevantPKCols.get(0);
- this.sortKeyCol = (relevantPKCols.size() > 1) ? relevantPKCols.get(1)
: null;
- }
-
- /**
- * Creates a new ScanConfig with a different type and limit, keeping other
properties
- */
- public ScanConfig cloneWithTypeAndLimit(ScanType newType, int newLimit) {
- ScanConfig newConfig = new ScanConfig(newType, this.useIndex,
this.tablePKCols,
- this.indexPKCols, newLimit, this.tableName, this.indexName,
this.countOnly);
- if (this.isSegmentScan) {
- newConfig.setScanSegmentInfo(this.scanSegmentInfo);
- }
- return newConfig;
}
public void setScanSegmentInfo(ScanSegmentInfo segmentInfo) {
@@ -104,14 +84,6 @@ public class ScanConfig {
return indexPKCols;
}
- public PColumn getPartitionKeyCol() {
- return partitionKeyCol;
- }
-
- public PColumn getSortKeyCol() {
- return sortKeyCol;
- }
-
public int getLimit() {
return limit;
}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
index 712b819..ffc853e 100644
---
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
+++
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -56,8 +57,7 @@ import org.apache.phoenix.util.ServerUtil;
import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
/**
- * Parametrized test to verify getExclusiveStartKeyConditionForScan() logic in
DQLUtils.
- * Tests scan pagination with different limits and different combinations of
hash and sort key data types.
+ * Tests scan pagination on table with different limits and different
combinations of hash and sort key data types.
*/
@RunWith(Parameterized.class)
public class ScanExclusiveStartKeyIT {
@@ -79,6 +79,7 @@ public class ScanExclusiveStartKeyIT {
// Test parameters
public int scanLimit;
public KeyTypeConfig keyTypeConfig;
+ public boolean withFilter;
// Configuration for different key type combinations
public static class KeyTypeConfig {
@@ -102,7 +103,7 @@ public class ScanExclusiveStartKeyIT {
}
}
- @Parameterized.Parameters(name = "limit_{0}_keyTypes_{1}")
+ @Parameterized.Parameters(name = "limit_{0}_keyTypes_{1}_filter_{2}")
public static synchronized Collection<Object[]> data() {
List<Object[]> parameters = new ArrayList<>();
@@ -120,19 +121,21 @@ public class ScanExclusiveStartKeyIT {
new KeyTypeConfig("S_B", ScalarAttributeType.S,
ScalarAttributeType.B) // String + Binary
};
- // Create all combinations of limits and key types
for (int limit : scanLimits) {
for (KeyTypeConfig config : keyConfigs) {
- parameters.add(new Object[]{limit, config});
+ for (boolean filter : new boolean[]{false, true}) {
+ parameters.add(new Object[]{limit, config, filter});
+ }
}
}
return parameters;
}
- public ScanExclusiveStartKeyIT(int scanLimit, KeyTypeConfig keyTypeConfig)
{
+ public ScanExclusiveStartKeyIT(int scanLimit, KeyTypeConfig keyTypeConfig,
boolean withFilter) {
this.scanLimit = scanLimit;
this.keyTypeConfig = keyTypeConfig;
+ this.withFilter = withFilter;
}
@BeforeClass
@@ -146,6 +149,7 @@ public class ScanExclusiveStartKeyIT {
utility.startMiniCluster();
String zkQuorum = "localhost:" +
utility.getZkCluster().getClientPort();
url = PhoenixRuntime.JDBC_PROTOCOL +
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+ DriverManager.registerDriver(new PhoenixTestDriver());
restServer = new RESTServer(utility.getConfiguration());
restServer.run();
@@ -182,22 +186,19 @@ public class ScanExclusiveStartKeyIT {
@Test(timeout = 120000)
public void testScanExclusiveStartKeyPagination() {
final String tableName =
testName.getMethodName().replaceAll("[\\[\\]]", "_") +
- "_limit_" + scanLimit + "_" + keyTypeConfig.name;
+ "_limit_" + scanLimit + "_" + keyTypeConfig.name + "_filter_"
+ withFilter;
- // Create table with hash key and sort key of specified types
+ final int totalItems = 56;
+
CreateTableRequest createTableRequest =
DDLTestUtils.getCreateTableRequest(tableName,
keyTypeConfig.hashKeyName,
keyTypeConfig.hashKeyType, keyTypeConfig.sortKeyName,
keyTypeConfig.sortKeyType);
phoenixDBClientV2.createTable(createTableRequest);
dynamoDbClient.createTable(createTableRequest);
- // Insert 56 items: 7 hash keys with 8 sort keys each
- List<Map<String, AttributeValue>> testItems = new ArrayList<>();
for (int hashIndex = 0; hashIndex < 7; hashIndex++) {
for (int sortIndex = 0; sortIndex < 8; sortIndex++) {
Map<String, AttributeValue> item = createTestItem(hashIndex,
sortIndex, keyTypeConfig);
- testItems.add(item);
-
PutItemRequest putItemRequest = PutItemRequest.builder()
.tableName(tableName)
.item(item)
@@ -207,7 +208,6 @@ public class ScanExclusiveStartKeyIT {
}
}
- // Perform paginated scan with the specified limit
List<Map<String, AttributeValue>> phoenixItems = new ArrayList<>();
List<Map<String, AttributeValue>> dynamoItems = new ArrayList<>();
@@ -217,60 +217,51 @@ public class ScanExclusiveStartKeyIT {
int phoenixPaginationCount = 0;
int dynamoPaginationCount = 0;
- // Phoenix scan with pagination
- do {
- ScanRequest.Builder phoenixScanRequest = ScanRequest.builder()
- .tableName(tableName)
- .limit(scanLimit);
-
- if (phoenixLastKey != null && !phoenixLastKey.isEmpty()) {
- phoenixScanRequest.exclusiveStartKey(phoenixLastKey);
- }
-
- ScanResponse phoenixResult =
phoenixDBClientV2.scan(phoenixScanRequest.build());
- phoenixItems.addAll(phoenixResult.items());
- phoenixLastKey = phoenixResult.lastEvaluatedKey();
- phoenixPaginationCount++;
-
- LOGGER.info("Phoenix scan iteration {}, returned {} items, last
key: {}",
- phoenixPaginationCount, phoenixResult.count(),
phoenixLastKey);
-
- } while (phoenixLastKey != null && !phoenixLastKey.isEmpty());
-
- // DynamoDB scan with pagination
do {
ScanRequest.Builder dynamoScanRequest = ScanRequest.builder()
.tableName(tableName)
.limit(scanLimit);
-
+ applyFilter(dynamoScanRequest);
if (dynamoLastKey != null && !dynamoLastKey.isEmpty()) {
dynamoScanRequest.exclusiveStartKey(dynamoLastKey);
}
-
ScanResponse dynamoResult =
dynamoDbClient.scan(dynamoScanRequest.build());
dynamoItems.addAll(dynamoResult.items());
dynamoLastKey = dynamoResult.lastEvaluatedKey();
dynamoPaginationCount++;
-
LOGGER.info("DynamoDB scan iteration {}, returned {} items, last
key: {}",
dynamoPaginationCount, dynamoResult.count(),
dynamoLastKey);
-
} while (dynamoLastKey != null && !dynamoLastKey.isEmpty());
- // Verify total number of items retrieved
- Assert.assertEquals("Phoenix should return all 56 items", 56,
phoenixItems.size());
- Assert.assertEquals("DynamoDB should return all 56 items", 56,
dynamoItems.size());
-
- // Verify that both clients returned the same number of pagination
rounds
- // Note: The exact pagination behavior might differ slightly, but both
should complete
+ do {
+ ScanRequest.Builder phoenixScanRequest = ScanRequest.builder()
+ .tableName(tableName)
+ .limit(scanLimit);
+ applyFilter(phoenixScanRequest);
+ if (phoenixLastKey != null && !phoenixLastKey.isEmpty()) {
+ phoenixScanRequest.exclusiveStartKey(phoenixLastKey);
+ }
+ ScanResponse phoenixResult =
phoenixDBClientV2.scan(phoenixScanRequest.build());
+ phoenixItems.addAll(phoenixResult.items());
+ phoenixLastKey = phoenixResult.lastEvaluatedKey();
+ phoenixPaginationCount++;
+ LOGGER.info("Phoenix scan iteration {}, returned {} items, last
key: {}",
+ phoenixPaginationCount, phoenixResult.count(),
phoenixLastKey);
+ } while (phoenixLastKey != null && !phoenixLastKey.isEmpty());
+
+ Assert.assertEquals("Phoenix and DynamoDB should return same number of
items",
+ dynamoItems.size(), phoenixItems.size());
+ Assert.assertFalse("Should return some items", phoenixItems.isEmpty());
+ if (withFilter) {
+ Assert.assertTrue("Filter should reduce the result set",
+ phoenixItems.size() < totalItems);
+ } else {
+ Assert.assertEquals("Should return all items without filter",
+ totalItems, phoenixItems.size());
+ }
+
Assert.assertTrue("Phoenix pagination should complete",
phoenixPaginationCount > 0);
Assert.assertTrue("DynamoDB pagination should complete",
dynamoPaginationCount > 0);
-
- // For limits smaller than 8, we should see multiple pagination rounds
- if (scanLimit < 8) {
- Assert.assertTrue("Should require multiple pagination rounds for
limit " + scanLimit,
- phoenixPaginationCount > 7); // At least 7 hash keys, so
should need multiple rounds
- }
List<Map<String, AttributeValue>> sortedPhoenixItems =
TestUtils.sortItemsByPartitionAndSortKey(phoenixItems,
keyTypeConfig.hashKeyName, keyTypeConfig.sortKeyName);
@@ -283,19 +274,29 @@ public class ScanExclusiveStartKeyIT {
/**
* Create a test item with the given partition key and sort key based on
the key type configuration.
*/
+ private void applyFilter(ScanRequest.Builder sr) {
+ if (withFilter) {
+ Map<String, String> names = new HashMap<>();
+ names.put("#nf", "num_field");
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":nv", AttributeValue.builder().n("30").build());
+ sr.filterExpression("#nf > :nv")
+ .expressionAttributeNames(names)
+ .expressionAttributeValues(values);
+ }
+ }
+
private Map<String, AttributeValue> createTestItem(int hashIndex, int
sortIndex, KeyTypeConfig config) {
Map<String, AttributeValue> item = new HashMap<>();
- // Create hash key value based on type
AttributeValue hashKeyValue = createAttributeValue(config.hashKeyType,
hashIndex, "pk");
item.put(config.hashKeyName, hashKeyValue);
- // Create sort key value based on type
AttributeValue sortKeyValue = createAttributeValue(config.sortKeyType,
sortIndex, "sk");
item.put(config.sortKeyName, sortKeyValue);
- // Add a data field for verification
item.put("data_field", AttributeValue.builder().s("data_" + hashIndex
+ "_" + sortIndex).build());
+ item.put("num_field",
AttributeValue.builder().n(String.valueOf(hashIndex * 10 + sortIndex)).build());
return item;
}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex2IT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex2IT.java
index e22dfe3..131b8bb 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex2IT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex2IT.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -83,6 +84,7 @@ public class ScanIndex2IT {
utility.startMiniCluster();
String zkQuorum = "localhost:" +
utility.getZkCluster().getClientPort();
url = PhoenixRuntime.JDBC_PROTOCOL +
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+ DriverManager.registerDriver(new PhoenixTestDriver());
restServer = new RESTServer(utility.getConfiguration());
restServer.run();
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
index 0dd7a78..6a9be8d 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.phoenix.ddb.rest.RESTServer;
import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ServerUtil;
import org.junit.AfterClass;
@@ -79,6 +80,7 @@ public class ScanIndex3IT {
utility.startMiniCluster();
String zkQuorum = "localhost:" +
utility.getZkCluster().getClientPort();
url = PhoenixRuntime.JDBC_PROTOCOL +
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+ DriverManager.registerDriver(new PhoenixTestDriver());
restServer = new RESTServer(utility.getConfiguration());
restServer.run();
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexExclusiveStartKeyIT.java
similarity index 52%
copy from
phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
copy to
phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexExclusiveStartKeyIT.java
index 712b819..26cf40d 100644
---
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanExclusiveStartKeyIT.java
+++
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexExclusiveStartKeyIT.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -56,13 +57,20 @@ import org.apache.phoenix.util.ServerUtil;
import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
/**
- * Parametrized test to verify getExclusiveStartKeyConditionForScan() logic in
DQLUtils.
- * Tests scan pagination with different limits and different combinations of
hash and sort key data types.
+ * Tests scan pagination on indexes with different limits and key type
combinations.
+ * Items are inserted with deliberate ties on index keys to verify RVC cursor
correctness
+ * when multiple rows share the same (ihk, isk) pair.
*/
@RunWith(Parameterized.class)
-public class ScanExclusiveStartKeyIT {
+public class ScanIndexExclusiveStartKeyIT {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ScanExclusiveStartKeyIT.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ScanIndexExclusiveStartKeyIT.class);
+
+ private static final String INDEX_NAME = "test_gsi";
+ private static final String TABLE_HK = "partition_key";
+ private static final String TABLE_SK = "sort_key";
+ private static final String INDEX_HK = "idx_hk";
+ private static final String INDEX_SK = "idx_sk";
private final DynamoDbClient dynamoDbClient =
LocalDynamoDbTestBase.localDynamoDb().createV2Client();
@@ -75,25 +83,26 @@ public class ScanExclusiveStartKeyIT {
@Rule
public final TestName testName = new TestName();
-
- // Test parameters
+
public int scanLimit;
public KeyTypeConfig keyTypeConfig;
+ public boolean withFilter;
- // Configuration for different key type combinations
public static class KeyTypeConfig {
public final String name;
- public final ScalarAttributeType hashKeyType;
- public final ScalarAttributeType sortKeyType;
- public final String hashKeyName;
- public final String sortKeyName;
+ public final ScalarAttributeType tableHKType;
+ public final ScalarAttributeType tableSKType;
+ public final ScalarAttributeType indexHKType;
+ public final ScalarAttributeType indexSKType;
- public KeyTypeConfig(String name, ScalarAttributeType hashKeyType,
ScalarAttributeType sortKeyType) {
+ public KeyTypeConfig(String name, ScalarAttributeType tableHKType,
+ ScalarAttributeType tableSKType, ScalarAttributeType
indexHKType,
+ ScalarAttributeType indexSKType) {
this.name = name;
- this.hashKeyType = hashKeyType;
- this.sortKeyType = sortKeyType;
- this.hashKeyName = "partition_key";
- this.sortKeyName = "sort_key";
+ this.tableHKType = tableHKType;
+ this.tableSKType = tableSKType;
+ this.indexHKType = indexHKType;
+ this.indexSKType = indexSKType;
}
@Override
@@ -102,37 +111,42 @@ public class ScanExclusiveStartKeyIT {
}
}
- @Parameterized.Parameters(name = "limit_{0}_keyTypes_{1}")
+ @Parameterized.Parameters(name = "limit_{0}_keyTypes_{1}_filter_{2}")
public static synchronized Collection<Object[]> data() {
List<Object[]> parameters = new ArrayList<>();
-
- // Different scan limits to test
- int[] scanLimits = {1, 2, 3, 4, 5, 6, 7, 8};
-
- // Different key type combinations
+
+ int[] scanLimits = {1, 2, 3, 5, 7, 10, 15};
+
KeyTypeConfig[] keyConfigs = {
- new KeyTypeConfig("S_N", ScalarAttributeType.S,
ScalarAttributeType.N), // String + Number
- new KeyTypeConfig("B_B", ScalarAttributeType.B,
ScalarAttributeType.B), // Binary + Binary
- new KeyTypeConfig("B_N", ScalarAttributeType.B,
ScalarAttributeType.N), // Binary + Number
- new KeyTypeConfig("N_S", ScalarAttributeType.N,
ScalarAttributeType.S), // Number + String
- new KeyTypeConfig("N_N", ScalarAttributeType.N,
ScalarAttributeType.N), // Number + Number
- new KeyTypeConfig("S_S", ScalarAttributeType.S,
ScalarAttributeType.S), // String + String
- new KeyTypeConfig("S_B", ScalarAttributeType.S,
ScalarAttributeType.B) // String + Binary
+ new KeyTypeConfig("tSN_iSN", ScalarAttributeType.S,
ScalarAttributeType.N,
+ ScalarAttributeType.S, ScalarAttributeType.N),
+ new KeyTypeConfig("tSS_iNS", ScalarAttributeType.S,
ScalarAttributeType.S,
+ ScalarAttributeType.N, ScalarAttributeType.S),
+ new KeyTypeConfig("tNN_iSS", ScalarAttributeType.N,
ScalarAttributeType.N,
+ ScalarAttributeType.S, ScalarAttributeType.S),
+ new KeyTypeConfig("tSN_iNN", ScalarAttributeType.S,
ScalarAttributeType.N,
+ ScalarAttributeType.N, ScalarAttributeType.N),
+ new KeyTypeConfig("tBN_iSN", ScalarAttributeType.B,
ScalarAttributeType.N,
+ ScalarAttributeType.S, ScalarAttributeType.N),
+ new KeyTypeConfig("tNS_iSB", ScalarAttributeType.N,
ScalarAttributeType.S,
+ ScalarAttributeType.S, ScalarAttributeType.B),
};
-
- // Create all combinations of limits and key types
+
for (int limit : scanLimits) {
for (KeyTypeConfig config : keyConfigs) {
- parameters.add(new Object[]{limit, config});
+ for (boolean filter : new boolean[]{false, true}) {
+ parameters.add(new Object[]{limit, config, filter});
+ }
}
}
-
+
return parameters;
}
- public ScanExclusiveStartKeyIT(int scanLimit, KeyTypeConfig keyTypeConfig)
{
+ public ScanIndexExclusiveStartKeyIT(int scanLimit, KeyTypeConfig
keyTypeConfig, boolean withFilter) {
this.scanLimit = scanLimit;
this.keyTypeConfig = keyTypeConfig;
+ this.withFilter = withFilter;
}
@BeforeClass
@@ -146,6 +160,7 @@ public class ScanExclusiveStartKeyIT {
utility.startMiniCluster();
String zkQuorum = "localhost:" +
utility.getZkCluster().getClientPort();
url = PhoenixRuntime.JDBC_PROTOCOL +
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+ DriverManager.registerDriver(new PhoenixTestDriver());
restServer = new RESTServer(utility.getConfiguration());
restServer.run();
@@ -173,31 +188,34 @@ public class ScanExclusiveStartKeyIT {
}
/**
- * Test getExclusiveStartKeyConditionForScan() logic by creating a table
with hash and sort keys,
- * inserting 56 items (7 hash keys with 8 sort keys each), and scanning
with different limits.
- * This tests both cases:
- * 1. When last evaluated key has only hash key
- * 2. When last evaluated key has both hash key and sort key
+ * Test scan pagination on an index where multiple items share the same
(ihk, isk) pair.
+ * 56 items (7 table hash keys × 8 sort keys), with index keys cycled to
produce ties:
+ * ihk = hashIndex % 3 → 3 distinct values
+ * isk = sortIndex % 4 → 4 distinct values
+ * This forces the RVC cursor to rely on the full (ihk, isk, hk, sk) tuple
for correctness.
*/
@Test(timeout = 120000)
- public void testScanExclusiveStartKeyPagination() {
- final String tableName =
testName.getMethodName().replaceAll("[\\[\\]]", "_") +
- "_limit_" + scanLimit + "_" + keyTypeConfig.name;
-
- // Create table with hash key and sort key of specified types
+ public void testScanIndexExclusiveStartKeyPagination() {
+ final String tableName =
testName.getMethodName().replaceAll("[\\[\\]]", "_") +
+ "_limit_" + scanLimit + "_" + keyTypeConfig.name + "_filter_"
+ withFilter;
+
CreateTableRequest createTableRequest =
- DDLTestUtils.getCreateTableRequest(tableName,
keyTypeConfig.hashKeyName,
- keyTypeConfig.hashKeyType, keyTypeConfig.sortKeyName,
keyTypeConfig.sortKeyType);
+ DDLTestUtils.getCreateTableRequest(tableName, TABLE_HK,
+ keyTypeConfig.tableHKType, TABLE_SK,
keyTypeConfig.tableSKType);
+ createTableRequest = DDLTestUtils.addIndexToRequest(true,
createTableRequest,
+ INDEX_NAME, INDEX_HK, keyTypeConfig.indexHKType,
+ INDEX_SK, keyTypeConfig.indexSKType);
+
phoenixDBClientV2.createTable(createTableRequest);
dynamoDbClient.createTable(createTableRequest);
- // Insert 56 items: 7 hash keys with 8 sort keys each
- List<Map<String, AttributeValue>> testItems = new ArrayList<>();
- for (int hashIndex = 0; hashIndex < 7; hashIndex++) {
- for (int sortIndex = 0; sortIndex < 8; sortIndex++) {
- Map<String, AttributeValue> item = createTestItem(hashIndex,
sortIndex, keyTypeConfig);
- testItems.add(item);
-
+ final int numHashKeys = 7;
+ final int numSortKeys = 8;
+ final int totalItems = numHashKeys * numSortKeys;
+
+ for (int hashIndex = 0; hashIndex < numHashKeys; hashIndex++) {
+ for (int sortIndex = 0; sortIndex < numSortKeys; sortIndex++) {
+ Map<String, AttributeValue> item = createTestItem(hashIndex,
sortIndex);
PutItemRequest putItemRequest = PutItemRequest.builder()
.tableName(tableName)
.item(item)
@@ -207,115 +225,120 @@ public class ScanExclusiveStartKeyIT {
}
}
- // Perform paginated scan with the specified limit
List<Map<String, AttributeValue>> phoenixItems = new ArrayList<>();
List<Map<String, AttributeValue>> dynamoItems = new ArrayList<>();
-
+
Map<String, AttributeValue> phoenixLastKey = null;
Map<String, AttributeValue> dynamoLastKey = null;
-
- int phoenixPaginationCount = 0;
- int dynamoPaginationCount = 0;
- // Phoenix scan with pagination
+ int phoenixPageCount = 0;
+ int dynamoPageCount = 0;
+
do {
- ScanRequest.Builder phoenixScanRequest = ScanRequest.builder()
+ ScanRequest.Builder sr = ScanRequest.builder()
.tableName(tableName)
+ .indexName(INDEX_NAME)
.limit(scanLimit);
-
- if (phoenixLastKey != null && !phoenixLastKey.isEmpty()) {
- phoenixScanRequest.exclusiveStartKey(phoenixLastKey);
+ applyFilter(sr);
+ if (dynamoLastKey != null && !dynamoLastKey.isEmpty()) {
+ sr.exclusiveStartKey(dynamoLastKey);
}
-
- ScanResponse phoenixResult =
phoenixDBClientV2.scan(phoenixScanRequest.build());
- phoenixItems.addAll(phoenixResult.items());
- phoenixLastKey = phoenixResult.lastEvaluatedKey();
- phoenixPaginationCount++;
-
- LOGGER.info("Phoenix scan iteration {}, returned {} items, last
key: {}",
- phoenixPaginationCount, phoenixResult.count(),
phoenixLastKey);
-
- } while (phoenixLastKey != null && !phoenixLastKey.isEmpty());
+ ScanResponse response = dynamoDbClient.scan(sr.build());
+ dynamoItems.addAll(response.items());
+ dynamoLastKey = response.lastEvaluatedKey();
+ dynamoPageCount++;
+ LOGGER.info("DynamoDB index scan page {}, returned {} items,
lastKey: {}",
+ dynamoPageCount, response.count(), dynamoLastKey);
+ } while (dynamoLastKey != null && !dynamoLastKey.isEmpty());
- // DynamoDB scan with pagination
do {
- ScanRequest.Builder dynamoScanRequest = ScanRequest.builder()
+ ScanRequest.Builder sr = ScanRequest.builder()
.tableName(tableName)
+ .indexName(INDEX_NAME)
.limit(scanLimit);
-
- if (dynamoLastKey != null && !dynamoLastKey.isEmpty()) {
- dynamoScanRequest.exclusiveStartKey(dynamoLastKey);
+ applyFilter(sr);
+ if (phoenixLastKey != null && !phoenixLastKey.isEmpty()) {
+ sr.exclusiveStartKey(phoenixLastKey);
}
-
- ScanResponse dynamoResult =
dynamoDbClient.scan(dynamoScanRequest.build());
- dynamoItems.addAll(dynamoResult.items());
- dynamoLastKey = dynamoResult.lastEvaluatedKey();
- dynamoPaginationCount++;
-
- LOGGER.info("DynamoDB scan iteration {}, returned {} items, last
key: {}",
- dynamoPaginationCount, dynamoResult.count(),
dynamoLastKey);
-
- } while (dynamoLastKey != null && !dynamoLastKey.isEmpty());
+ ScanResponse response = phoenixDBClientV2.scan(sr.build());
+ phoenixItems.addAll(response.items());
+ phoenixLastKey = response.lastEvaluatedKey();
+ phoenixPageCount++;
+ LOGGER.info("Phoenix index scan page {}, returned {} items,
lastKey: {}",
+ phoenixPageCount, response.count(), phoenixLastKey);
+ } while (phoenixLastKey != null && !phoenixLastKey.isEmpty());
+
+ Assert.assertEquals("Phoenix and DynamoDB should return same number of
items",
+ dynamoItems.size(), phoenixItems.size());
+ Assert.assertFalse("Should return some items", phoenixItems.isEmpty());
+ if (withFilter) {
+ Assert.assertTrue("Filter should reduce the result set",
+ phoenixItems.size() < totalItems);
+ } else {
+ Assert.assertEquals("Should return all items without filter",
+ totalItems, phoenixItems.size());
+ }
+
+ Assert.assertTrue("Phoenix pagination should complete",
phoenixPageCount > 0);
+ Assert.assertTrue("DynamoDB pagination should complete",
dynamoPageCount > 0);
- // Verify total number of items retrieved
- Assert.assertEquals("Phoenix should return all 56 items", 56,
phoenixItems.size());
- Assert.assertEquals("DynamoDB should return all 56 items", 56,
dynamoItems.size());
-
- // Verify that both clients returned the same number of pagination
rounds
- // Note: The exact pagination behavior might differ slightly, but both
should complete
- Assert.assertTrue("Phoenix pagination should complete",
phoenixPaginationCount > 0);
- Assert.assertTrue("DynamoDB pagination should complete",
dynamoPaginationCount > 0);
-
- // For limits smaller than 8, we should see multiple pagination rounds
- if (scanLimit < 8) {
- Assert.assertTrue("Should require multiple pagination rounds for
limit " + scanLimit,
- phoenixPaginationCount > 7); // At least 7 hash keys, so
should need multiple rounds
+ if (scanLimit < totalItems) {
+ Assert.assertTrue("Should require multiple pagination rounds for
limit " + scanLimit,
+ phoenixPageCount > 1);
}
List<Map<String, AttributeValue>> sortedPhoenixItems =
- TestUtils.sortItemsByPartitionAndSortKey(phoenixItems,
keyTypeConfig.hashKeyName, keyTypeConfig.sortKeyName);
+ TestUtils.sortItemsByPartitionAndSortKey(phoenixItems,
TABLE_HK, TABLE_SK);
List<Map<String, AttributeValue>> sortedDynamoItems =
- TestUtils.sortItemsByPartitionAndSortKey(dynamoItems,
keyTypeConfig.hashKeyName, keyTypeConfig.sortKeyName);
+ TestUtils.sortItemsByPartitionAndSortKey(dynamoItems,
TABLE_HK, TABLE_SK);
Assert.assertTrue("Phoenix and DynamoDB should return identical items
when sorted",
ItemComparator.areItemsEqual(sortedPhoenixItems,
sortedDynamoItems));
}
- /**
- * Create a test item with the given partition key and sort key based on
the key type configuration.
- */
- private Map<String, AttributeValue> createTestItem(int hashIndex, int
sortIndex, KeyTypeConfig config) {
+ private void applyFilter(ScanRequest.Builder sr) {
+ if (withFilter) {
+ Map<String, String> names = new HashMap<>();
+ names.put("#sc", "score");
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":sv", AttributeValue.builder().n("300").build());
+ sr.filterExpression("#sc > :sv")
+ .expressionAttributeNames(names)
+ .expressionAttributeValues(values);
+ }
+ }
+
+ private Map<String, AttributeValue> createTestItem(int hashIndex, int
sortIndex) {
Map<String, AttributeValue> item = new HashMap<>();
-
- // Create hash key value based on type
- AttributeValue hashKeyValue = createAttributeValue(config.hashKeyType,
hashIndex, "pk");
- item.put(config.hashKeyName, hashKeyValue);
-
- // Create sort key value based on type
- AttributeValue sortKeyValue = createAttributeValue(config.sortKeyType,
sortIndex, "sk");
- item.put(config.sortKeyName, sortKeyValue);
-
- // Add a data field for verification
- item.put("data_field", AttributeValue.builder().s("data_" + hashIndex
+ "_" + sortIndex).build());
-
+
+ item.put(TABLE_HK, createAttributeValue(keyTypeConfig.tableHKType,
hashIndex, "pk"));
+ item.put(TABLE_SK, createAttributeValue(keyTypeConfig.tableSKType,
sortIndex, "sk"));
+
+ // Cycle index keys to create ties: 3 distinct ihk values, 4 distinct
isk values
+ int ihkVal = hashIndex % 3;
+ int iskVal = sortIndex % 4;
+ item.put(INDEX_HK, createAttributeValue(keyTypeConfig.indexHKType,
ihkVal, "ihk"));
+ item.put(INDEX_SK, createAttributeValue(keyTypeConfig.indexSKType,
iskVal, "isk"));
+
+ item.put("extra_data", AttributeValue.builder()
+ .s("data_" + hashIndex + "_" + sortIndex).build());
+ item.put("score", AttributeValue.builder()
+ .n(String.valueOf(hashIndex * 100 + sortIndex)).build());
+
return item;
}
- /**
- * Create an AttributeValue based on the specified type and index.
- */
- private AttributeValue createAttributeValue(ScalarAttributeType type, int
index, String prefix) {
+ private AttributeValue createAttributeValue(ScalarAttributeType type, int
index,
+ String prefix) {
switch (type) {
case S:
return AttributeValue.builder().s(prefix + index).build();
case N:
return
AttributeValue.builder().n(String.valueOf(index)).build();
case B:
- // Create binary data using the index as bytes
byte[] bytes = (prefix + index).getBytes();
return
AttributeValue.builder().b(SdkBytes.fromByteArray(bytes)).build();
default:
throw new IllegalArgumentException("Unsupported attribute
type: " + type);
}
}
-
-}
\ No newline at end of file
+}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
index d3833ca..be9edbc 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -78,6 +79,7 @@ public class ScanIndexIT {
utility.startMiniCluster();
String zkQuorum = "localhost:" +
utility.getZkCluster().getClientPort();
url = PhoenixRuntime.JDBC_PROTOCOL +
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+ DriverManager.registerDriver(new PhoenixTestDriver());
restServer = new RESTServer(utility.getConfiguration());
restServer.run();
@@ -478,6 +480,102 @@ public class ScanIndexIT {
TestUtils.verifyItemsEqual(ddbItems, phoenixResult0.items(), "title",
null);
}
+ /**
+ * RVC_1: Paginated scan on a table with hash key only (no sort key, no
index).
+ */
+ @Test(timeout = 120000)
+ public void testScanPaginationTableHKOnly() {
+ final String tableName = testName.getMethodName();
+ CreateTableRequest createTableRequest =
+ DDLTestUtils.getCreateTableRequest(tableName, "pk",
+ ScalarAttributeType.S, null, null);
+ phoenixDBClientV2.createTable(createTableRequest);
+ dynamoDbClient.createTable(createTableRequest);
+
+ for (int i = 0; i < 6; i++) {
+ Map<String, AttributeValue> item = new HashMap<>();
+ item.put("pk", AttributeValue.builder().s("pk" + i).build());
+ item.put("data", AttributeValue.builder().s("val" + i).build());
+ PutItemRequest req =
PutItemRequest.builder().tableName(tableName).item(item).build();
+ phoenixDBClientV2.putItem(req);
+ dynamoDbClient.putItem(req);
+ }
+
+ ScanRequest.Builder sr =
ScanRequest.builder().tableName(tableName).limit(2);
+ TestUtils.compareScanOutputs(sr, phoenixDBClientV2, dynamoDbClient,
+ "pk", null, ScalarAttributeType.S, null);
+ }
+
+ /**
+ * RVC_2: Paginated scan on an ihk-only index over a hk-only table.
+ */
+ @Test(timeout = 120000)
+ public void testScanPaginationIHKOnlyIndex_TableHKOnly() throws
SQLException {
+ final String tableName = testName.getMethodName();
+ final String indexName = "idx_" + tableName;
+ CreateTableRequest createTableRequest =
+ DDLTestUtils.getCreateTableRequest(tableName, "pk",
+ ScalarAttributeType.S, null, null);
+ createTableRequest = DDLTestUtils.addIndexToRequest(true,
createTableRequest,
+ indexName, "status", ScalarAttributeType.S, null, null);
+ phoenixDBClientV2.createTable(createTableRequest);
+ dynamoDbClient.createTable(createTableRequest);
+
+ String[] statuses = {"active", "inactive", "active", "pending",
"inactive", "active"};
+ for (int i = 0; i < 6; i++) {
+ Map<String, AttributeValue> item = new HashMap<>();
+ item.put("pk", AttributeValue.builder().s("pk" + i).build());
+ item.put("status",
AttributeValue.builder().s(statuses[i]).build());
+ item.put("data", AttributeValue.builder().s("val" + i).build());
+ PutItemRequest req =
PutItemRequest.builder().tableName(tableName).item(item).build();
+ phoenixDBClientV2.putItem(req);
+ dynamoDbClient.putItem(req);
+ }
+
+ ScanRequest.Builder sr = ScanRequest.builder()
+ .tableName(tableName).indexName(indexName).limit(2);
+ TestUtils.compareScanOutputs(sr, phoenixDBClientV2, dynamoDbClient,
+ "pk", null, ScalarAttributeType.S, null);
+ sr.exclusiveStartKey(null);
+ TestUtils.validateIndexUsed(sr.build(), url, "FULL SCAN ");
+ }
+
+ /**
+ * RVC_3: Paginated scan on an ihk+isk index over a hk-only table.
+ */
+ @Test(timeout = 120000)
+ public void testScanPaginationIHKISKIndex_TableHKOnly() throws
SQLException {
+ final String tableName = testName.getMethodName();
+ final String indexName = "idx_" + tableName;
+ CreateTableRequest createTableRequest =
+ DDLTestUtils.getCreateTableRequest(tableName, "pk",
+ ScalarAttributeType.S, null, null);
+ createTableRequest = DDLTestUtils.addIndexToRequest(true,
createTableRequest,
+ indexName, "status", ScalarAttributeType.S,
+ "timestamp", ScalarAttributeType.N);
+ phoenixDBClientV2.createTable(createTableRequest);
+ dynamoDbClient.createTable(createTableRequest);
+
+ String[] statuses = {"active", "inactive", "active", "pending",
"inactive", "active"};
+ for (int i = 0; i < 6; i++) {
+ Map<String, AttributeValue> item = new HashMap<>();
+ item.put("pk", AttributeValue.builder().s("pk" + i).build());
+ item.put("status",
AttributeValue.builder().s(statuses[i]).build());
+ item.put("timestamp",
AttributeValue.builder().n(String.valueOf(1000 + (i % 3))).build());
+ item.put("data", AttributeValue.builder().s("val" + i).build());
+ PutItemRequest req =
PutItemRequest.builder().tableName(tableName).item(item).build();
+ phoenixDBClientV2.putItem(req);
+ dynamoDbClient.putItem(req);
+ }
+
+ ScanRequest.Builder sr = ScanRequest.builder()
+ .tableName(tableName).indexName(indexName).limit(2);
+ TestUtils.compareScanOutputs(sr, phoenixDBClientV2, dynamoDbClient,
+ "pk", null, ScalarAttributeType.S, null);
+ sr.exclusiveStartKey(null);
+ TestUtils.validateIndexUsed(sr.build(), url, "FULL SCAN ");
+ }
+
private static Map<String, AttributeValue> getItem1() {
Map<String, AttributeValue> item = new HashMap<>();
item.put("DataGrave-_-Obscure_Dream_.404",
AttributeValue.builder().s("A").build());
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
index b884759..432556b 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
@@ -178,17 +178,10 @@ public class TestUtils {
int effectiveLimit = 100;
boolean countOnly =
ApiMetadata.SELECT_COUNT.equals(request.get(ApiMetadata.SELECT));
- ScanConfig config = new ScanConfig(
- ScanService.determineScanType(exclusiveStartKey, useIndex,
tablePKCols, indexPKCols),
+ ScanConfig config = new
ScanConfig(ScanService.determineScanType(exclusiveStartKey),
useIndex, tablePKCols, indexPKCols, effectiveLimit, tableName,
indexName, countOnly
);
-
- // For two-query scenarios, return the first query's PreparedStatement
- if (config.getType() == ScanConfig.ScanType.TWO_KEY_FIRST_QUERY) {
- return ScanService.buildQuery(connection, request, config);
- } else {
- return ScanService.buildQuery(connection, request, config);
- }
+ return ScanService.buildQuery(connection, request, config);
}
/**