This is an automated email from the ASF dual-hosted git repository.
virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 9b479c9 Using Phoenix RVC for Index Query Pagination
9b479c9 is described below
commit 9b479c99644f6ffee66f559d7332fa9caa45850a
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Feb 19 20:54:44 2026 -0800
Using Phoenix RVC for Index Query Pagination
---
.../apache/phoenix/ddb/service/QueryService.java | 46 ++++-
.../apache/phoenix/ddb/service/utils/DQLUtils.java | 58 ++++--
.../org/apache/phoenix/ddb/BinaryEndToEndIT.java | 59 ++++++
.../phoenix/ddb/QueryIndexExclusiveStartKeyIT.java | 228 +++++++++++++++++++++
4 files changed, 369 insertions(+), 22 deletions(-)
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
index 13bc8e2..9325737 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
@@ -127,7 +127,6 @@ public class QueryService {
} catch (RuntimeException e) {
throw new ValidationException(e.getMessage());
}
- PColumn sortKeyPKCol = keyConditions.getSortKeyPKCol();
// append all conditions for WHERE clause
// TODO: Validate exclusiveStartKey against sortKey range in key
condition expression
@@ -135,7 +134,7 @@ public class QueryService {
boolean scanIndexForward = doScanIndexForward(request);
DQLUtils.addExclusiveStartKeyConditionForQuery(queryBuilder,
(Map<String, Object>)
request.get(ApiMetadata.EXCLUSIVE_START_KEY), useIndex,
- sortKeyPKCol, scanIndexForward);
+ scanIndexForward, tablePKCols, indexPKCols);
DQLUtils.addFilterCondition(true, queryBuilder, (String)
request.get(ApiMetadata.FILTER_EXPRESSION),
exprAttrNames, exprAttrValues);
addOrderByClause(queryBuilder, useIndex, tablePKCols, indexPKCols,
scanIndexForward);
@@ -144,8 +143,8 @@ public class QueryService {
// Set values on the PreparedStatement
PreparedStatement stmt =
conn.prepareStatement(queryBuilder.toString());
- setPreparedStatementValues(stmt, request, keyConditions, useIndex,
sortKeyPKCol);
- return Pair.newPair(stmt, sortKeyPKCol == null);
+ setPreparedStatementValues(stmt, request, keyConditions, useIndex,
tablePKCols, indexPKCols);
+ return Pair.newPair(stmt, !useIndex && tablePKCols.size() == 1);
}
private static void addOrderByClause(StringBuilder queryBuilder, boolean
useIndex,
@@ -162,6 +161,13 @@ public class QueryService {
queryBuilder.append(", ").append(sortKeyName)
.append(scanIndexForward ? " ASC " : " DESC ");
}
+ if (useIndex) {
+ for (PColumn tablePkCol : tablePKCols) {
+ String pkName =
CommonServiceUtils.getEscapedArgument(tablePkCol.getName().toString());
+ queryBuilder.append(",
").append(pkName).append(scanIndexForward ? " ASC " : " DESC ");
+ }
+ }
+ queryBuilder.append(" ");
}
/**
@@ -172,15 +178,19 @@ public class QueryService {
*/
private static void setPreparedStatementValues(PreparedStatement stmt,
Map<String, Object> request, KeyConditionsHolder keyConditions,
boolean useIndex,
- PColumn sortKeyPKCol) throws SQLException {
+ List<PColumn> tablePKCols, List<PColumn> indexPKCols) throws
SQLException {
int index = 1;
Map<String, Object> exclusiveStartKey =
(Map<String, Object>)
request.get(ApiMetadata.EXCLUSIVE_START_KEY);
Map<String, Object> exprAttrVals =
(Map<String, Object>)
request.get(ApiMetadata.EXPRESSION_ATTRIBUTE_VALUES);
+
+ // Bind partition key value
Map<String, Object> partitionAttrVal =
(Map<String, Object>)
exprAttrVals.get(keyConditions.getPartitionValue());
DQLUtils.setKeyValueOnStatement(stmt, index++, partitionAttrVal,
false);
+
+ // Bind sort key values from key condition expression
if (keyConditions.hasSortKey()) {
if (keyConditions.hasBeginsWith()) {
Map<String, Object> sortAttrVal = (Map<String, Object>)
exprAttrVals.get(
@@ -198,10 +208,28 @@ public class QueryService {
}
}
}
- if (exclusiveStartKey != null && !exclusiveStartKey.isEmpty() &&
sortKeyPKCol != null) {
- String sortKeyName =
CommonServiceUtils.getColumnNameFromPCol(sortKeyPKCol, useIndex);
- DQLUtils.setKeyValueOnStatement(stmt, index,
- (Map<String, Object>) exclusiveStartKey.get(sortKeyName),
false);
+ // Bind exclusive start key values for pagination
+ if (exclusiveStartKey != null && !exclusiveStartKey.isEmpty()) {
+ if (useIndex) {
+ // Index query: bind values matching RVC order (isk if
present, then table PKs)
+ if (indexPKCols.size() > 1) {
+ String iskName =
CommonServiceUtils.getKeyNameFromBsonValueFunc(indexPKCols.get(1).getName().toString());
+ DQLUtils.setKeyValueOnStatement(stmt, index++,
+ (Map<String, Object>)
exclusiveStartKey.get(iskName), false);
+ }
+ for (PColumn tablePkCol : tablePKCols) {
+ String pkName = tablePkCol.getName().getString();
+ DQLUtils.setKeyValueOnStatement(stmt, index++,
+ (Map<String, Object>)
exclusiveStartKey.get(pkName), false);
+ }
+ } else {
+ // Table query: only bind sort key if table has one
+ if (tablePKCols.size() > 1) {
+ String skName = tablePKCols.get(1).getName().getString();
+ DQLUtils.setKeyValueOnStatement(stmt, index++,
+ (Map<String, Object>)
exclusiveStartKey.get(skName), false);
+ }
+ }
}
}
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
index 447eba1..0c7987f 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
@@ -29,6 +29,10 @@ import java.util.Properties;
public class DQLUtils {
public static final String SIZE_LIMIT_REACHED = "SizeLimitReached";
+ private static final String ISK_HK_RVC = "(%s, %s) %s (?, ?)";
+ private static final String ISK_HK_SK_RVC = "(%s, %s, %s) %s (?, ?, ?)";
+ private static final String HK_SK_RVC = "(%s, %s) %s (?, ?)";
+ private static final String HK_RVC = "(%s) %s (?)";
/**
* Execute the given PreparedStatement, collect all returned items with
projected attributes
@@ -134,28 +138,29 @@ public class DQLUtils {
}
/**
- * If table has a sortKey and the QueryRequest provides an
ExclusiveStartKey,
- * add the condition for the sortKey to the query. If the request provides
an index,
- * replace sortKey name with a BSON_VALUE expression.
- * Return the sortKeyName here in case the QueryRequest's
KeyConditionExpression
- * did not have a condition on the sortKey.
+ * If the QueryRequest provides an ExclusiveStartKey, add the matching
pagination condition to the query.
+ * For a table query, the condition is on the table's sortKey and is added only when the table has one.
+ * If the request provides an index, replace the index sortKey name with a
BSON_VALUE expression.
+ * For index, the condition is an RVC over the index sortKey (if present) followed by the data table PKs.
*/
public static void addExclusiveStartKeyConditionForQuery(StringBuilder
queryBuilder,
Map<String, Object> exclusiveStartKey, boolean useIndex,
- PColumn sortKeyPKCol, boolean scanIndexForward) {
+ boolean scanIndexForward, List<PColumn> tablePKCols, List<PColumn>
indexPKCols) {
if (exclusiveStartKey != null && !exclusiveStartKey.isEmpty()) {
String op = " > ";
// when scanning backwards, flip the operator
if (!scanIndexForward) {
op = " < ";
}
- if (sortKeyPKCol != null) {
- //append sortKey condition if there is a sortKey
- String name = sortKeyPKCol.getName().toString();
- name = (useIndex) ?
- name.substring(1) :
- CommonServiceUtils.getEscapedArgument(name);
- queryBuilder.append(" AND " + name + op + " ? ");
+ if (useIndex) {
+ String rvcClause = getRVCClauseForIndexQuery(op,
tablePKCols, indexPKCols);
+ queryBuilder.append(" AND ").append(rvcClause);
+ } else {
+ if (tablePKCols.size() == 2) {
+ String name = tablePKCols.get(1).getName().toString();
+ name = CommonServiceUtils.getEscapedArgument(name);
+ queryBuilder.append(" AND " + name + op + " ? ");
+ }
}
}
}
@@ -207,4 +212,31 @@ public class DQLUtils {
}
}
+
+ private static String getRVCClauseForIndexQuery(String op, List<PColumn>
tablePKCols,
+ List<PColumn> indexPKCols)
{
+ String hk =
CommonServiceUtils.getEscapedArgument(tablePKCols.get(0).getName().toString());
+ if (indexPKCols.size() == 1) {
+ // Index has only ihk
+ if (tablePKCols.size() == 1) {
+ // (hk) > (?)
+ return String.format(HK_RVC, hk, op);
+ } else {
+ // (hk, sk) > (?, ?)
+ String sk =
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+ return String.format(HK_SK_RVC, hk, sk, op);
+ }
+ } else {
+ // Index has ihk, isk
+ String isk =
CommonServiceUtils.getColumnExprFromPCol(indexPKCols.get(1), true);
+ if (tablePKCols.size() == 1) {
+ // (isk, hk) > (?, ?)
+ return String.format(ISK_HK_RVC, isk, hk, op);
+ } else {
+ // (isk, hk, sk) > (?, ?, ?)
+ String sk =
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+ return String.format(ISK_HK_SK_RVC, isk, hk, sk, op);
+ }
+ }
+ }
}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
index 4d36535..ebcb96d 100644
---
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
+++
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
@@ -766,6 +766,65 @@ public class BinaryEndToEndIT {
TestUtils.compareQueryOutputs(qr, phoenixDBClientV2, dynamoDbClient);
}
+ @Test
+ public void queryIndexWithEqualsSortKeyAndPagination() {
+ // Common GSI key values (like Phoenix test: GSI_HK and GSI_SK are the
same for all rows)
+ byte[] gsiHk = new byte[]{0x00, 0x00, 0x00, 0x0A}; // GSI hash key
+ byte[] gsiSk = "1".getBytes(); // GSI sort key =
"1"
+
+ // Row 1: HK=[0x0B,0x01], SK="1" - ExclusiveStartKey row, should be
EXCLUDED
+ byte[] hk1 = new byte[]{0x0B, 0x01};
+ byte[] sk1 = "1".getBytes();
+ Map<String, AttributeValue> item1 = new HashMap<>();
+ item1.put("hk",
AttributeValue.builder().b(SdkBytes.fromByteArray(hk1)).build());
+ item1.put("sk",
AttributeValue.builder().b(SdkBytes.fromByteArray(sk1)).build());
+ item1.put("index_hk",
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiHk)).build());
+ item1.put("index_sk",
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiSk)).build());
+ item1.put("data", AttributeValue.builder().s("row1").build());
+
+ // Row 2: HK=[0x0B,0x01], SK="2" - should be INCLUDED (SK "2" > "1")
+ byte[] hk2 = new byte[]{0x0B, 0x01};
+ byte[] sk2 = "2".getBytes();
+ Map<String, AttributeValue> item2 = new HashMap<>();
+ item2.put("hk",
AttributeValue.builder().b(SdkBytes.fromByteArray(hk2)).build());
+ item2.put("sk",
AttributeValue.builder().b(SdkBytes.fromByteArray(sk2)).build());
+ item2.put("index_hk",
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiHk)).build());
+ item2.put("index_sk",
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiSk)).build());
+ item2.put("data", AttributeValue.builder().s("row2").build());
+
+ // Row 3: HK=[0x0B,0x02], SK="1" - should be INCLUDED (HK [0x0B,0x02]
> [0x0B,0x01])
+ byte[] hk3 = new byte[]{0x0B, 0x02};
+ byte[] sk3 = "1".getBytes();
+ Map<String, AttributeValue> item3 = new HashMap<>();
+ item3.put("hk",
AttributeValue.builder().b(SdkBytes.fromByteArray(hk3)).build());
+ item3.put("sk",
AttributeValue.builder().b(SdkBytes.fromByteArray(sk3)).build());
+ item3.put("index_hk",
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiHk)).build());
+ item3.put("index_sk",
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiSk)).build());
+ item3.put("data", AttributeValue.builder().s("row3").build());
+
+ // Insert all rows
+
dynamoDbClient.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item1).build());
+
phoenixDBClientV2.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item1).build());
+
dynamoDbClient.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item2).build());
+
phoenixDBClientV2.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item2).build());
+
dynamoDbClient.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item3).build());
+
phoenixDBClientV2.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item3).build());
+
+ // Query GSI with index_hk = gsiHk AND index_sk = gsiSk (EQUALS on
sort key)
+ Map<String, AttributeValue> exprVals = new HashMap<>();
+ exprVals.put(":indexHk",
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiHk)).build());
+ exprVals.put(":indexSk",
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiSk)).build());
+
+ QueryRequest.Builder qr = QueryRequest.builder()
+ .tableName(TABLE_NAME)
+ .indexName(INDEX_NAME)
+ .keyConditionExpression("index_hk = :indexHk AND index_sk =
:indexSk")
+ .limit(1)
+ .expressionAttributeValues(exprVals);
+
+ TestUtils.compareQueryOutputs(qr, phoenixDBClientV2, dynamoDbClient);
+ }
+
@Test
public void scanTableWithFilterProjectionAndPagination() {
List<Map<String, AttributeValue>> items = new ArrayList<>();
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/QueryIndexExclusiveStartKeyIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/QueryIndexExclusiveStartKeyIT.java
new file mode 100644
index 0000000..650dc81
--- /dev/null
+++
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/QueryIndexExclusiveStartKeyIT.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.ddb.rest.RESTServer;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.util.ServerUtil;
+
+import static org.apache.phoenix.ddb.TestUtils.verifyItemsEqual;
+import static org.junit.Assert.assertTrue;
+import static org.apache.phoenix.query.BaseTest.generateUniqueName;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+
+@RunWith(Parameterized.class)
+public class QueryIndexExclusiveStartKeyIT {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QueryIndexExclusiveStartKeyIT.class);
+ private final DynamoDbClient dynamoDbClient =
+ LocalDynamoDbTestBase.localDynamoDb().createV2Client();
+ private static DynamoDbClient phoenixDBClientV2;
+ private static HBaseTestingUtility utility;
+ private static RESTServer restServer;
+ private static String tmpDir;
+
+ private final boolean tableHasSK;
+ private final boolean indexHasSK;
+ private final int limit;
+
+ @Parameterized.Parameters(name = "tableSK={0}_indexSK={1}_limit={2}")
+ public static Collection<Object[]> data() {
+ List<Object[]> params = new ArrayList<>();
+ boolean[] bools = {false, true};
+ int[] limits = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 15, 19, 23, 37,
43, 49};
+ for (boolean tableSK : bools) {
+ for (boolean indexSK : bools) {
+ for (int limit : limits) {
+ params.add(new Object[]{tableSK, indexSK, limit});
+ }
+ }
+ }
+ return params;
+ }
+
+ public QueryIndexExclusiveStartKeyIT(boolean tableHasSK, boolean
indexHasSK, int limit) {
+ this.tableHasSK = tableHasSK;
+ this.indexHasSK = indexHasSK;
+ this.limit = limit;
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ tmpDir = System.getProperty("java.io.tmpdir");
+ LocalDynamoDbTestBase.localDynamoDb().start();
+ Configuration conf = HBaseConfiguration.create();
+ utility = new HBaseTestingUtility(conf);
+ setUpConfigForMiniCluster(conf);
+ utility.startMiniCluster();
+ restServer = new RESTServer(utility.getConfiguration());
+ restServer.run();
+ phoenixDBClientV2 = LocalDynamoDB.createV2Client("http://" +
restServer.getServerAddress());
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ LocalDynamoDbTestBase.localDynamoDb().stop();
+ if (restServer != null) restServer.stop();
+ ServerUtil.ConnectionFactory.shutdown();
+ try {
+ DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+ } finally {
+ if (utility != null) utility.shutdownMiniCluster();
+ ServerMetadataCacheTestImpl.resetCache();
+ }
+ System.setProperty("java.io.tmpdir", tmpDir);
+ }
+
+ @Test(timeout = 18000)
+ public void testIndexQueryPagination() {
+ String tableName = generateUniqueName() + "_" + tableHasSK + "_" +
indexHasSK + "_" + limit;
+ String indexName = "GSI_" + generateUniqueName();
+
+ createTableAndIndex(tableName, indexName);
+ insertItems(tableName, 200);
+
+ // Forward scan
+ List<Map<String, AttributeValue>> phoenixItems =
queryAllPages(phoenixDBClientV2, tableName, indexName, true);
+ List<Map<String, AttributeValue>> dynamoItems =
queryAllPages(dynamoDbClient, tableName, indexName, true);
+ verifyItemsEqual(dynamoItems, phoenixItems, "hk", tableHasSK ? "sk" :
null);
+
+
+ // Reverse scan
+ List<Map<String, AttributeValue>> phoenixItemsRev =
queryAllPages(phoenixDBClientV2, tableName, indexName, false);
+ for (int i = 1; i < phoenixItemsRev.size(); i++) {
+ Map<String, AttributeValue> prev = phoenixItemsRev.get(i - 1);
+ Map<String, AttributeValue> curr = phoenixItemsRev.get(i);
+ int cmp = 0;
+ if (indexHasSK && cmp == 0) cmp = Double.compare(
+ Double.parseDouble(curr.get("isk").n()),
+ Double.parseDouble(prev.get("isk").n()));
+ if (cmp == 0) cmp =
curr.get("hk").s().compareTo(prev.get("hk").s());
+ if (tableHasSK && cmp == 0) cmp =
curr.get("sk").s().compareTo(prev.get("sk").s());
+ assertTrue("Reverse ordering violated at index " + i + ": prev=" +
prev + ", curr=" + curr, cmp <= 0);
+ }
+ List<Map<String, AttributeValue>> dynamoItemsRev =
queryAllPages(dynamoDbClient, tableName, indexName, false);
+ verifyItemsEqual(dynamoItemsRev, phoenixItemsRev, "hk", tableHasSK ?
"sk" : null);
+ }
+
+ private void createTableAndIndex(String tableName, String indexName) {
+ CreateTableRequest req = DDLTestUtils.getCreateTableRequest(
+ tableName, "hk", ScalarAttributeType.S,
+ tableHasSK ? "sk" : null, tableHasSK ? ScalarAttributeType.S :
null);
+
+ req = DDLTestUtils.addIndexToRequest(true, req, indexName,
+ "ihk", ScalarAttributeType.S,
+ indexHasSK ? "isk" : null, indexHasSK ? ScalarAttributeType.N
: null);
+
+ phoenixDBClientV2.createTable(req);
+ dynamoDbClient.createTable(req);
+ }
+
+ private void insertItems(String tableName, int count) {
+ for (int i = 1; i <= count; i++) {
+ Map<String, AttributeValue> item = new HashMap<>();
+ // Create duplicates: groups of 5 share same hk when table has sk
+ String hkVal = tableHasSK ? "hk" + (i / 5) : "hk" + i;
+ item.put("hk", AttributeValue.builder().s(hkVal).build());
+ if (tableHasSK) {
+ item.put("sk", AttributeValue.builder().s("sk" +
String.format("%03d", i)).build());
+ }
+ // Create duplicates: 2 distinct ihk values
+ item.put("ihk", AttributeValue.builder().s("ihk" + (i %
2)).build());
+ if (indexHasSK) {
+ // Create duplicates: groups of 3 share same isk
+ item.put("isk", AttributeValue.builder().n(String.valueOf((i /
3) * 10)).build());
+ }
+ item.put("data", AttributeValue.builder().s("data" + i).build());
+
+ PutItemRequest put =
PutItemRequest.builder().tableName(tableName).item(item).build();
+ phoenixDBClientV2.putItem(put);
+ dynamoDbClient.putItem(put);
+ }
+ }
+
+ private List<Map<String, AttributeValue>> queryAllPages(DynamoDbClient
client,
+ String tableName,
String indexName,
+ boolean
scanIndexForward) {
+ Map<String, String> names = new HashMap<>();
+ names.put("#ihk", "ihk");
+
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":ihk", AttributeValue.builder().s("ihk1").build());
+
+ String keyCondExpr = "#ihk = :ihk";
+ if (indexHasSK) {
+ names.put("#isk", "isk");
+ values.put(":isk", AttributeValue.builder().n("500").build());
+ keyCondExpr = "#ihk = :ihk AND #isk <= :isk";
+ }
+
+ List<Map<String, AttributeValue>> allItems = new ArrayList<>();
+ Map<String, AttributeValue> lastKey = null;
+
+ int pageCount = 0;
+ do {
+ QueryRequest.Builder qb = QueryRequest.builder()
+ .tableName(tableName)
+ .indexName(indexName)
+ .keyConditionExpression(keyCondExpr)
+ .expressionAttributeNames(names)
+ .expressionAttributeValues(values)
+ .scanIndexForward(scanIndexForward)
+ .limit(limit);
+
+ if (lastKey != null && !lastKey.isEmpty()) {
+ qb.exclusiveStartKey(lastKey);
+ }
+
+ QueryResponse result = client.query(qb.build());
+ allItems.addAll(result.items());
+ lastKey = result.lastEvaluatedKey();
+ pageCount++;
+ } while (lastKey != null && !lastKey.isEmpty());
+ LOGGER.info("Page Count : {}", pageCount);
+ return allItems;
+ }
+}