This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 7185259 Test for UpdateTable Enable Stream + Describe Stream after table splits
7185259 is described below
commit 7185259bcddf5e293065e318d26d971ac575c480
Author: Palash Chauhan <[email protected]>
AuthorDate: Wed Dec 24 14:11:42 2025 -0800
Test for UpdateTable Enable Stream + Describe Stream after table splits
---
.../org/apache/phoenix/ddb/DescribeStreamIT.java | 67 ++++++++++++++++++++++
1 file changed, 67 insertions(+)
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DescribeStreamIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DescribeStreamIT.java
index c10bdd0..2fde27f 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DescribeStreamIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DescribeStreamIT.java
@@ -43,14 +43,19 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest;
import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
+import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
+import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
+import software.amazon.awssdk.services.dynamodb.model.UpdateTableRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import java.net.HttpURLConnection;
@@ -349,4 +354,66 @@ public class DescribeStreamIT {
Assert.assertTrue("Timestamp should not be in the future", timestamp
<= now);
}
+ /**
+  * Integration test: a stream that is enabled through UpdateTable on a table
+  * which was split beforehand must describe to four shards, and none of those
+  * shards may report a parent shard id.
+  *
+  * Sequence: create table, split it at three points, put three items, enable
+  * the stream, list/await the stream, page through DescribeStream, assert.
+  *
+  * @throws Exception declared so that any failure in the JDBC connection, the
+  *         split helper or the client calls fails the test
+  */
+ @Test(timeout = 120000)
+ public void testEnableStreamAfterSplits() throws Exception {
+ String tableName = testName.getMethodName();
+ CreateTableRequest createTableRequest =
+ DDLTestUtils.getCreateTableRequest(tableName, "pk",
+ ScalarAttributeType.S, "sk", ScalarAttributeType.N);
+ // "pk" is a string attribute and "sk" a number attribute (presumably hash and sort key - confirm in DDLTestUtils).
+ phoenixDBClientV2.createTable(createTableRequest);
+ // Split at "d", "p" and "t" before the UpdateTable call below turns the stream on.
+ String fullTableName = PhoenixUtils.getFullTableName(tableName, false);
+ try (Connection connection = DriverManager.getConnection(url)) {
+ TestUtils.splitTable(connection, fullTableName,
Bytes.toBytes("d"));
+ TestUtils.splitTable(connection, fullTableName,
Bytes.toBytes("p"));
+ TestUtils.splitTable(connection, fullTableName,
Bytes.toBytes("t"));
+ }
+ // Three items whose pk values sort before "d" ("a"), between "d" and "p" ("m"), and after "t" ("z").
+ Map<String, AttributeValue> item1 = new HashMap<>();
+ item1.put("pk", AttributeValue.builder().s("a").build());
+ item1.put("sk", AttributeValue.builder().n("1").build());
+ Map<String, AttributeValue> item2 = new HashMap<>();
+ item2.put("pk", AttributeValue.builder().s("m").build());
+ item2.put("sk", AttributeValue.builder().n("2").build());
+ Map<String, AttributeValue> item3 = new HashMap<>();
+ item3.put("pk", AttributeValue.builder().s("z").build());
+ item3.put("sk", AttributeValue.builder().n("3").build());
+ // The items are written before the stream is enabled by the UpdateTable request further down.
+
phoenixDBClientV2.putItem(PutItemRequest.builder().tableName(tableName).item(item1).build());
+
phoenixDBClientV2.putItem(PutItemRequest.builder().tableName(tableName).item(item2).build());
+
phoenixDBClientV2.putItem(PutItemRequest.builder().tableName(tableName).item(item3).build());
+ // Enable the stream (view type NEW_AND_OLD_IMAGES) only now, after the splits and the writes.
+ UpdateTableRequest updateTableRequest = UpdateTableRequest.builder()
+ .tableName(tableName)
+ .streamSpecification(StreamSpecification.builder()
+ .streamEnabled(true)
+ .streamViewType(StreamViewType.NEW_AND_OLD_IMAGES)
+ .build())
+ .build();
+ phoenixDBClientV2.updateTable(updateTableRequest);
+ // Exactly one stream is expected for the table; its ARN is used for every call that follows.
+ ListStreamsRequest lsr =
ListStreamsRequest.builder().tableName(tableName).build();
+ ListStreamsResponse phoenixStreams =
phoenixDBStreamsClientV2.listStreams(lsr);
+ Assert.assertEquals(1, phoenixStreams.streams().size());
+ String phoenixStreamArn = phoenixStreams.streams().get(0).streamArn();
+ // NOTE(review): presumably blocks until the stream is usable (e.g. ENABLED) - confirm in TestUtils.waitForStream.
+ TestUtils.waitForStream(phoenixDBStreamsClientV2, phoenixStreamArn);
+ // Collect all shards: the first call passes a null start id, later calls pass the previous lastEvaluatedShardId.
+ List<Shard> phoenixShards = new ArrayList<>();
+ String lastEvaluatedShardId = null;
+ do {
+ StreamDescription phoenixStreamDesc =
phoenixDBStreamsClientV2.describeStream(
+ DescribeStreamRequest.builder().streamArn(phoenixStreamArn)
+
.exclusiveStartShardId(lastEvaluatedShardId).build()).streamDescription();
+ phoenixShards.addAll(phoenixStreamDesc.shards());
+ lastEvaluatedShardId = phoenixStreamDesc.lastEvaluatedShardId();
+ } while (lastEvaluatedShardId != null);
+ // Three split points => four shards expected (presumably one per region - confirm); none may have a parent shard.
+ Assert.assertEquals(4, phoenixShards.size());
+ for (Shard shard : phoenixShards) {
+ Assert.assertNull(shard.parentShardId());
+ }
+ }
}