This is an automated email from the ASF dual-hosted git repository.

virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b479c9  Using Phoenix RVC for Index Query Pagination
9b479c9 is described below

commit 9b479c99644f6ffee66f559d7332fa9caa45850a
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Feb 19 20:54:44 2026 -0800

    Using Phoenix RVC for Index Query Pagination
---
 .../apache/phoenix/ddb/service/QueryService.java   |  46 ++++-
 .../apache/phoenix/ddb/service/utils/DQLUtils.java |  58 ++++--
 .../org/apache/phoenix/ddb/BinaryEndToEndIT.java   |  59 ++++++
 .../phoenix/ddb/QueryIndexExclusiveStartKeyIT.java | 228 +++++++++++++++++++++
 4 files changed, 369 insertions(+), 22 deletions(-)

diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
index 13bc8e2..9325737 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/QueryService.java
@@ -127,7 +127,6 @@ public class QueryService {
         } catch (RuntimeException e) {
             throw new ValidationException(e.getMessage());
         }
-        PColumn sortKeyPKCol = keyConditions.getSortKeyPKCol();
 
         // append all conditions for WHERE clause
         // TODO: Validate exclusiveStartKey against sortKey range in key 
condition expression
@@ -135,7 +134,7 @@ public class QueryService {
         boolean scanIndexForward = doScanIndexForward(request);
         DQLUtils.addExclusiveStartKeyConditionForQuery(queryBuilder,
                 (Map<String, Object>) 
request.get(ApiMetadata.EXCLUSIVE_START_KEY), useIndex,
-                sortKeyPKCol, scanIndexForward);
+                scanIndexForward, tablePKCols, indexPKCols);
         DQLUtils.addFilterCondition(true, queryBuilder, (String) 
request.get(ApiMetadata.FILTER_EXPRESSION),
                 exprAttrNames, exprAttrValues);
         addOrderByClause(queryBuilder, useIndex, tablePKCols, indexPKCols, 
scanIndexForward);
@@ -144,8 +143,8 @@ public class QueryService {
 
         // Set values on the PreparedStatement
         PreparedStatement stmt = 
conn.prepareStatement(queryBuilder.toString());
-        setPreparedStatementValues(stmt, request, keyConditions, useIndex, 
sortKeyPKCol);
-        return Pair.newPair(stmt, sortKeyPKCol == null);
+        setPreparedStatementValues(stmt, request, keyConditions, useIndex, 
tablePKCols, indexPKCols);
+        return Pair.newPair(stmt, !useIndex && tablePKCols.size() == 1);
     }
 
     private static void addOrderByClause(StringBuilder queryBuilder, boolean 
useIndex,
@@ -162,6 +161,13 @@ public class QueryService {
             queryBuilder.append(", ").append(sortKeyName)
                     .append(scanIndexForward ? " ASC " : " DESC ");
         }
+        if (useIndex) {
+            for (PColumn tablePkCol : tablePKCols) {
+                String pkName = 
CommonServiceUtils.getEscapedArgument(tablePkCol.getName().toString());
+                queryBuilder.append(", 
").append(pkName).append(scanIndexForward ? " ASC " : " DESC ");
+            }
+        }
+        queryBuilder.append(" ");
     }
 
     /**
@@ -172,15 +178,19 @@ public class QueryService {
      */
     private static void setPreparedStatementValues(PreparedStatement stmt,
             Map<String, Object> request, KeyConditionsHolder keyConditions, 
boolean useIndex,
-            PColumn sortKeyPKCol) throws SQLException {
+            List<PColumn> tablePKCols, List<PColumn> indexPKCols) throws 
SQLException {
         int index = 1;
         Map<String, Object> exclusiveStartKey =
                 (Map<String, Object>) 
request.get(ApiMetadata.EXCLUSIVE_START_KEY);
         Map<String, Object> exprAttrVals =
                 (Map<String, Object>) 
request.get(ApiMetadata.EXPRESSION_ATTRIBUTE_VALUES);
+
+        // Bind partition key value
         Map<String, Object> partitionAttrVal =
                 (Map<String, Object>) 
exprAttrVals.get(keyConditions.getPartitionValue());
         DQLUtils.setKeyValueOnStatement(stmt, index++, partitionAttrVal, 
false);
+
+        // Bind sort key values from key condition expression
         if (keyConditions.hasSortKey()) {
             if (keyConditions.hasBeginsWith()) {
                 Map<String, Object> sortAttrVal = (Map<String, Object>) 
exprAttrVals.get(
@@ -198,10 +208,28 @@ public class QueryService {
                 }
             }
         }
-        if (exclusiveStartKey != null && !exclusiveStartKey.isEmpty() && 
sortKeyPKCol != null) {
-            String sortKeyName = 
CommonServiceUtils.getColumnNameFromPCol(sortKeyPKCol, useIndex);
-            DQLUtils.setKeyValueOnStatement(stmt, index,
-                    (Map<String, Object>) exclusiveStartKey.get(sortKeyName), 
false);
+        // Bind exclusive start key values for pagination
+        if (exclusiveStartKey != null && !exclusiveStartKey.isEmpty()) {
+            if (useIndex) {
+                // Index query: bind values matching RVC order (isk if 
present, then table PKs)
+                if (indexPKCols.size() > 1) {
+                    String iskName = 
CommonServiceUtils.getKeyNameFromBsonValueFunc(indexPKCols.get(1).getName().toString());
+                    DQLUtils.setKeyValueOnStatement(stmt, index++,
+                            (Map<String, Object>) 
exclusiveStartKey.get(iskName), false);
+                }
+                for (PColumn tablePkCol : tablePKCols) {
+                    String pkName = tablePkCol.getName().getString();
+                    DQLUtils.setKeyValueOnStatement(stmt, index++,
+                            (Map<String, Object>) 
exclusiveStartKey.get(pkName), false);
+                }
+            } else {
+                // Table query: only bind sort key if table has one
+                if (tablePKCols.size() > 1) {
+                    String skName = tablePKCols.get(1).getName().getString();
+                    DQLUtils.setKeyValueOnStatement(stmt, index++,
+                            (Map<String, Object>) 
exclusiveStartKey.get(skName), false);
+                }
+            }
         }
     }
 
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
index 447eba1..0c7987f 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/DQLUtils.java
@@ -29,6 +29,10 @@ import java.util.Properties;
 public class DQLUtils {
 
     public static final String SIZE_LIMIT_REACHED = "SizeLimitReached";
+    private static final String ISK_HK_RVC = "(%s, %s) %s (?, ?)";
+    private static final String ISK_HK_SK_RVC = "(%s, %s, %s) %s (?, ?, ?)";
+    private static final String HK_SK_RVC = "(%s, %s) %s (?, ?)";
+    private static final String HK_RVC = "(%s) %s (?)";
 
     /**
      * Execute the given PreparedStatement, collect all returned items with 
projected attributes
@@ -134,28 +138,29 @@ public class DQLUtils {
     }
 
     /**
-     * If table has a sortKey and the QueryRequest provides an 
ExclusiveStartKey,
-     * add the condition for the sortKey to the query. If the request provides 
an index,
-     * replace sortKey name with a BSON_VALUE expression.
-     * Return the sortKeyName here in case the QueryRequest's 
KeyConditionExpression
-     * did not have a condition on the sortKey.
+     * If table has a sortKey in its KeyConditionExpression and the 
QueryRequest provides an ExclusiveStartKey,
+     * add the condition for the sortKey to the query.
+     * If the request provides an index, replace sortKey name with a 
BSON_VALUE expression.
+     * For index, we also need to provide RVC condition for data table PKs.
      */
     public static void addExclusiveStartKeyConditionForQuery(StringBuilder 
queryBuilder,
             Map<String, Object> exclusiveStartKey, boolean useIndex,
-            PColumn sortKeyPKCol, boolean scanIndexForward) {
+            boolean scanIndexForward, List<PColumn> tablePKCols, List<PColumn> 
indexPKCols) {
         if (exclusiveStartKey != null && !exclusiveStartKey.isEmpty()) {
                 String op = " > ";
                 // when scanning backwards, flip the operator
                 if (!scanIndexForward) {
                     op = " < ";
                 }
-                if (sortKeyPKCol != null) {
-                    //append sortKey condition if there is a sortKey
-                    String name = sortKeyPKCol.getName().toString();
-                    name = (useIndex) ?
-                            name.substring(1) :
-                            CommonServiceUtils.getEscapedArgument(name);
-                    queryBuilder.append(" AND " + name + op + " ? ");
+                if (useIndex) {
+                    String rvcClause = getRVCClauseForIndexQuery(op, 
tablePKCols, indexPKCols);
+                    queryBuilder.append(" AND ").append(rvcClause);
+                } else {
+                    if (tablePKCols.size() == 2) {
+                        String name = tablePKCols.get(1).getName().toString();
+                        name = CommonServiceUtils.getEscapedArgument(name);
+                        queryBuilder.append(" AND " + name + op + " ? ");
+                    }
                 }
         }
     }
@@ -207,4 +212,31 @@ public class DQLUtils {
 
         }
     }
+
+    private static String getRVCClauseForIndexQuery(String op, List<PColumn> 
tablePKCols,
+                                                    List<PColumn> indexPKCols) 
{
+        String hk = 
CommonServiceUtils.getEscapedArgument(tablePKCols.get(0).getName().toString());
+        if (indexPKCols.size() == 1) {
+            // Index has only ihk
+            if (tablePKCols.size() == 1) {
+                // (hk) > (?)
+                return String.format(HK_RVC, hk, op);
+            } else {
+                // (hk, sk) > (?, ?)
+                String sk = 
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+                return String.format(HK_SK_RVC, hk, sk, op);
+            }
+        } else {
+            // Index has ihk, isk
+            String isk = 
CommonServiceUtils.getColumnExprFromPCol(indexPKCols.get(1), true);
+            if (tablePKCols.size() == 1) {
+                // (isk, hk) > (?, ?)
+                return String.format(ISK_HK_RVC, isk, hk, op);
+            } else {
+                // (isk, hk, sk) > (?, ?, ?)
+                String sk = 
CommonServiceUtils.getEscapedArgument(tablePKCols.get(1).getName().toString());
+                return String.format(ISK_HK_SK_RVC, isk, hk, sk, op);
+            }
+        }
+    }
 }
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
index 4d36535..ebcb96d 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
@@ -766,6 +766,65 @@ public class BinaryEndToEndIT {
         TestUtils.compareQueryOutputs(qr, phoenixDBClientV2, dynamoDbClient);
     }
 
+    @Test
+    public void queryIndexWithEqualsSortKeyAndPagination() {
+        // Common GSI key values (like Phoenix test: GSI_HK and GSI_SK are the 
same for all rows)
+        byte[] gsiHk = new byte[]{0x00, 0x00, 0x00, 0x0A};  // GSI hash key
+        byte[] gsiSk = "1".getBytes();                       // GSI sort key = 
"1"
+
+        // Row 1: HK=[0x0B,0x01], SK="1" - ExclusiveStartKey row, should be 
EXCLUDED
+        byte[] hk1 = new byte[]{0x0B, 0x01};
+        byte[] sk1 = "1".getBytes();
+        Map<String, AttributeValue> item1 = new HashMap<>();
+        item1.put("hk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(hk1)).build());
+        item1.put("sk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(sk1)).build());
+        item1.put("index_hk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiHk)).build());
+        item1.put("index_sk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiSk)).build());
+        item1.put("data", AttributeValue.builder().s("row1").build());
+
+        // Row 2: HK=[0x0B,0x01], SK="2" - should be INCLUDED (SK "2" > "1")
+        byte[] hk2 = new byte[]{0x0B, 0x01};
+        byte[] sk2 = "2".getBytes();
+        Map<String, AttributeValue> item2 = new HashMap<>();
+        item2.put("hk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(hk2)).build());
+        item2.put("sk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(sk2)).build());
+        item2.put("index_hk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiHk)).build());
+        item2.put("index_sk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiSk)).build());
+        item2.put("data", AttributeValue.builder().s("row2").build());
+
+        // Row 3: HK=[0x0B,0x02], SK="1" - should be INCLUDED (HK [0x0B,0x02] 
> [0x0B,0x01])
+        byte[] hk3 = new byte[]{0x0B, 0x02};
+        byte[] sk3 = "1".getBytes();
+        Map<String, AttributeValue> item3 = new HashMap<>();
+        item3.put("hk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(hk3)).build());
+        item3.put("sk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(sk3)).build());
+        item3.put("index_hk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiHk)).build());
+        item3.put("index_sk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiSk)).build());
+        item3.put("data", AttributeValue.builder().s("row3").build());
+
+        // Insert all rows
+        
dynamoDbClient.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item1).build());
+        
phoenixDBClientV2.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item1).build());
+        
dynamoDbClient.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item2).build());
+        
phoenixDBClientV2.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item2).build());
+        
dynamoDbClient.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item3).build());
+        
phoenixDBClientV2.putItem(PutItemRequest.builder().tableName(TABLE_NAME).item(item3).build());
+
+        // Query GSI with index_hk = gsiHk AND index_sk = gsiSk (EQUALS on 
sort key)
+        Map<String, AttributeValue> exprVals = new HashMap<>();
+        exprVals.put(":indexHk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiHk)).build());
+        exprVals.put(":indexSk", 
AttributeValue.builder().b(SdkBytes.fromByteArray(gsiSk)).build());
+
+        QueryRequest.Builder qr = QueryRequest.builder()
+                .tableName(TABLE_NAME)
+                .indexName(INDEX_NAME)
+                .keyConditionExpression("index_hk = :indexHk AND index_sk = 
:indexSk")
+                .limit(1)
+                .expressionAttributeValues(exprVals);
+
+        TestUtils.compareQueryOutputs(qr, phoenixDBClientV2, dynamoDbClient);
+    }
+
     @Test
     public void scanTableWithFilterProjectionAndPagination() {
         List<Map<String, AttributeValue>> items = new ArrayList<>();
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/QueryIndexExclusiveStartKeyIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/QueryIndexExclusiveStartKeyIT.java
new file mode 100644
index 0000000..650dc81
--- /dev/null
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/QueryIndexExclusiveStartKeyIT.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.ddb.rest.RESTServer;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.util.ServerUtil;
+
+import static org.apache.phoenix.ddb.TestUtils.verifyItemsEqual;
+import static org.junit.Assert.assertTrue;
+import static org.apache.phoenix.query.BaseTest.generateUniqueName;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+
+@RunWith(Parameterized.class)
+public class QueryIndexExclusiveStartKeyIT {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryIndexExclusiveStartKeyIT.class);
+    private final DynamoDbClient dynamoDbClient =
+            LocalDynamoDbTestBase.localDynamoDb().createV2Client();
+    private static DynamoDbClient phoenixDBClientV2;
+    private static HBaseTestingUtility utility;
+    private static RESTServer restServer;
+    private static String tmpDir;
+
+    private final boolean tableHasSK;
+    private final boolean indexHasSK;
+    private final int limit;
+
+    @Parameterized.Parameters(name = "tableSK={0}_indexSK={1}_limit={2}")
+    public static Collection<Object[]> data() {
+        List<Object[]> params = new ArrayList<>();
+        boolean[] bools = {false, true};
+        int[] limits = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 15, 19, 23, 37, 
43, 49};
+        for (boolean tableSK : bools) {
+            for (boolean indexSK : bools) {
+                for (int limit : limits) {
+                    params.add(new Object[]{tableSK, indexSK, limit});
+                }
+            }
+        }
+        return params;
+    }
+
+    public QueryIndexExclusiveStartKeyIT(boolean tableHasSK, boolean 
indexHasSK, int limit) {
+        this.tableHasSK = tableHasSK;
+        this.indexHasSK = indexHasSK;
+        this.limit = limit;
+    }
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        tmpDir = System.getProperty("java.io.tmpdir");
+        LocalDynamoDbTestBase.localDynamoDb().start();
+        Configuration conf = HBaseConfiguration.create();
+        utility = new HBaseTestingUtility(conf);
+        setUpConfigForMiniCluster(conf);
+        utility.startMiniCluster();
+        restServer = new RESTServer(utility.getConfiguration());
+        restServer.run();
+        phoenixDBClientV2 = LocalDynamoDB.createV2Client("http://" + restServer.getServerAddress());
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+        LocalDynamoDbTestBase.localDynamoDb().stop();
+        if (restServer != null) restServer.stop();
+        ServerUtil.ConnectionFactory.shutdown();
+        try {
+            DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        } finally {
+            if (utility != null) utility.shutdownMiniCluster();
+            ServerMetadataCacheTestImpl.resetCache();
+        }
+        System.setProperty("java.io.tmpdir", tmpDir);
+    }
+
+    @Test(timeout = 18000)
+    public void testIndexQueryPagination() {
+        String tableName = generateUniqueName() + "_" + tableHasSK + "_" + 
indexHasSK + "_" + limit;
+        String indexName = "GSI_" + generateUniqueName();
+
+        createTableAndIndex(tableName, indexName);
+        insertItems(tableName, 200);
+
+        // Forward scan
+        List<Map<String, AttributeValue>> phoenixItems = 
queryAllPages(phoenixDBClientV2, tableName, indexName, true);
+        List<Map<String, AttributeValue>> dynamoItems = 
queryAllPages(dynamoDbClient, tableName, indexName, true);
+        verifyItemsEqual(dynamoItems, phoenixItems, "hk", tableHasSK ? "sk" : 
null);
+
+
+        // Reverse scan
+        List<Map<String, AttributeValue>> phoenixItemsRev = 
queryAllPages(phoenixDBClientV2, tableName, indexName, false);
+        for (int i = 1; i < phoenixItemsRev.size(); i++) {
+            Map<String, AttributeValue> prev = phoenixItemsRev.get(i - 1);
+            Map<String, AttributeValue> curr = phoenixItemsRev.get(i);
+            int cmp = 0;
+            if (indexHasSK && cmp == 0) cmp = Double.compare(
+                    Double.parseDouble(curr.get("isk").n()),
+                    Double.parseDouble(prev.get("isk").n()));
+            if (cmp == 0) cmp = 
curr.get("hk").s().compareTo(prev.get("hk").s());
+            if (tableHasSK && cmp == 0) cmp = 
curr.get("sk").s().compareTo(prev.get("sk").s());
+            assertTrue("Reverse ordering violated at index " + i + ": prev=" + 
prev + ", curr=" + curr, cmp <= 0);
+        }
+        List<Map<String, AttributeValue>> dynamoItemsRev = 
queryAllPages(dynamoDbClient, tableName, indexName, false);
+        verifyItemsEqual(dynamoItemsRev, phoenixItemsRev, "hk", tableHasSK ? 
"sk" : null);
+    }
+
+    private void createTableAndIndex(String tableName, String indexName) {
+        CreateTableRequest req = DDLTestUtils.getCreateTableRequest(
+                tableName, "hk", ScalarAttributeType.S,
+                tableHasSK ? "sk" : null, tableHasSK ? ScalarAttributeType.S : 
null);
+
+        req = DDLTestUtils.addIndexToRequest(true, req, indexName,
+                "ihk", ScalarAttributeType.S,
+                indexHasSK ? "isk" : null, indexHasSK ? ScalarAttributeType.N 
: null);
+
+        phoenixDBClientV2.createTable(req);
+        dynamoDbClient.createTable(req);
+    }
+
+    private void insertItems(String tableName, int count) {
+        for (int i = 1; i <= count; i++) {
+            Map<String, AttributeValue> item = new HashMap<>();
+            // Create duplicates: groups of 5 share same hk when table has sk
+            String hkVal = tableHasSK ? "hk" + (i / 5) : "hk" + i;
+            item.put("hk", AttributeValue.builder().s(hkVal).build());
+            if (tableHasSK) {
+                item.put("sk", AttributeValue.builder().s("sk" + 
String.format("%03d", i)).build());
+            }
+            // Create duplicates: 2 distinct ihk values
+            item.put("ihk", AttributeValue.builder().s("ihk" + (i % 
2)).build());
+            if (indexHasSK) {
+                // Create duplicates: groups of 3 share same isk
+                item.put("isk", AttributeValue.builder().n(String.valueOf((i / 
3) * 10)).build());
+            }
+            item.put("data", AttributeValue.builder().s("data" + i).build());
+
+            PutItemRequest put = 
PutItemRequest.builder().tableName(tableName).item(item).build();
+            phoenixDBClientV2.putItem(put);
+            dynamoDbClient.putItem(put);
+        }
+    }
+
+    private List<Map<String, AttributeValue>> queryAllPages(DynamoDbClient 
client,
+                                                            String tableName, 
String indexName,
+                                                            boolean 
scanIndexForward) {
+        Map<String, String> names = new HashMap<>();
+        names.put("#ihk", "ihk");
+
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":ihk", AttributeValue.builder().s("ihk1").build());
+
+        String keyCondExpr = "#ihk = :ihk";
+        if (indexHasSK) {
+            names.put("#isk", "isk");
+            values.put(":isk", AttributeValue.builder().n("500").build());
+            keyCondExpr = "#ihk = :ihk AND #isk <= :isk";
+        }
+
+        List<Map<String, AttributeValue>> allItems = new ArrayList<>();
+        Map<String, AttributeValue> lastKey = null;
+
+        int pageCount = 0;
+        do {
+            QueryRequest.Builder qb = QueryRequest.builder()
+                    .tableName(tableName)
+                    .indexName(indexName)
+                    .keyConditionExpression(keyCondExpr)
+                    .expressionAttributeNames(names)
+                    .expressionAttributeValues(values)
+                    .scanIndexForward(scanIndexForward)
+                    .limit(limit);
+
+            if (lastKey != null && !lastKey.isEmpty()) {
+                qb.exclusiveStartKey(lastKey);
+            }
+
+            QueryResponse result = client.query(qb.build());
+            allItems.addAll(result.items());
+            lastKey = result.lastEvaluatedKey();
+            pageCount++;
+        } while (lastKey != null && !lastKey.isEmpty());
+        LOGGER.info("Page Count : {}", pageCount);
+        return allItems;
+    }
+}

Reply via email to