This is an automated email from the ASF dual-hosted git repository.
virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 975e3f2 Support segment scan on indexes
975e3f2 is described below
commit 975e3f25602de9ed488f0ed2c34766ca92a74598
Author: Palash Chauhan <[email protected]>
AuthorDate: Fri Mar 20 16:11:08 2026 -0700
Support segment scan on indexes
---
.../apache/phoenix/ddb/service/ScanService.java | 29 +--
.../phoenix/ddb/service/utils/SegmentScanUtil.java | 32 +++-
.../phoenix/ddb/service/utils/ValidationUtil.java | 2 +-
.../org/apache/phoenix/ddb/BaseSegmentScanIT.java | 31 +++-
.../org/apache/phoenix/ddb/IndexSegmentScanIT.java | 205 +++++++++++++++++++++
.../java/org/apache/phoenix/ddb/ScanIndexIT.java | 48 -----
6 files changed, 266 insertions(+), 81 deletions(-)
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
index 63fc4f0..2bbdb3f 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ScanService.java
@@ -60,11 +60,6 @@ public class ScanService {
handleLegacyParamsConversion(request);
CommonServiceUtils.handleLegacyProjectionConversion(request);
- // Segment Scan on indexes is not yet supported - we will return all
items for segment 0.
- if (isSegmentScanRequestOnIndex(request) && (Integer)
request.get(ApiMetadata.SEGMENT) > 0) {
- return buildEmptyScanResponse(request);
- }
-
try (Connection connection =
ConnectionUtil.getConnection(connectionUrl)) {
return executeScan(connection, request);
} catch (SQLException e) {
@@ -94,7 +89,7 @@ public class ScanService {
);
// Set segment info if this is a segment scan
- if (isSegmentScanRequestOnTable(request)) {
+ if (isSegmentScanRequest(request)) {
ScanSegmentInfo segmentInfo = getSegmentInfo(connection, request);
// Return empty result if segment doesn't exist
if (segmentInfo == null || segmentInfo.isEmptySegment()) {
@@ -324,19 +319,9 @@ public class ScanService {
/**
* Check if the request is for a segment scan
*/
- public static boolean isSegmentScanRequestOnTable(Map<String, Object>
request) {
+ public static boolean isSegmentScanRequest(Map<String, Object> request) {
return request.get(ApiMetadata.SEGMENT) != null
- && request.get(ApiMetadata.TOTAL_SEGMENTS) != null
- &&
StringUtils.isEmpty((String)request.get(ApiMetadata.INDEX_NAME));
- }
-
- /**
- * Check if the request is for a segment scan
- */
- public static boolean isSegmentScanRequestOnIndex(Map<String, Object>
request) {
- return request.get(ApiMetadata.SEGMENT) != null
- && request.get(ApiMetadata.TOTAL_SEGMENTS) != null
- &&
!StringUtils.isEmpty((String)request.get(ApiMetadata.INDEX_NAME));
+ && request.get(ApiMetadata.TOTAL_SEGMENTS) != null;
}
/**
@@ -347,6 +332,7 @@ public class ScanService {
Integer segment = (Integer) request.get(ApiMetadata.SEGMENT);
Integer totalSegments = (Integer)
request.get(ApiMetadata.TOTAL_SEGMENTS);
String tableName = (String) request.get(ApiMetadata.TABLE_NAME);
+ String indexName = (String) request.get(ApiMetadata.INDEX_NAME);
Map<String, Object> exclusiveStartKey =
(Map<String, Object>)
request.get(ApiMetadata.EXCLUSIVE_START_KEY);
@@ -354,11 +340,10 @@ public class ScanService {
if (exclusiveStartKey == null || exclusiveStartKey.isEmpty()) {
// First page - generate and get segment boundaries
return SegmentScanUtil.updateAndGetSegmentScanRange(connection,
tableName,
- totalSegments, segment);
+ indexName, totalSegments, segment);
} else {
- // Subsequent page - boundaries should already exist
- return SegmentScanUtil.getSegmentScanRange(connection, tableName,
totalSegments,
- segment);
+ return SegmentScanUtil.getSegmentScanRange(connection, tableName,
indexName,
+ totalSegments, segment);
}
}
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/SegmentScanUtil.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/SegmentScanUtil.java
index d953336..a7f2e2d 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/SegmentScanUtil.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/SegmentScanUtil.java
@@ -9,6 +9,7 @@ import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,19 +60,20 @@ public class SegmentScanUtil {
* @return
*/
public static ScanSegmentInfo getSegmentScanRange(Connection connection,
String tableName,
- int totalSegments, int segmentNumber) throws SQLException {
+ String indexName, int totalSegments, int segmentNumber) throws
SQLException {
+ String metadataKey = resolvePhysicalTableName(tableName, indexName);
PreparedStatement pstmt =
connection.prepareStatement(SEGMENT_SCAN_RANGE_METADATA_QUERY);
- pstmt.setString(1, tableName);
+ pstmt.setString(1, metadataKey);
pstmt.setInt(2, totalSegments);
ResultSet rs = pstmt.executeQuery();
if (rs.next()) {
String[] startKeys = (String[]) rs.getArray(1).getArray();
String[] endKeys = (String[]) rs.getArray(2).getArray();
- return extractSegmentBoundaryFromArray(startKeys, endKeys,
tableName, totalSegments,
+ return extractSegmentBoundaryFromArray(startKeys, endKeys,
metadataKey, totalSegments,
segmentNumber);
} else {
- String err = "No segment scan ranges found for table: " + tableName
+ String err = "No segment scan ranges found for table: " +
metadataKey
+ ", total segments: " + totalSegments;
LOGGER.error(err);
throw new SQLException(err);
@@ -90,15 +92,16 @@ public class SegmentScanUtil {
* @return
*/
public static ScanSegmentInfo updateAndGetSegmentScanRange(Connection
connection, String tableName,
- int totalSegments, int segmentNumber) throws SQLException {
+ String indexName, int totalSegments, int segmentNumber) throws
SQLException {
connection.setAutoCommit(true);
+ String physicalTableName = resolvePhysicalTableName(tableName,
indexName);
Pair<String[], String[]> segmentStartKeysAndEndKeys =
- executeTotalSegmentsQuery(connection, tableName,
totalSegments);
+ executeTotalSegmentsQuery(connection, physicalTableName,
totalSegments);
String[] segmentStartKeys = segmentStartKeysAndEndKeys.getFirst();
String[] segmentEndKeys = segmentStartKeysAndEndKeys.getSecond();
PreparedStatement pstmt =
connection.prepareStatement(SEGMENT_SCAN_RANGE_METADATA_UPDATE);
- pstmt.setString(1, tableName);
+ pstmt.setString(1, physicalTableName);
pstmt.setInt(2, totalSegments);
pstmt.setArray(3, connection.createArrayOf("VARCHAR",
segmentStartKeys));
pstmt.setArray(4, connection.createArrayOf("VARCHAR", segmentEndKeys));
@@ -109,11 +112,11 @@ public class SegmentScanUtil {
if (rs == null) {
throw new SQLException(
"No segment scan ranges were returned after metadata
update for table: "
- + tableName + ", total segments: " +
totalSegments);
+ + physicalTableName + ", total segments: " +
totalSegments);
}
String[] startKeys = (String[]) rs.getArray(3).getArray();
String[] endKeys = (String[]) rs.getArray(4).getArray();
- return extractSegmentBoundaryFromArray(startKeys, endKeys, tableName,
totalSegments,
+ return extractSegmentBoundaryFromArray(startKeys, endKeys,
physicalTableName, totalSegments,
segmentNumber);
}
@@ -145,6 +148,17 @@ public class SegmentScanUtil {
segmentEndKeys.toArray(new String[0]));
}
+ /**
+ * For index scans, the physical table is the index table
(tableName_indexName).
+ * For table scans, it is simply the table name.
+ */
+ private static String resolvePhysicalTableName(String tableName, String
indexName) {
+ if (StringUtils.isEmpty(indexName)) {
+ return tableName;
+ }
+ return PhoenixUtils.getInternalIndexName(tableName, indexName);
+ }
+
/**
* Extract the start and end keys byte array for the given segment number
from the given String arrays.
*/
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ValidationUtil.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ValidationUtil.java
index b3e219e..faba22d 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ValidationUtil.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ValidationUtil.java
@@ -135,7 +135,7 @@ public class ValidationUtil {
}
public static void validateScanRequest(Map<String, Object> request) {
- if (ScanService.isSegmentScanRequestOnTable(request)) {
+ if (ScanService.isSegmentScanRequest(request)) {
Integer segment = (Integer) request.get(ApiMetadata.SEGMENT);
Integer totalSegments = (Integer)
request.get(ApiMetadata.TOTAL_SEGMENTS);
if (segment < 0) {
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
index 7e514fc..13b9b68 100644
---
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
+++
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
@@ -263,12 +263,23 @@ public abstract class BaseSegmentScanIT {
*/
protected List<Map<String, AttributeValue>>
performFullScanWithPagination(DynamoDbClient client,
String tableName, boolean useFilter, int filterNum, int scanLimit)
{
+ return performFullScanWithPagination(client, tableName, null,
useFilter, filterNum, scanLimit);
+ }
+
+ /**
+ * Perform a full scan with pagination, optionally on an index.
+ */
+ protected List<Map<String, AttributeValue>>
performFullScanWithPagination(DynamoDbClient client,
+ String tableName, String indexName, boolean useFilter, int
filterNum, int scanLimit) {
List<Map<String, AttributeValue>> allItems = new ArrayList<>();
Map<String, AttributeValue> lastEvaluatedKey = null;
do {
ScanRequest.Builder scanBuilder = ScanRequest.builder()
.tableName(tableName)
.limit(scanLimit);
+ if (indexName != null) {
+ scanBuilder.indexName(indexName);
+ }
if (useFilter) {
scanBuilder.filterExpression(getFilterExpression(filterNum));
Map<String, String> attrNames =
getFilterAttributeNames(filterNum);
@@ -322,7 +333,7 @@ public abstract class BaseSegmentScanIT {
/**
* Execute batch write for both Phoenix and DynamoDB clients.
*/
- private void executeBatchWrite(String tableName, List<WriteRequest> batch)
{
+ protected void executeBatchWrite(String tableName, List<WriteRequest>
batch) {
Map<String, List<WriteRequest>> requestItems = new HashMap<>();
requestItems.put(tableName, new ArrayList<>(batch));
BatchWriteItemRequest batchRequest =
@@ -342,6 +353,21 @@ public abstract class BaseSegmentScanIT {
boolean addDelay,
boolean useFilter,
int filterNum) {
+ return scanSingleSegmentWithPagination(tableName, null, segment,
totalSegments,
+ scanLimit, addDelay, useFilter, filterNum);
+ }
+
+ /**
+ * Scan a single segment with pagination, optionally on an index.
+ */
+ protected List<Map<String, AttributeValue>>
scanSingleSegmentWithPagination(String tableName,
+
String indexName,
+
int segment,
+
int totalSegments,
+
int scanLimit,
+
boolean addDelay,
+
boolean useFilter,
+
int filterNum) {
List<Map<String, AttributeValue>> segmentItems = new ArrayList<>();
Map<String, AttributeValue> lastEvaluatedKey = null;
@@ -351,6 +377,9 @@ public abstract class BaseSegmentScanIT {
.segment(segment)
.totalSegments(totalSegments)
.limit(scanLimit);
+ if (indexName != null) {
+ scanBuilder.indexName(indexName);
+ }
if (useFilter) {
scanBuilder.filterExpression(getFilterExpression(filterNum));
Map<String, String> attrNames =
getFilterAttributeNames(filterNum);
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/IndexSegmentScanIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/IndexSegmentScanIT.java
new file mode 100644
index 0000000..11dfaee
--- /dev/null
+++
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/IndexSegmentScanIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.ddb.utils.PhoenixUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+/**
+ * Segment scan tests for indexes.
+ * Verifies that segment scans on indexes produce the same results as a full
index scan.
+ */
+@RunWith(Parameterized.class)
+public class IndexSegmentScanIT extends BaseSegmentScanIT {
+
+ private final String hashKeyName;
+ private final ScalarAttributeType hashKeyType;
+ private final String sortKeyName;
+ private final ScalarAttributeType sortKeyType;
+ private final String indexHashKeyName;
+ private final ScalarAttributeType indexHashKeyType;
+ private final String indexSortKeyName;
+ private final ScalarAttributeType indexSortKeyType;
+ private final int totalSegments;
+ private final boolean useFilter;
+ private final int filterNum;
+
+ protected static final int TOTAL_ITEMS = 3000;
+ protected static final int SPLIT_FREQUENCY = 600;
+
+ public IndexSegmentScanIT(String hashKeyName, ScalarAttributeType
hashKeyType,
+ String sortKeyName, ScalarAttributeType sortKeyType,
+ String indexHashKeyName, ScalarAttributeType indexHashKeyType,
+ String indexSortKeyName, ScalarAttributeType indexSortKeyType,
+ boolean useFilter) {
+ this.hashKeyName = hashKeyName;
+ this.hashKeyType = hashKeyType;
+ this.sortKeyName = sortKeyName;
+ this.sortKeyType = sortKeyType;
+ this.indexHashKeyName = indexHashKeyName;
+ this.indexHashKeyType = indexHashKeyType;
+ this.indexSortKeyName = indexSortKeyName;
+ this.indexSortKeyType = indexSortKeyType;
+ Random random = new Random();
+ this.totalSegments = random.nextInt(6) + 2;
+ this.useFilter = useFilter;
+ this.filterNum = useFilter ? random.nextInt(15) + 1 : 0;
+ }
+
+ @Parameterized.Parameters(name =
"TblH_{1}_TblS_{3}_IdxH_{5}_IdxS_{7}_Filter_{8}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ // HK-only table, IHK-only index
+ {"pk", ScalarAttributeType.S, null, null,
+ "status", ScalarAttributeType.S, null, null, false},
+ {"pk", ScalarAttributeType.N, null, null,
+ "category", ScalarAttributeType.S, null, null, true},
+
+ // HK+SK table, IHK-only index
+ {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N,
+ "status", ScalarAttributeType.S, null, null, true},
+ {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.S,
+ "category", ScalarAttributeType.S, null, null, false},
+
+ // HK-only table, IHK+ISK index
+ {"pk", ScalarAttributeType.S, null, null,
+ "status", ScalarAttributeType.S, "priority",
ScalarAttributeType.N, false},
+ {"pk", ScalarAttributeType.N, null, null,
+ "category", ScalarAttributeType.S, "amount",
ScalarAttributeType.N, true},
+
+ // HK+SK table, IHK+ISK index
+ {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N,
+ "status", ScalarAttributeType.S, "priority",
ScalarAttributeType.N, true},
+ {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.S,
+ "category", ScalarAttributeType.S, "amount",
ScalarAttributeType.N, false},
+
+ // Binary table keys with index
+ {"pk", ScalarAttributeType.B, null, null,
+ "status", ScalarAttributeType.S, null, null, true},
+ {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.B,
+ "category", ScalarAttributeType.S, "priority",
ScalarAttributeType.N, false},
+ });
+ }
+
+ // Split points that divide the index key space for String index hash keys.
+ // Works for both "status" (active/cancelled/completed/failed/pending)
+ // and "category" (books/clothing/electronics/food/sports/toys) values.
+ private static final String[] INDEX_SPLIT_POINTS = {"c", "f", "p", "s"};
+
+ @Test(timeout = 600000)
+ public void testIndexSegmentScan() throws Exception {
+ final String tableName =
testName.getMethodName().replaceAll("[\\[\\]]", "_")
+ + "_" + generateRandomString(5);
+ final String indexName = "idx_" + generateRandomString(5);
+
+ CreateTableRequest createTableRequest =
DDLTestUtils.getCreateTableRequest(
+ tableName, hashKeyName, hashKeyType, sortKeyName, sortKeyType);
+ createTableRequest = DDLTestUtils.addIndexToRequest(true,
createTableRequest, indexName,
+ indexHashKeyName, indexHashKeyType, indexSortKeyName,
indexSortKeyType);
+ phoenixDBClientV2.createTable(createTableRequest);
+ dynamoDbClient.createTable(createTableRequest);
+
+ insertItemsAndSplitIndex(tableName, indexName, TOTAL_ITEMS,
SPLIT_FREQUENCY, 200);
+
+ Random random = new Random();
+
+ // Full scan on index - both Phoenix and DDB
+ List<Map<String, AttributeValue>> fullScanItemsPhoenix =
+ performFullScanWithPagination(phoenixDBClientV2, tableName,
indexName,
+ useFilter, filterNum, random.nextInt(150) + 1);
+ List<Map<String, AttributeValue>> fullScanItemsDDB =
+ performFullScanWithPagination(dynamoDbClient, tableName,
indexName,
+ useFilter, filterNum, random.nextInt(200) + 1);
+
+ // Segment scan on index
+ List<Map<String, AttributeValue>> segmentScanItems = new ArrayList<>();
+ for (int segment = 0; segment < totalSegments; segment++) {
+ int scanLimit = random.nextInt(200) + 1;
+ List<Map<String, AttributeValue>> segmentItems =
+ scanSingleSegmentWithPagination(tableName, indexName,
segment, totalSegments,
+ scanLimit, false, useFilter, filterNum);
+ segmentScanItems.addAll(segmentItems);
+ }
+
+ TestUtils.verifyItemsEqual(fullScanItemsDDB, fullScanItemsPhoenix,
hashKeyName, sortKeyName);
+ TestUtils.verifyItemsEqual(fullScanItemsPhoenix, segmentScanItems,
hashKeyName, sortKeyName);
+ }
+
+ /**
+ * Insert items and periodically split the index table (not the data
table).
+ */
+ private void insertItemsAndSplitIndex(String tableName, String indexName,
+ int totalItems, int splitFrequency, int sleepMillis) throws
Exception {
+ int splitCount = 0;
+ List<WriteRequest> batch = new ArrayList<>();
+ for (int i = 0; i < totalItems; i++) {
+ Map<String, AttributeValue> item =
+ createTestItem(i, hashKeyName, hashKeyType, sortKeyName,
sortKeyType);
+ batch.add(WriteRequest.builder()
+
.putRequest(PutRequest.builder().item(item).build()).build());
+ boolean shouldFlush = batch.size() >= 25 || (i > 0 && (i + 1) %
splitFrequency == 0)
+ || i == totalItems - 1;
+ if (shouldFlush) {
+ executeBatchWrite(tableName, batch);
+ batch.clear();
+ }
+ if (i > 0 && i % splitFrequency == 0 && splitCount <
INDEX_SPLIT_POINTS.length) {
+ splitIndexTable(tableName, indexName, splitCount++);
+ Thread.sleep(sleepMillis);
+ }
+ }
+ }
+
+ private void splitIndexTable(String tableName, String indexName, int
splitIndex) {
+ try {
+ byte[] splitPoint = Bytes.toBytes(INDEX_SPLIT_POINTS[splitIndex]);
+ String fullIndexTableName = PhoenixUtils.getFullTableName(
+ PhoenixUtils.getInternalIndexName(tableName, indexName),
false);
+ TestUtils.splitTable(testConnection, fullIndexTableName,
splitPoint);
+ LOGGER.info("Split index table {} at '{}'", indexName,
+ INDEX_SPLIT_POINTS[splitIndex]);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to split index table: {}", e.getMessage());
+ }
+ }
+
+ private String generateRandomString(int length) {
+ String chars = "abcdefghijklmnopqrstuvwxyz0123456789";
+ Random random = new Random();
+ StringBuilder sb = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ sb.append(chars.charAt(random.nextInt(chars.length())));
+ }
+ return sb.toString();
+ }
+}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
index be9edbc..7777a59 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndexIT.java
@@ -19,9 +19,7 @@ package org.apache.phoenix.ddb;
import java.sql.DriverManager;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
@@ -434,52 +432,6 @@ public class ScanIndexIT {
Assert.assertEquals(4, paginationCount);
}
- @Test(timeout = 120000)
- public void testScanIndexWithSegments() {
- //create table
- final String tableName = testName.getMethodName();
- final String indexName = "g_IDX" + tableName;
- CreateTableRequest createTableRequest =
- DDLTestUtils.getCreateTableRequest(tableName,
"DataGrave-_-Obscure_Dream_.404",
- ScalarAttributeType.S,
".-_.-_AnOtHeR_We1rD-Attr_9_9_9_._.-_.-",
- ScalarAttributeType.N);
-
- createTableRequest =
- DDLTestUtils.addIndexToRequest(true, createTableRequest,
indexName, "title",
- ScalarAttributeType.S, null, null);
- phoenixDBClientV2.createTable(createTableRequest);
- dynamoDbClient.createTable(createTableRequest);
-
- //put
- PutItemRequest putItemRequest1 =
PutItemRequest.builder().tableName(tableName).item(getItem1()).build();
- PutItemRequest putItemRequest2 =
PutItemRequest.builder().tableName(tableName).item(getItem2()).build();
- PutItemRequest putItemRequest3 =
PutItemRequest.builder().tableName(tableName).item(getItem3()).build();
- PutItemRequest putItemRequest4 =
PutItemRequest.builder().tableName(tableName).item(getItem4()).build();
- phoenixDBClientV2.putItem(putItemRequest1);
- phoenixDBClientV2.putItem(putItemRequest2);
- phoenixDBClientV2.putItem(putItemRequest3);
- phoenixDBClientV2.putItem(putItemRequest4);
- dynamoDbClient.putItem(putItemRequest1);
- dynamoDbClient.putItem(putItemRequest2);
- dynamoDbClient.putItem(putItemRequest3);
- dynamoDbClient.putItem(putItemRequest4);
-
- ScanRequest.Builder sr =
ScanRequest.builder().tableName(tableName).indexName(indexName).segment(0).totalSegments(2);
- ScanResponse phoenixResult0 = phoenixDBClientV2.scan(sr.build());
- List<Map<String, AttributeValue>> ddbItems0 =
dynamoDbClient.scan(sr.build()).items();
- Assert.assertEquals(4, phoenixResult0.items().size());
-
- sr = sr.segment(1);
- ScanResponse phoenixResult1 = phoenixDBClientV2.scan(sr.build());
- Assert.assertEquals(0, phoenixResult1.items().size());
-
- List<Map<String, AttributeValue>> ddbItems1 =
dynamoDbClient.scan(sr.build()).items();
- List<Map<String, AttributeValue>> ddbItems = new ArrayList<>();
- ddbItems.addAll(ddbItems0);
- ddbItems.addAll(ddbItems1);
- TestUtils.verifyItemsEqual(ddbItems, phoenixResult0.items(), "title",
null);
- }
-
/**
* RVC_1: Paginated scan on a table with hash key only (no sort key, no
index).
*/