This is an automated email from the ASF dual-hosted git repository.

virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git


The following commit(s) were added to refs/heads/main by this push:
     new e0094b7  Move binary streams test to new class
e0094b7 is described below

commit e0094b733332c4c52ae494716ce4ccf49e03278a
Author: Palash Chauhan <[email protected]>
AuthorDate: Fri Feb 6 02:02:05 2026 -0800

    Move binary streams test to new class
---
 .../org/apache/phoenix/ddb/BinaryEndToEndIT.java   |  73 +--------
 .../org/apache/phoenix/ddb/BinaryStreamsIT.java    | 176 +++++++++++++++++++++
 2 files changed, 179 insertions(+), 70 deletions(-)

diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
index d6720bd..4d36535 100644
--- 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
+++ 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryEndToEndIT.java
@@ -103,10 +103,9 @@ public class BinaryEndToEndIT {
     private static String tmpDir;
     private static RESTServer restServer = null;
 
-    // TODO: Add - to names when streams can support it
     private static String TABLE_NAME = "Binary.PK_Test-Table";
     private static String INDEX_NAME = "Binary.PK_Test-Index";
-    Random random = new Random(42);
+    private static Random random = new Random(42);
 
     @BeforeClass
     public static void initialize() throws Exception {
@@ -866,73 +865,7 @@ public class BinaryEndToEndIT {
         }
     }
 
-    @Test
-    public void getStreamRecords() throws InterruptedException {
-        UpdateTableRequest utr =   UpdateTableRequest.builder()
-                .tableName(TABLE_NAME)
-                
.streamSpecification(StreamSpecification.builder().streamEnabled(true).streamViewType("NEW_AND_OLD_IMAGES").build())
-                .build();
-        dynamoDbClient.updateTable(utr);
-        phoenixDBClientV2.updateTable(utr);
-        ListStreamsRequest lsr = 
ListStreamsRequest.builder().tableName(TABLE_NAME).build();
-        ListStreamsResponse phoenixStreams = 
phoenixDBStreamsClientV2.listStreams(lsr);
-        String phoenixStreamArn = phoenixStreams.streams().get(0).streamArn();
-        String dynamoStreamArn = 
dynamoDbStreamsClient.listStreams(lsr).streams().get(0).streamArn();
-        TestUtils.waitForStream(phoenixDBStreamsClientV2, phoenixStreamArn);
-        TestUtils.waitForStream(dynamoDbStreamsClient, dynamoStreamArn);
-
-        List<Map<String, AttributeValue>> keys = new ArrayList<>();
-        for (int i = 0; i < 20; i++) {
-            Map<String, AttributeValue> item = getItem(i);
-            keys.add(getKey(item));
-            PutItemRequest pir = 
PutItemRequest.builder().tableName(TABLE_NAME).item(item).build();
-            dynamoDbClient.putItem(pir);
-            phoenixDBClientV2.putItem(pir);
-        }
-        for (int i = 0; i < 20; i+=2) {
-            byte[] index_hk = new byte[11];
-            random.nextBytes(index_hk);
-            Map<String, AttributeValue> exprAttrVals = new HashMap<>();
-            exprAttrVals.put(":val1", 
AttributeValue.builder().b(SdkBytes.fromByteArray(index_hk)).build());
-            UpdateItemRequest uir = UpdateItemRequest.builder()
-                    .tableName(TABLE_NAME)
-                    .key(keys.get(i))
-                    .updateExpression("SET index_hk = :val1")
-                    .expressionAttributeValues(exprAttrVals)
-                    .build();
-            dynamoDbClient.updateItem(uir);
-            phoenixDBClientV2.updateItem(uir);
-        }
-        for (int i = 1; i < 20; i+=2) {
-            DeleteItemRequest delr = DeleteItemRequest.builder()
-                    .tableName(TABLE_NAME)
-                    .key(keys.get(i))
-                    .build();
-            dynamoDbClient.deleteItem(delr);
-            phoenixDBClientV2.deleteItem(delr);
-        }
-
-        DescribeStreamRequest dsr = 
DescribeStreamRequest.builder().streamArn(phoenixStreamArn).build();
-        StreamDescription phoenixStreamDesc = 
phoenixDBStreamsClientV2.describeStream(dsr).streamDescription();
-        String phoenixShardId = phoenixStreamDesc.shards().get(0).shardId();
-        List<Record> phoenixRecords = 
TestUtils.getRecordsFromShardWithLimit(phoenixDBStreamsClientV2,
-                phoenixStreamArn, phoenixShardId, TRIM_HORIZON, null, 11);
-
-        dsr = 
DescribeStreamRequest.builder().streamArn(dynamoStreamArn).build();
-        String ddbShardId = 
dynamoDbStreamsClient.describeStream(dsr).streamDescription().shards().get(0).shardId();
-        List<Record> ddbRecords = 
TestUtils.getRecordsFromShardWithLimit(dynamoDbStreamsClient,
-                dynamoStreamArn, ddbShardId, TRIM_HORIZON, null, 7);
-
-        Assert.assertEquals(ddbRecords.size(), phoenixRecords.size());
-        for (int i = 0; i < phoenixRecords.size(); i++) {
-            StreamRecord ddbRecord = ddbRecords.get(i).dynamodb();
-            StreamRecord phoenixRecord = phoenixRecords.get(i).dynamodb();
-            Assert.assertEquals(ddbRecord.oldImage(), 
phoenixRecord.oldImage());
-            Assert.assertEquals(ddbRecord.newImage(), 
phoenixRecord.newImage());
-        }
-    }
-
-    private Map<String, AttributeValue> getItem(Integer num) {
+    public static Map<String, AttributeValue> getItem(Integer num) {
         byte[] hk = new byte[15];
         byte[] sk = new byte[23];
         byte[] index_hk = new byte[11];
@@ -950,7 +883,7 @@ public class BinaryEndToEndIT {
         return item;
     }
 
-    private Map<String, AttributeValue> getKey(Map<String, AttributeValue> 
item) {
+    public static Map<String, AttributeValue> getKey(Map<String, 
AttributeValue> item) {
         Map<String, AttributeValue> key = new HashMap<>();
         key.put("hk", item.get("hk"));
         key.put("sk", item.get("sk"));
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryStreamsIT.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryStreamsIT.java
new file mode 100644
index 0000000..30fe906
--- /dev/null
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BinaryStreamsIT.java
@@ -0,0 +1,176 @@
+package org.apache.phoenix.ddb;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+import org.apache.phoenix.ddb.rest.RESTServer;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ServerUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.dynamodb.model.ListStreamsRequest;
+import software.amazon.awssdk.services.dynamodb.model.ListStreamsResponse;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
+import software.amazon.awssdk.services.dynamodb.model.StreamRecord;
+import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateTableRequest;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static 
software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.TRIM_HORIZON;
+
+public class BinaryStreamsIT {
+
+    private final DynamoDbClient dynamoDbClient =
+            LocalDynamoDbTestBase.localDynamoDb().createV2Client();
+    private final DynamoDbStreamsClient dynamoDbStreamsClient =
+            LocalDynamoDbTestBase.localDynamoDb().createV2StreamsClient();
+    private static DynamoDbClient phoenixDBClientV2;
+    private static DynamoDbStreamsClient phoenixDBStreamsClientV2;
+
+    private static String url;
+    private static HBaseTestingUtility utility = null;
+    private static String tmpDir;
+    private static RESTServer restServer = null;
+
+    private static String TABLE_NAME = "Binary.PK_STREAMs_Test-Table";
+    Random random = new Random(42);
+
+    @BeforeClass
+    public static void initialize() throws Exception {
+        tmpDir = System.getProperty("java.io.tmpdir");
+        LocalDynamoDbTestBase.localDynamoDb().start();
+        Configuration conf = HBaseConfiguration.create();
+        utility = new HBaseTestingUtility(conf);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+                Long.toString(0));
+        props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+                Long.toString(1000));
+        props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+        setUpConfigForMiniCluster(conf, new 
ReadOnlyProps(props.entrySet().iterator()));
+        utility.startMiniCluster();
+        String zkQuorum = "localhost:" + 
utility.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(new PhoenixTestDriver());
+        restServer = new RESTServer(utility.getConfiguration());
+        restServer.run();
+        phoenixDBClientV2 = LocalDynamoDB.createV2Client("http://" + 
restServer.getServerAddress());
+        phoenixDBStreamsClientV2 = 
LocalDynamoDB.createV2StreamsClient("http://" + restServer.getServerAddress());
+    }
+
+    @AfterClass
+    public static void stopLocalDynamoDb() throws Exception {
+        LocalDynamoDbTestBase.localDynamoDb().stop();
+        if (restServer != null) {
+            restServer.stop();
+        }
+        ServerUtil.ConnectionFactory.shutdown();
+        try {
+            DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        } finally {
+            if (utility != null) {
+                utility.shutdownMiniCluster();
+            }
+            ServerMetadataCacheTestImpl.resetCache();
+        }
+        System.setProperty("java.io.tmpdir", tmpDir);
+    }
+
+    @Test
+    public void getStreamRecords() throws InterruptedException {
+        CreateTableRequest createTableRequest =
+                DDLTestUtils.getCreateTableRequest(TABLE_NAME, "hk", 
ScalarAttributeType.B,
+                        "sk", ScalarAttributeType.B);
+        phoenixDBClientV2.createTable(createTableRequest);
+        dynamoDbClient.createTable(createTableRequest);
+        UpdateTableRequest utr =   UpdateTableRequest.builder()
+                .tableName(TABLE_NAME)
+                
.streamSpecification(StreamSpecification.builder().streamEnabled(true).streamViewType("NEW_AND_OLD_IMAGES").build())
+                .build();
+        dynamoDbClient.updateTable(utr);
+        phoenixDBClientV2.updateTable(utr);
+        ListStreamsRequest lsr = 
ListStreamsRequest.builder().tableName(TABLE_NAME).build();
+        ListStreamsResponse phoenixStreams = 
phoenixDBStreamsClientV2.listStreams(lsr);
+        String phoenixStreamArn = phoenixStreams.streams().get(0).streamArn();
+        String dynamoStreamArn = 
dynamoDbStreamsClient.listStreams(lsr).streams().get(0).streamArn();
+        TestUtils.waitForStream(phoenixDBStreamsClientV2, phoenixStreamArn);
+        TestUtils.waitForStream(dynamoDbStreamsClient, dynamoStreamArn);
+
+        List<Map<String, AttributeValue>> keys = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            Map<String, AttributeValue> item = BinaryEndToEndIT.getItem(i);
+            keys.add(BinaryEndToEndIT.getKey(item));
+            PutItemRequest pir = 
PutItemRequest.builder().tableName(TABLE_NAME).item(item).build();
+            dynamoDbClient.putItem(pir);
+            phoenixDBClientV2.putItem(pir);
+        }
+        for (int i = 0; i < 20; i+=2) {
+            byte[] index_hk = new byte[11];
+            random.nextBytes(index_hk);
+            Map<String, AttributeValue> exprAttrVals = new HashMap<>();
+            exprAttrVals.put(":val1", 
AttributeValue.builder().b(SdkBytes.fromByteArray(index_hk)).build());
+            UpdateItemRequest uir = UpdateItemRequest.builder()
+                    .tableName(TABLE_NAME)
+                    .key(keys.get(i))
+                    .updateExpression("SET index_hk = :val1")
+                    .expressionAttributeValues(exprAttrVals)
+                    .build();
+            dynamoDbClient.updateItem(uir);
+            phoenixDBClientV2.updateItem(uir);
+        }
+        for (int i = 1; i < 20; i+=2) {
+            DeleteItemRequest delr = DeleteItemRequest.builder()
+                    .tableName(TABLE_NAME)
+                    .key(keys.get(i))
+                    .build();
+            dynamoDbClient.deleteItem(delr);
+            phoenixDBClientV2.deleteItem(delr);
+        }
+
+        DescribeStreamRequest dsr = 
DescribeStreamRequest.builder().streamArn(phoenixStreamArn).build();
+        StreamDescription phoenixStreamDesc = 
phoenixDBStreamsClientV2.describeStream(dsr).streamDescription();
+        String phoenixShardId = phoenixStreamDesc.shards().get(0).shardId();
+        List<Record> phoenixRecords = 
TestUtils.getRecordsFromShardWithLimit(phoenixDBStreamsClientV2,
+                phoenixStreamArn, phoenixShardId, TRIM_HORIZON, null, 11);
+
+        dsr = 
DescribeStreamRequest.builder().streamArn(dynamoStreamArn).build();
+        String ddbShardId = 
dynamoDbStreamsClient.describeStream(dsr).streamDescription().shards().get(0).shardId();
+        List<Record> ddbRecords = 
TestUtils.getRecordsFromShardWithLimit(dynamoDbStreamsClient,
+                dynamoStreamArn, ddbShardId, TRIM_HORIZON, null, 7);
+
+        Assert.assertEquals(ddbRecords.size(), phoenixRecords.size());
+        for (int i = 0; i < phoenixRecords.size(); i++) {
+            StreamRecord ddbRecord = ddbRecords.get(i).dynamodb();
+            StreamRecord phoenixRecord = phoenixRecords.get(i).dynamodb();
+            Assert.assertEquals(ddbRecord.oldImage(), 
phoenixRecord.oldImage());
+            Assert.assertEquals(ddbRecord.newImage(), 
phoenixRecord.newImage());
+        }
+    }
+}

Reply via email to