This is an automated email from the ASF dual-hosted git repository.

virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git


The following commit(s) were added to refs/heads/main by this push:
     new 55473b3  Align DynamoDB Streams identifiers with AWS
55473b3 is described below

commit 55473b30773c251e3c319d75db500cf28ffef2b2
Author: Viraj Jasani <[email protected]>
AuthorDate: Mon May 18 17:09:03 2026 -0700

    Align DynamoDB Streams identifiers with AWS
---
 DDB_API_REFERENCE.md                               |  51 ++-
 .../phoenix/ddb/service/DescribeStreamService.java | 111 +++--
 .../phoenix/ddb/service/GetRecordsService.java     |  15 +-
 .../ddb/service/GetShardIteratorService.java       |  44 +-
 .../phoenix/ddb/service/ListStreamsService.java    |  11 +-
 .../ddb/service/utils/TableDescriptorUtils.java    |   3 +-
 .../java/org/apache/phoenix/ddb/DDLTestUtils.java  |   6 +-
 .../apache/phoenix/ddb/DdbAdapterCdcUtilsTest.java | 462 +++++++++++++++++++++
 .../org/apache/phoenix/ddb/GetShardIteratorIT.java |  38 +-
 .../java/org/apache/phoenix/ddb/ListStreamsIT.java |   5 +-
 .../phoenix/ddb/utils/DdbAdapterCdcUtils.java      | 162 +++++++-
 .../phoenix/ddb/utils/PhoenixShardIterator.java    | 124 +++++-
 12 files changed, 925 insertions(+), 107 deletions(-)

diff --git a/DDB_API_REFERENCE.md b/DDB_API_REFERENCE.md
index a77158d..fba007e 100644
--- a/DDB_API_REFERENCE.md
+++ b/DDB_API_REFERENCE.md
@@ -583,8 +583,8 @@ Returns the full description of a table including its 
schema, indexes, stream co
       "StreamEnabled": true,
       "StreamViewType": "NEW_AND_OLD_IMAGES"
     },
-    "LatestStreamArn": "phoenix/cdc/stream/...",
-    "LatestStreamLabel": "2024-01-15T10:30:00Z"
+    "LatestStreamArn": 
"arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000",
+    "LatestStreamLabel": "2024-01-15T10:30:00.000"
   }
 }
 ```
@@ -1406,16 +1406,22 @@ Returns a list of all streams, optionally filtered by 
table name.
   "Streams": [
     {
       "TableName": "MyTable",
-      "StreamArn": "phoenix/cdc/stream/MyTable/...",
-      "StreamLabel": "2024-01-15T10:30:00Z"
+      "StreamArn": 
"arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000",
+      "StreamLabel": "2024-01-15T10:30:00.000"
     }
   ],
-  "LastEvaluatedStreamArn": "phoenix/cdc/stream/MyTable/..."
+  "LastEvaluatedStreamArn": 
"arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000"
 }
 ```
 
 `LastEvaluatedStreamArn` is present only when the result count equals the 
limit.
 
+The emitted `StreamArn` is an AWS-shaped synthetic ARN. Region (`us-west-2`) 
and account
+(`000000000000`) are fixed sentinels because phoenix-adapters can run 
anywhere; the label
+segment is the UTC ISO timestamp matching AWS DynamoDB Streams' StreamLabel 
format.
+For backward compatibility, every endpoint that accepts a stream identifier 
also accepts
+the legacy bare internal name 
`phoenix/cdc/stream/{table}/CDC_{table}/{ts}/{creationDt}`.
+
 ---
 
 ### 9.2 DescribeStream
@@ -1437,9 +1443,9 @@ Returns detailed information about a stream including its 
shards.
 ```json
 {
   "StreamDescription": {
-    "StreamArn": "phoenix/cdc/stream/MyTable/...",
+    "StreamArn": 
"arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000",
     "TableName": "MyTable",
-    "StreamLabel": "2024-01-15T10:30:00Z",
+    "StreamLabel": "2024-01-15T10:30:00.000",
     "StreamViewType": "NEW_AND_OLD_IMAGES",
     "CreationRequestDateTime": 1700000000.000,
     "KeySchema": [
@@ -1448,15 +1454,15 @@ Returns detailed information about a stream including 
its shards.
     "StreamStatus": "ENABLED",
     "Shards": [
       {
-        "ShardId": "partition-1",
-        "ParentShardId": "parent-partition-0",
+        "ShardId": "shardId-1700000099999-1a2b3c4d5e6f7890abcdef0123456789",
+        "ParentShardId": 
"shardId-1700000000000-a1b2c3d4e5f60718293a4b5c6d7e8f90",
         "SequenceNumberRange": {
-          "StartingSequenceNumber": "170000000000000",
-          "EndingSequenceNumber": "170100000099999"
+          "StartingSequenceNumber": "000170000009999900000",
+          "EndingSequenceNumber": "000170000010000099999"
         }
       }
     ],
-    "LastEvaluatedShardId": "partition-1"
+    "LastEvaluatedShardId": 
"shardId-1700000099999-1a2b3c4d5e6f7890abcdef0123456789"
   }
 }
 ```
@@ -1464,6 +1470,8 @@ Returns detailed information about a stream including its 
shards.
 - Shards are only listed when `StreamStatus` is `ENABLED`
 - `EndingSequenceNumber` is only present for closed shards (after a split)
 - `LastEvaluatedShardId` is present only when shard count equals the limit
+- `ShardId` and `ParentShardId` follow the AWS-shape 
`shardId-<partitionStartMs>-<32-char-hex>` format (length 49-54, within the AWS 
spec [28, 65]). `ParentShardId` is omitted when the parent partition has been 
TTL-pruned from `SYSTEM.CDC_STREAM`.
+- `StartingSequenceNumber` and `EndingSequenceNumber` are 21-digit zero-padded 
numeric strings (matching the AWS spec minimum length of 21). 
`BigInteger`/`Long.parseLong` both ignore leading zeros so numeric ordering vs. 
unpadded values is preserved.
 
 ---
 
@@ -1495,11 +1503,18 @@ Gets a shard iterator for reading records from a 
specific position in a shard.
 
 ```json
 {
-  "ShardIterator": "shardIterator/tableName/cdcObject/streamType/shardId/12345"
+  "ShardIterator": 
"arn:aws:dynamodb:us-west-2:000000000000:table/myTable/stream/2024-01-15T10:30:00.000|1|eyJzdHJlYW1UeXBlIjoiTkVXX0lNQUdFIiwicGFydGl0aW9uSWQiOiIxYTJiM2M0ZDVlNmY3ODkwYWJjZGVmMDEyMzQ1Njc4OSIsInNlcU51bSI6IjAwMDE3MDAwMDAwMDAwMDAwMDAwMDAwIn0"
 }
 ```
 
-The shard iterator is an encoded string containing: table name, CDC object 
name, stream type, shard ID, and starting sequence number.
+The shard iterator follows the DynamoDB Streams wire format
+`<streamArn>|<version>|<base64(JSON state)>`:
+- `<streamArn>` carries the table identity (table name + creation-time stream 
label).
+- `<version>` is the literal `"1"`; reserved for future inner-format evolution.
+- `<base64(JSON state)>` is a base64-encoded JSON object carrying the 
per-iterator
+  resume state: 
`{"streamType":"...","partitionId":"<32-hex>","seqNum":"<21-digit>"}`.
+
+Treat the value as opaque; clients pass it unchanged to subsequent 
`GetRecords` calls.
 
 ---
 
@@ -1525,7 +1540,7 @@ Reads change records from a shard using a shard iterator.
       "eventName": "INSERT",
       "dynamodb": {
         "StreamViewType": "NEW_AND_OLD_IMAGES",
-        "SequenceNumber": "170000000000001",
+        "SequenceNumber": "000170000000012300000",
         "ApproximateCreationDateTime": 1700000000.123,
         "Keys": {
           "id": {"S": "user-123"}
@@ -1542,7 +1557,7 @@ Reads change records from a shard using a shard iterator.
       "eventName": "MODIFY",
       "dynamodb": {
         "StreamViewType": "NEW_AND_OLD_IMAGES",
-        "SequenceNumber": "170000000000002",
+        "SequenceNumber": "000170000000145600000",
         "ApproximateCreationDateTime": 1700000001.456,
         "Keys": {"id": {"S": "user-123"}},
         "OldImage": {"id": {"S": "user-123"}, "name": {"S": "John Doe"}, 
"age": {"N": "30"}},
@@ -1554,7 +1569,7 @@ Reads change records from a shard using a shard iterator.
       "eventName": "REMOVE",
       "dynamodb": {
         "StreamViewType": "NEW_AND_OLD_IMAGES",
-        "SequenceNumber": "170000000000003",
+        "SequenceNumber": "000170000000278900000",
         "ApproximateCreationDateTime": 1700000002.789,
         "Keys": {"id": {"S": "user-456"}},
         "OldImage": {"id": {"S": "user-456"}, "name": {"S": "Jane"}},
@@ -1566,7 +1581,7 @@ Reads change records from a shard using a shard iterator.
       }
     }
   ],
-  "NextShardIterator": 
"shardIterator/tableName/cdcObject/streamType/shardId/170000000000004"
+  "NextShardIterator": 
"arn:aws:dynamodb:us-west-2:000000000000:table/myTable/stream/2024-01-15T10:30:00.000|1|eyJzdHJlYW1UeXBlIjoiTkVXX0lNQUdFIiwicGFydGl0aW9uSWQiOiIxYTJiM2M0ZDVlNmY3ODkwYWJjZGVmMDEyMzQ1Njc4OSIsInNlcU51bSI6IjAwMDE3MDAwMDAwMDAwMDAwMDA0In0"
 }
 ```
 
diff --git 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/DescribeStreamService.java
 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/DescribeStreamService.java
index 9d5c139..f55acc3 100644
--- 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/DescribeStreamService.java
+++ 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/DescribeStreamService.java
@@ -18,8 +18,11 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.MAX_NUM_CHANGES_AT_TIMESTAMP;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
@@ -33,14 +36,23 @@ public class DescribeStreamService {
             = "SELECT PARTITION_ID, PARENT_PARTITION_ID, PARTITION_START_TIME, 
PARTITION_END_TIME FROM "
             + SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = '%s' AND 
STREAM_NAME = '%s' ";
 
-    public static Map<String, Object> describeStream(Map<String, Object> 
request, String connectionUrl) {
-        String streamName = (String) request.get(ApiMetadata.STREAM_ARN);
+    private static final String PARENT_PARTITION_START_TIMES_QUERY
+            = "SELECT PARTITION_ID, PARTITION_START_TIME FROM "
+            + SYSTEM_CDC_STREAM_NAME
+            + " WHERE TABLE_NAME = '%s' AND STREAM_NAME = '%s' AND 
PARTITION_ID IN (%s)";
+
+    public static Map<String, Object> describeStream(Map<String, Object> 
request,
+        String connectionUrl) {
+        String streamArnInput = (String) request.get(ApiMetadata.STREAM_ARN);
+        String streamName = 
DdbAdapterCdcUtils.normalizeStreamName(streamArnInput);
+        String streamArn = DdbAdapterCdcUtils.isStreamArn(streamArnInput) ?
+            streamArnInput : DdbAdapterCdcUtils.toStreamArn(streamName);
         String exclusiveStartShardId = (String) 
request.get(ApiMetadata.EXCLUSIVE_START_SHARD_ID);
         Integer limit = (Integer) request.getOrDefault(ApiMetadata.LIMIT, 
MAX_LIMIT);
         String tableName = 
DdbAdapterCdcUtils.getTableNameFromStreamName(streamName);
         Map<String, Object> streamDesc;
         try (Connection conn = ConnectionUtil.getConnection(connectionUrl)) {
-            streamDesc = getStreamDescriptionObject(conn, tableName, 
streamName);
+            streamDesc = getStreamDescriptionObject(conn, tableName, 
streamName, streamArn);
             String streamStatus = DdbAdapterCdcUtils.getStreamStatus(conn, 
tableName, streamName);
             streamDesc.put(ApiMetadata.STREAM_STATUS, streamStatus);
             // query partitions only if stream is ENABLED
@@ -48,19 +60,55 @@ public class DescribeStreamService {
                 StringBuilder sb = new 
StringBuilder(String.format(DESCRIBE_STREAM_QUERY, tableName, streamName));
                 if (!StringUtils.isEmpty(exclusiveStartShardId)) {
                     sb.append(" AND PARTITION_ID > '");
-                    sb.append(exclusiveStartShardId);
+                    
sb.append(DdbAdapterCdcUtils.partitionIdFromShardId(exclusiveStartShardId));
                     sb.append("'");
                 }
                 sb.append(" LIMIT ");
                 sb.append(limit);
                 LOGGER.debug("Describe Stream Query: {}", sb);
-                List<Map<String, Object>> shards = new ArrayList<>();
-                String lastEvaluatedShardId = null;
+
+                List<Map<String, Object>> rawShards = new ArrayList<>();
+                Map<String, Long> partitionStartTimes = new HashMap<>();
+                Set<String> parentIdsNeeded = new HashSet<>();
                 ResultSet rs = 
conn.createStatement().executeQuery(sb.toString());
                 int count = 0;
                 while (rs.next()) {
                     count++;
-                    Map<String, Object> shard = getShardMetadata(rs);
+                    Map<String, Object> raw = new HashMap<>();
+                    String partitionId = rs.getString(1);
+                    String parentPartitionId = rs.getString(2);
+                    long partitionStartTime = rs.getLong(3);
+                    long partitionEndTime = rs.getLong(4);
+                    raw.put("partitionId", partitionId);
+                    raw.put("parentPartitionId", parentPartitionId);
+                    raw.put("partitionStartTime", partitionStartTime);
+                    raw.put("partitionEndTime", partitionEndTime);
+                    rawShards.add(raw);
+                    partitionStartTimes.put(partitionId, partitionStartTime);
+                    if (parentPartitionId != null) {
+                        parentIdsNeeded.add(parentPartitionId);
+                    }
+                }
+                // Parents already in the current page don't need a second 
query.
+                parentIdsNeeded.removeAll(partitionStartTimes.keySet());
+
+                if (!parentIdsNeeded.isEmpty()) {
+                    String inClause = parentIdsNeeded.stream()
+                            .map(id -> "'" + id + "'")
+                            .collect(Collectors.joining(","));
+                    String parentQuery = 
String.format(PARENT_PARTITION_START_TIMES_QUERY,
+                            tableName, streamName, inClause);
+                    LOGGER.debug("Parent Partition Start Times Query: {}", 
parentQuery);
+                    ResultSet prs = 
conn.createStatement().executeQuery(parentQuery);
+                    while (prs.next()) {
+                        partitionStartTimes.put(prs.getString(1), 
prs.getLong(2));
+                    }
+                }
+
+                List<Map<String, Object>> shards = new ArrayList<>();
+                String lastEvaluatedShardId = null;
+                for (Map<String, Object> raw : rawShards) {
+                    Map<String, Object> shard = buildShardMetadata(raw, 
partitionStartTimes);
                     shards.add(shard);
                     lastEvaluatedShardId = (String) 
shard.get(ApiMetadata.SHARD_ID);
                 }
@@ -81,14 +129,12 @@ public class DescribeStreamService {
      * Return a StreamDescription object for the given tableName and 
streamName.
      * Populate all attributes except the list of the shards.
      */
-    private static Map<String, Object> getStreamDescriptionObject(Connection 
conn,
-                                                                        String 
tableName,
-                                                                        String 
streamName)
-            throws SQLException {
+    private static Map<String, Object> getStreamDescriptionObject(Connection 
conn, String tableName,
+        String streamName, String streamArn) throws SQLException {
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         PTable table = pconn.getTable(tableName);
         Map<String, Object> streamDesc = new HashMap<>();
-        streamDesc.put(ApiMetadata.STREAM_ARN, streamName);
+        streamDesc.put(ApiMetadata.STREAM_ARN, streamArn);
         streamDesc.put(ApiMetadata.TABLE_NAME, 
PhoenixUtils.getTableNameFromFullName(tableName, false));
         long creationTS = 
DdbAdapterCdcUtils.getCDCIndexTimestampFromStreamName(streamName);
         streamDesc.put(ApiMetadata.STREAM_LABEL, 
DdbAdapterCdcUtils.getStreamLabel(streamName));
@@ -100,23 +146,38 @@ public class DescribeStreamService {
     }
 
     /**
-     * Build a Shard object using a ResultSet cursor from a query on 
SYSTEM.CDC_STREAM.
+     * Build a Shard response object from a buffered partition row and a 
precomputed map
+     * of {@code partitionId -> partitionStartTime} (including any parents 
pulled from the
+     * batch parent lookup).
      */
-    private static Map<String, Object> getShardMetadata(ResultSet rs) throws 
SQLException {
-        // rs --> id, parentId, startTime, endTime
+    private static Map<String, Object> buildShardMetadata(Map<String, Object> 
raw,
+        Map<String, Long> partitionStartTimes) {
+        String partitionId = (String) raw.get("partitionId");
+        String parentPartitionId = (String) raw.get("parentPartitionId");
+        long partitionStartTime = (Long) raw.get("partitionStartTime");
+        long partitionEndTime = (Long) raw.get("partitionEndTime");
+
         Map<String, Object> shard = new HashMap<>();
-        // shard id
-        shard.put(ApiMetadata.SHARD_ID, rs.getString(1));
-        // parent shard id
-        if (rs.getString(2) != null) {
-            shard.put(ApiMetadata.PARENT_SHARD_ID, rs.getString(2));
+        shard.put(ApiMetadata.SHARD_ID,
+            DdbAdapterCdcUtils.toShardId(partitionStartTime, partitionId));
+        if (parentPartitionId != null) {
+            Long parentStartTime = partitionStartTimes.get(parentPartitionId);
+            if (parentStartTime != null) {
+                shard.put(ApiMetadata.PARENT_SHARD_ID,
+                    DdbAdapterCdcUtils.toShardId(parentStartTime, 
parentPartitionId));
+            } else {
+                LOGGER.info("Parent partition {} for partition {} is no longer 
present in "
+                    + "SYSTEM.CDC_STREAM (likely TTLed); omitting 
ParentShardId "
+                    + "from the response.", parentPartitionId, partitionId);
+            }
         }
-        // start sequence number
         Map<String, Object> seqNumRange = new HashMap<>();
-        seqNumRange.put(ApiMetadata.STARTING_SEQUENCE_NUMBER, 
String.valueOf(rs.getLong(3) * MAX_NUM_CHANGES_AT_TIMESTAMP));
-        // end sequence number
-        if (rs.getLong(4) > 0) {
-            seqNumRange.put(ApiMetadata.ENDING_SEQUENCE_NUMBER, 
String.valueOf(((rs.getLong(4)+1) * MAX_NUM_CHANGES_AT_TIMESTAMP) - 1));
+        seqNumRange.put(ApiMetadata.STARTING_SEQUENCE_NUMBER,
+            DdbAdapterCdcUtils.getSequenceNumber(partitionStartTime, 0));
+        if (partitionEndTime > 0) {
+            seqNumRange.put(ApiMetadata.ENDING_SEQUENCE_NUMBER,
+                DdbAdapterCdcUtils.getSequenceNumber(partitionEndTime,
+                    MAX_NUM_CHANGES_AT_TIMESTAMP - 1));
         }
         shard.put(ApiMetadata.SEQUENCE_NUMBER_RANGE, seqNumRange);
         return shard;
diff --git 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
index f41ebb1..919cbe9 100644
--- 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
+++ 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 
 import org.apache.phoenix.ddb.ConnectionUtil;
 import org.apache.phoenix.ddb.service.exceptions.PhoenixServiceException;
+import org.apache.phoenix.ddb.service.exceptions.ValidationException;
 import org.apache.phoenix.ddb.utils.ApiMetadata;
 import org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils;
 import org.apache.phoenix.ddb.utils.PhoenixShardIterator;
@@ -57,8 +58,7 @@ public class GetRecordsService {
      * return null for nextShardIterator if there are more records to return.
      */
     public static Map<String, Object> getRecords(Map<String, Object> request, 
String connectionUrl) {
-        PhoenixShardIterator pIter
-                = new PhoenixShardIterator((String) 
request.get(ApiMetadata.SHARD_ITERATOR));
+        PhoenixShardIterator pIter = parseShardIterator(request);
         Integer requestLimit = (Integer) request.get(ApiMetadata.LIMIT);
         List<Map<String, Object>> records = new ArrayList<>();
         long lastTs = pIter.getTimestamp();
@@ -218,4 +218,15 @@ public class GetRecordsService {
         record.put(ApiMetadata.AWS_REGION, ApiMetadata.AWS_REGION_VALUE);
         return record;
     }
+
+    private static PhoenixShardIterator parseShardIterator(Map<String, Object> 
request) {
+        String shardIterator = (String) 
request.get(ApiMetadata.SHARD_ITERATOR);
+        try {
+            return new PhoenixShardIterator(shardIterator);
+        } catch (RuntimeException e) {
+            throw new ValidationException(
+                "Invalid ShardIterator: " + shardIterator + ", error message: 
" + (
+                    e.getMessage() == null ? "" : e.getMessage()));
+        }
+    }
 }
diff --git 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetShardIteratorService.java
 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetShardIteratorService.java
index 423a0cf..19992e1 100644
--- 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetShardIteratorService.java
+++ 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetShardIteratorService.java
@@ -5,6 +5,7 @@ import 
org.apache.phoenix.ddb.service.exceptions.PhoenixServiceException;
 import org.apache.phoenix.ddb.service.exceptions.ValidationException;
 import org.apache.phoenix.ddb.utils.ApiMetadata;
 import org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils;
+import org.apache.phoenix.ddb.utils.PhoenixShardIterator;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 
 import java.sql.Connection;
@@ -12,26 +13,28 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.MAX_NUM_CHANGES_AT_TIMESTAMP;
-import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_FORMAT;
-
 public class GetShardIteratorService {
 
     public static Map<String, Object> getShardIterator(Map<String, Object> 
request,
                                                        String connectionUrl) {
         Map<String, Object> result = new HashMap<>();
         try (Connection conn = ConnectionUtil.getConnection(connectionUrl)) {
-            String streamArn = (String) request.get(ApiMetadata.STREAM_ARN);
-            String shardId = (String) request.get(ApiMetadata.SHARD_ID);
+            String streamArnInput = (String) 
request.get(ApiMetadata.STREAM_ARN);
+            String streamName = 
DdbAdapterCdcUtils.normalizeStreamName(streamArnInput);
+            String partitionId = DdbAdapterCdcUtils.partitionIdFromShardId(
+                (String) request.get(ApiMetadata.SHARD_ID));
             String seqNum = (String) request.get(ApiMetadata.SEQUENCE_NUMBER);
             String shardIterType = (String) 
request.get(ApiMetadata.SHARD_ITERATOR_TYPE);
-            String tableName = 
DdbAdapterCdcUtils.getTableNameFromStreamName(streamArn);
-            String cdcObj = 
DdbAdapterCdcUtils.getCDCObjectNameFromStreamName(streamArn);
-            String startSeqNum = getStartingSequenceNumber(conn, tableName, 
streamArn, shardId,
-                    seqNum, shardIterType);
+            String tableName = 
DdbAdapterCdcUtils.getTableNameFromStreamName(streamName);
+            String startSeqNum =
+                getStartingSequenceNumber(conn, tableName, streamName, 
partitionId, seqNum,
+                    shardIterType);
             String streamType = DdbAdapterCdcUtils.getStreamType(conn, 
tableName);
-            result.put(ApiMetadata.SHARD_ITERATOR, 
String.format(SHARD_ITERATOR_FORMAT, tableName,
-                    cdcObj, streamType, shardId, startSeqNum));
+            String streamArn = DdbAdapterCdcUtils.isStreamArn(streamArnInput)
+                ? streamArnInput : DdbAdapterCdcUtils.toStreamArn(streamName);
+            PhoenixShardIterator pIter = new PhoenixShardIterator(streamArn, 
streamName,
+                streamType, partitionId, startSeqNum);
+            result.put(ApiMetadata.SHARD_ITERATOR, pIter.toString());
         } catch (SQLException e) {
             throw new PhoenixServiceException(e);
         }
@@ -39,27 +42,30 @@ public class GetShardIteratorService {
     }
 
     private static String getStartingSequenceNumber(Connection conn, String 
tableName,
-                                                    String streamName, String 
shardId,
+                                                    String streamName, String 
partitionId,
                                                     String seqNum, String type)
             throws SQLException {
-        String startSeqNum = null;
+        String startSeqNum;
         switch (type) {
             case "AT_SEQUENCE_NUMBER" :
-                startSeqNum = seqNum;
+                startSeqNum =
+                    String.format("%021d", 
DdbAdapterCdcUtils.parseSequenceNumber(seqNum));
                 break;
             case "AFTER_SEQUENCE_NUMBER":
-                startSeqNum = String.valueOf(Long.parseLong(seqNum) + 1);
+                startSeqNum =
+                    String.format("%021d", 
DdbAdapterCdcUtils.parseSequenceNumber(seqNum) + 1);
                 break;
             case "LATEST":
                 // new records only i.e. use current time.
-                startSeqNum = 
String.valueOf(EnvironmentEdgeManager.currentTimeMillis()
-                        * MAX_NUM_CHANGES_AT_TIMESTAMP);
+                startSeqNum =
+                    
DdbAdapterCdcUtils.getSequenceNumber(EnvironmentEdgeManager.currentTimeMillis(),
+                        0);
                 break;
             case "TRIM_HORIZON":
                 // Oldest available sequence number in the shard, we will use 
shard's start sequence number
                 long partitionStartTime = 
DdbAdapterCdcUtils.getPartitionStartTime(
-                        conn, tableName, streamName, shardId);
-                startSeqNum = String.valueOf(partitionStartTime * 
MAX_NUM_CHANGES_AT_TIMESTAMP);
+                        conn, tableName, streamName, partitionId);
+                startSeqNum = 
DdbAdapterCdcUtils.getSequenceNumber(partitionStartTime, 0);
                 break;
         default:
                 throw new ValidationException("Invalid shard iterator type: " 
+ type);
diff --git 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ListStreamsService.java
 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ListStreamsService.java
index cc81946..5ae0ac2 100644
--- 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ListStreamsService.java
+++ 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ListStreamsService.java
@@ -44,25 +44,26 @@ public class ListStreamsService {
             }
             if (!StringUtils.isEmpty(exclusiveStartStreamArn)) {
                 query.append(" AND STREAM_NAME > '")
-                     .append(exclusiveStartStreamArn)
+                     
.append(DdbAdapterCdcUtils.normalizeStreamName(exclusiveStartStreamArn))
                      .append("'");
             }
             int limit = (int) request.getOrDefault(ApiMetadata.LIMIT, 
MAX_LIMIT);
             query.append(" LIMIT ").append(limit);
             ResultSet rs = 
connection.createStatement().executeQuery(query.toString());
-            String lastStreamArn = null;
+            String lastStreamName = null;
             while (rs.next()) {
                 String tableName = rs.getString(1);
                 String streamName = rs.getString(2);
                 Map<String, Object> stream = new HashMap<>();
                 stream.put(ApiMetadata.TABLE_NAME, 
PhoenixUtils.getTableNameFromFullName(tableName, false));
-                stream.put(ApiMetadata.STREAM_ARN, streamName);
+                stream.put(ApiMetadata.STREAM_ARN, 
DdbAdapterCdcUtils.toStreamArn(streamName));
                 stream.put(ApiMetadata.STREAM_LABEL, 
DdbAdapterCdcUtils.getStreamLabel(streamName));
                 streams.add(stream);
-                lastStreamArn = streamName;
+                lastStreamName = streamName;
             }
             result.put(ApiMetadata.STREAMS, streams);
-            result.put(ApiMetadata.LAST_EVALUATED_STREAM_ARN, lastStreamArn);
+            result.put(ApiMetadata.LAST_EVALUATED_STREAM_ARN,
+                    lastStreamName == null ? null : 
DdbAdapterCdcUtils.toStreamArn(lastStreamName));
         } catch (SQLException e) {
             throw new PhoenixServiceException(e);
         }
diff --git 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/TableDescriptorUtils.java
 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/TableDescriptorUtils.java
index 0bd363d..d02b05c 100644
--- 
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/TableDescriptorUtils.java
+++ 
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/TableDescriptorUtils.java
@@ -217,7 +217,8 @@ public class TableDescriptorUtils {
         String streamName = DdbAdapterCdcUtils.getEnabledStreamName(pconn,
                 table.getName().getString());
         if (streamName != null && table.getSchemaVersion() != null) {
-            tableDescription.put(ApiMetadata.LATEST_STREAM_ARN, streamName);
+            tableDescription.put(ApiMetadata.LATEST_STREAM_ARN,
+                    DdbAdapterCdcUtils.toStreamArn(streamName));
             tableDescription.put(ApiMetadata.LATEST_STREAM_LABEL,
                     DdbAdapterCdcUtils.getStreamLabel(streamName));
 
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DDLTestUtils.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DDLTestUtils.java
index 6824a1f..1c68237 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DDLTestUtils.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DDLTestUtils.java
@@ -267,7 +267,9 @@ public class DDLTestUtils {
                                          String streamType)
             throws SQLException, ParseException {
         String tableName = td.tableName();
-        
Assert.assertTrue(td.latestStreamArn().startsWith("phoenix/cdc/stream/"));
+        Assert.assertTrue(td.latestStreamArn().startsWith(
+                "arn:aws:dynamodb:us-west-2:000000000000:table/"));
+        Assert.assertTrue(td.latestStreamArn().contains("/stream/"));
         Assert.assertTrue(td.latestStreamArn().contains(tableName));
 
         PTable dataTable = 
pconn.getTable(PhoenixUtils.getFullTableName(tableName, false));
@@ -285,7 +287,7 @@ public class DDLTestUtils {
         df.setTimeZone(TimeZone.getTimeZone("UTC"));
         Date date = df.parse(td.latestStreamLabel());
         Assert.assertEquals(String.valueOf(cdcIndex.getTimeStamp()), 
String.valueOf(date.getTime()));
-        
Assert.assertTrue(td.latestStreamArn().contains(String.valueOf(cdcIndex.getTimeStamp())));
+        Assert.assertTrue(td.latestStreamArn().endsWith("/stream/" + 
td.latestStreamLabel()));
     }
 
     private static CreateTableRequest.Builder 
addGlobalIndexToRequest(CreateTableRequest.Builder request, String indexName,
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DdbAdapterCdcUtilsTest.java
 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DdbAdapterCdcUtilsTest.java
new file mode 100644
index 0000000..3a31da6
--- /dev/null
+++ 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DdbAdapterCdcUtilsTest.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Base64;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils;
+import org.apache.phoenix.ddb.utils.PhoenixShardIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DdbAdapterCdcUtilsTest {
+
+    private static final long CDC_INDEX_TS = 1700000000000L;
+    private static final String TABLE_NAME = "MY_TABLE";
+    private static final String INTERNAL_TABLE = "DDB." + TABLE_NAME;
+
+    private static String formatUtc(long ts) {
+        SimpleDateFormat df = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+        df.setTimeZone(TimeZone.getTimeZone("Etc/UTC"));
+        return df.format(new Date(ts));
+    }
+
+    private static String internalStreamName(long ts) {
+        return "phoenix/cdc/stream/" + INTERNAL_TABLE + "/CDC_" + TABLE_NAME + 
"/" + ts + "/"
+            + formatUtc(ts);
+    }
+
+    @Test
+    public void testToStreamArn_emitsAwsShape() {
+        String arn = 
DdbAdapterCdcUtils.toStreamArn(internalStreamName(CDC_INDEX_TS));
+        Assert.assertEquals(
+            "arn:aws:dynamodb:us-west-2:000000000000:table/" + TABLE_NAME + 
"/stream/" + formatUtc(
+                CDC_INDEX_TS), arn);
+    }
+
+    @Test
+    public void testToStreamArn_stripsInternalDdbPrefix() {
+        String arn = 
DdbAdapterCdcUtils.toStreamArn(internalStreamName(CDC_INDEX_TS));
+        Assert.assertFalse(arn.contains("DDB." + TABLE_NAME));
+        Assert.assertTrue(arn.contains(":table/" + TABLE_NAME + "/stream/"));
+    }
+
+    @Test
+    public void testFromStreamArn_addsInternalDdbPrefix() {
+        String arn =
+            "arn:aws:dynamodb:us-west-2:000000000000:table/" + TABLE_NAME + 
"/stream/" + formatUtc(
+                CDC_INDEX_TS);
+        Assert.assertEquals(internalStreamName(CDC_INDEX_TS),
+            DdbAdapterCdcUtils.fromStreamArn(arn));
+    }
+
+    @Test
+    public void testFromStreamArn_idempotentOnLegacyArnWithDdbPrefix() {
+        String legacyArn =
+            "arn:aws:dynamodb:us-west-2:000000000000:table/" + INTERNAL_TABLE 
+ "/stream/"
+                + formatUtc(CDC_INDEX_TS);
+        Assert.assertEquals(internalStreamName(CDC_INDEX_TS),
+            DdbAdapterCdcUtils.fromStreamArn(legacyArn));
+    }
+
+    @Test
+    public void testNormalizeThenToStreamArn_canonicalizesDdbPrefixedArn() {
+        String prefixedArn =
+            "arn:aws:dynamodb:us-west-2:000000000000:table/" + INTERNAL_TABLE 
+ "/stream/"
+                + formatUtc(CDC_INDEX_TS);
+        String canonicalArn =
+            "arn:aws:dynamodb:us-west-2:000000000000:table/" + TABLE_NAME + 
"/stream/" + formatUtc(
+                CDC_INDEX_TS);
+        String normalized = 
DdbAdapterCdcUtils.normalizeStreamName(prefixedArn);
+        Assert.assertEquals(canonicalArn, 
DdbAdapterCdcUtils.toStreamArn(normalized));
+    }
+
+    @Test
+    public void testFromStreamArn_reconstructsInternalName() {
+        String internal = internalStreamName(CDC_INDEX_TS);
+        String arn = DdbAdapterCdcUtils.toStreamArn(internal);
+        Assert.assertEquals(internal, DdbAdapterCdcUtils.fromStreamArn(arn));
+    }
+
+    @Test
+    public void testRoundTrip_byteIdentical() {
+        for (long ts : new long[] {0L, 1L, 1000L, 1700000000000L, 
1777440690689L, 4102444800000L}) {
+            String internal = internalStreamName(ts);
+            Assert.assertEquals("ts=" + ts, internal,
+                
DdbAdapterCdcUtils.fromStreamArn(DdbAdapterCdcUtils.toStreamArn(internal)));
+        }
+    }
+
+    @Test
+    public void testIsStreamArn() {
+        Assert.assertTrue(DdbAdapterCdcUtils.isStreamArn(
+            
"arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/2024-01-15T10:30:00.000"));
+        
Assert.assertFalse(DdbAdapterCdcUtils.isStreamArn(internalStreamName(CDC_INDEX_TS)));
+        Assert.assertFalse(DdbAdapterCdcUtils.isStreamArn(null));
+        Assert.assertFalse(DdbAdapterCdcUtils.isStreamArn(""));
+    }
+
+    @Test
+    public void testNormalizeStreamName_acceptsBothFormats() {
+        String internal = internalStreamName(CDC_INDEX_TS);
+        String arn = DdbAdapterCdcUtils.toStreamArn(internal);
+        Assert.assertEquals(internal, 
DdbAdapterCdcUtils.normalizeStreamName(internal));
+        Assert.assertEquals(internal, 
DdbAdapterCdcUtils.normalizeStreamName(arn));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromStreamArn_rejectsNonArn() {
+        DdbAdapterCdcUtils.fromStreamArn(internalStreamName(CDC_INDEX_TS));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromStreamArn_rejectsMissingStreamSegment() {
+        DdbAdapterCdcUtils.fromStreamArn(
+            
"arn:aws:dynamodb:us-west-2:000000000000:table/T-without-stream-segment");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromStreamArn_rejectsMalformedLabel() {
+        DdbAdapterCdcUtils.fromStreamArn(
+            
"arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/not-an-iso-timestamp");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromStreamArn_rejectsImpossibleDayInMonth() {
+        DdbAdapterCdcUtils.fromStreamArn(
+            
"arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/2024-02-30T00:00:00.000");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromStreamArn_rejectsFeb29InNonLeapYear() {
+        DdbAdapterCdcUtils.fromStreamArn(
+            
"arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/2023-02-29T00:00:00.000");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromStreamArn_rejectsImpossibleHour() {
+        DdbAdapterCdcUtils.fromStreamArn(
+            
"arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/2024-01-15T25:00:00.000");
+    }
+
+    @Test
+    public void testToStreamArn_preservesTableName() {
+        String arn = 
DdbAdapterCdcUtils.toStreamArn(internalStreamName(CDC_INDEX_TS));
+        Assert.assertTrue(arn.contains(":table/" + TABLE_NAME + "/stream/"));
+    }
+
+    @Test
+    public void testGetStreamLabel_unchangedAfterRoundTrip() {
+        String internal = internalStreamName(CDC_INDEX_TS);
+        String arn = DdbAdapterCdcUtils.toStreamArn(internal);
+        Assert.assertEquals(DdbAdapterCdcUtils.getStreamLabel(internal),
+            
DdbAdapterCdcUtils.getStreamLabel(DdbAdapterCdcUtils.fromStreamArn(arn)));
+    }
+
+    private static final String PARTITION_HEX = 
"1a2b3c4d5e6f7890abcdef0123456789";
+    private static final long PARTITION_START_MS = 1700000000000L;
+
+    @Test
+    public void testToShardId_emitsAwsShape() {
+        Assert.assertEquals("shardId-" + PARTITION_START_MS + "-" + 
PARTITION_HEX,
+            DdbAdapterCdcUtils.toShardId(PARTITION_START_MS, PARTITION_HEX));
+    }
+
+    @Test
+    public void testToShardId_lengthWithinAwsSpec() {
+        String shardId = DdbAdapterCdcUtils.toShardId(PARTITION_START_MS, 
PARTITION_HEX);
+        Assert.assertTrue("len=" + shardId.length(),
+            shardId.length() >= 28 && shardId.length() <= 65);
+    }
+
+    @Test
+    public void testIsShardId_detectsBothFormats() {
+        Assert.assertTrue(
+            DdbAdapterCdcUtils.isShardId("shardId-" + PARTITION_START_MS + "-" 
+ PARTITION_HEX));
+        Assert.assertFalse(DdbAdapterCdcUtils.isShardId(PARTITION_HEX));
+        Assert.assertFalse(DdbAdapterCdcUtils.isShardId(null));
+        Assert.assertFalse(DdbAdapterCdcUtils.isShardId(""));
+    }
+
+    @Test
+    public void testPartitionIdFromShardId_extractsBareHexFromNewShape() {
+        String shardId = DdbAdapterCdcUtils.toShardId(PARTITION_START_MS, 
PARTITION_HEX);
+        Assert.assertEquals(PARTITION_HEX, 
DdbAdapterCdcUtils.partitionIdFromShardId(shardId));
+    }
+
+    @Test
+    public void testPartitionIdFromShardId_passesThroughLegacyHex() {
+        Assert.assertEquals(PARTITION_HEX,
+            DdbAdapterCdcUtils.partitionIdFromShardId(PARTITION_HEX));
+    }
+
+    @Test
+    public void testPartitionIdFromShardId_idempotent() {
+        String shardId = DdbAdapterCdcUtils.toShardId(PARTITION_START_MS, 
PARTITION_HEX);
+        String once = DdbAdapterCdcUtils.partitionIdFromShardId(shardId);
+        Assert.assertEquals(once, 
DdbAdapterCdcUtils.partitionIdFromShardId(once));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPartitionIdFromShardId_rejectsNull() {
+        DdbAdapterCdcUtils.partitionIdFromShardId(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPartitionIdFromShardId_rejectsEmpty() {
+        DdbAdapterCdcUtils.partitionIdFromShardId("");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPartitionIdFromShardId_rejectsMissingHexSegment() {
+        DdbAdapterCdcUtils.partitionIdFromShardId("shardId-" + 
PARTITION_START_MS);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPartitionIdFromShardId_rejectsTrailingDashWithoutHex() {
+        DdbAdapterCdcUtils.partitionIdFromShardId("shardId-" + 
PARTITION_START_MS + "-");
+    }
+
+    private static final long SEQ_TS = 1738372050000L;
+    private static final int SEQ_OFFSET = 420;
+    private static final long SEQ_NUMERIC = 173837205000000420L;
+    private static final String SEQ_18 = "173837205000000420";
+    private static final String SEQ_21 = "000173837205000000420";
+
+    @Test
+    public void testGetSequenceNumber_emits21DigitZeroPadded() {
+        String seq = DdbAdapterCdcUtils.getSequenceNumber(SEQ_TS, SEQ_OFFSET);
+        Assert.assertEquals("len=" + seq.length(), 21, seq.length());
+        Assert.assertEquals(SEQ_21, seq);
+    }
+
+    @Test
+    public void testGetSequenceNumber_numericValueUnchangedFromLegacyConcat() {
+        String legacyConcat =
+            SEQ_TS + String.format("%0" + DdbAdapterCdcUtils.OFFSET_LENGTH + 
"d", SEQ_OFFSET);
+        String paddedNew = DdbAdapterCdcUtils.getSequenceNumber(SEQ_TS, 
SEQ_OFFSET);
+        Assert.assertEquals(Long.parseLong(legacyConcat), 
Long.parseLong(paddedNew));
+    }
+
+    @Test
+    public void testParseSequenceNumber_acceptsBothFormatsIdentically() {
+        Assert.assertEquals(SEQ_NUMERIC, 
DdbAdapterCdcUtils.parseSequenceNumber(SEQ_18));
+        Assert.assertEquals(SEQ_NUMERIC, 
DdbAdapterCdcUtils.parseSequenceNumber(SEQ_21));
+        Assert.assertEquals(SEQ_NUMERIC, 
DdbAdapterCdcUtils.parseSequenceNumber(
+            DdbAdapterCdcUtils.getSequenceNumber(SEQ_TS, SEQ_OFFSET)));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testParseSequenceNumber_rejectsNull() {
+        DdbAdapterCdcUtils.parseSequenceNumber(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testParseSequenceNumber_rejectsEmpty() {
+        DdbAdapterCdcUtils.parseSequenceNumber("");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testParseSequenceNumber_rejectsNonNumeric() {
+        DdbAdapterCdcUtils.parseSequenceNumber("abc");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testParseSequenceNumber_rejectsNegative() {
+        DdbAdapterCdcUtils.parseSequenceNumber("-1");
+    }
+
+    @Test
+    public void testGetEventId_stableAcrossSequenceNumberFormatChange() {
+        String idLegacy = DdbAdapterCdcUtils.getEventId(TABLE_NAME, 
PARTITION_HEX, SEQ_18);
+        String idNew = DdbAdapterCdcUtils.getEventId(TABLE_NAME, 
PARTITION_HEX, SEQ_21);
+        Assert.assertEquals(idLegacy, idNew);
+        Assert.assertEquals(32, idLegacy.length());
+        Assert.assertTrue(idLegacy.matches("[0-9a-f]{32}"));
+    }
+
+    @Test
+    public void testGetEventId_matchesPreWiCRawHashOnLegacySeqNum() {
+        String expected;
+        try {
+            String input = TABLE_NAME + "|" + PARTITION_HEX + "|" + SEQ_18;
+            java.security.MessageDigest md = 
java.security.MessageDigest.getInstance("MD5");
+            byte[] digest = 
md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            expected = String.format("%032x", new java.math.BigInteger(1, 
digest));
+        } catch (java.security.NoSuchAlgorithmException e) {
+            throw new RuntimeException(e);
+        }
+        Assert.assertEquals(expected,
+            DdbAdapterCdcUtils.getEventId(TABLE_NAME, PARTITION_HEX, SEQ_18));
+        Assert.assertEquals(expected,
+            DdbAdapterCdcUtils.getEventId(TABLE_NAME, PARTITION_HEX, SEQ_21));
+    }
+
+    private static final String SHARD_ITER_STREAM_TYPE = "NEW_AND_OLD_IMAGES";
+
+    private static String canonicalStreamArn() {
+        return 
DdbAdapterCdcUtils.toStreamArn(internalStreamName(CDC_INDEX_TS));
+    }
+
+    private static PhoenixShardIterator newIterator() {
+        return new PhoenixShardIterator(canonicalStreamArn(), 
internalStreamName(CDC_INDEX_TS),
+            SHARD_ITER_STREAM_TYPE, PARTITION_HEX, SEQ_21);
+    }
+
+    @Test
+    public void testShardIterator_toString_emitsAwsShape() {
+        String token = newIterator().toString();
+        Assert.assertTrue("must start with canonical streamArn: " + token,
+            token.startsWith(canonicalStreamArn() + "|"));
+        String[] parts = token.split("\\|", -1);
+        Assert.assertEquals("expected 3 outer parts: " + token, 3, 
parts.length);
+        Assert.assertEquals("version sentinel must be \"1\": " + token, "1", 
parts[1]);
+        String json = new String(Base64.getDecoder().decode(parts[2]), 
StandardCharsets.UTF_8);
+        Assert.assertTrue(json.contains("\"streamType\":\"" + 
SHARD_ITER_STREAM_TYPE + "\""));
+        Assert.assertTrue(json.contains("\"partitionId\":\"" + PARTITION_HEX + 
"\""));
+        Assert.assertTrue(json.contains("\"seqNum\":\"" + SEQ_21 + "\""));
+    }
+
+    @Test
+    public void testShardIterator_roundTrip_recoversAllFields() {
+        PhoenixShardIterator original = newIterator();
+        PhoenixShardIterator parsed = new 
PhoenixShardIterator(original.toString());
+        Assert.assertEquals(canonicalStreamArn(), parsed.getStreamArn());
+        Assert.assertEquals(SHARD_ITER_STREAM_TYPE, parsed.getStreamType());
+        Assert.assertEquals(PARTITION_HEX, parsed.getPartitionId());
+        Assert.assertEquals(SEQ_21, parsed.getSeqNum());
+        Assert.assertEquals(original.getTimestamp(), parsed.getTimestamp());
+        Assert.assertEquals(original.getOffset(), parsed.getOffset());
+    }
+
+    @Test
+    public void 
testShardIterator_roundTrip_derivesTableAndCdcObjectFromStreamArn() {
+        PhoenixShardIterator parsed = new 
PhoenixShardIterator(newIterator().toString());
+        Assert.assertEquals(INTERNAL_TABLE, parsed.getTableName());
+        Assert.assertEquals("CDC_" + TABLE_NAME, parsed.getCdcObject());
+    }
+
+    @Test
+    public void testShardIterator_roundTrip_byteIdenticalToString() {
+        String first = newIterator().toString();
+        String second = new PhoenixShardIterator(first).toString();
+        Assert.assertEquals(first, second);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testShardIterator_parse_rejectsNull() {
+        new PhoenixShardIterator((String) null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testShardIterator_parse_rejectsEmpty() {
+        new PhoenixShardIterator("");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testShardIterator_parse_rejectsMalformedOuter() {
+        new PhoenixShardIterator(canonicalStreamArn() + "|1");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testShardIterator_parse_rejectsUnknownVersion() {
+        String validInnerBase64 = newIterator().toString().split("\\|", -1)[2];
+        new PhoenixShardIterator(canonicalStreamArn() + "|9|" + 
validInnerBase64);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testShardIterator_parse_rejectsCorruptBase64() {
+        new PhoenixShardIterator(canonicalStreamArn() + "|1|!!!not-base64!!!");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testShardIterator_parse_rejectsMalformedJson() {
+        String notJson = Base64.getEncoder().withoutPadding()
+            .encodeToString("not-json".getBytes(StandardCharsets.UTF_8));
+        new PhoenixShardIterator(canonicalStreamArn() + "|1|" + notJson);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testShardIterator_parse_rejectsMissingStateField() {
+        String partialState = Base64.getEncoder().withoutPadding()
+            
.encodeToString("{\"streamType\":\"NEW_IMAGE\"}".getBytes(StandardCharsets.UTF_8));
+        new PhoenixShardIterator(canonicalStreamArn() + "|1|" + partialState);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testShardIterator_parse_rejectsLegacySlashFormat() {
+        new PhoenixShardIterator(
+            "shardIterator/" + INTERNAL_TABLE + "/CDC_" + TABLE_NAME + 
"/NEW_IMAGE/" + PARTITION_HEX
+                + "/" + SEQ_21);
+    }
+
+    @Test
+    public void 
testShardIterator_setNewSeqNum_preservesAllOtherFieldsThroughRoundTrip() {
+        PhoenixShardIterator pIter = newIterator();
+        long newTs = SEQ_TS + 1000;
+        int newOffset = 5;
+        pIter.setNewSeqNum(newTs, newOffset);
+        Assert.assertEquals(DdbAdapterCdcUtils.getSequenceNumber(newTs, 
newOffset),
+            pIter.getSeqNum());
+        PhoenixShardIterator reparsed = new 
PhoenixShardIterator(pIter.toString());
+        Assert.assertEquals(canonicalStreamArn(), reparsed.getStreamArn());
+        Assert.assertEquals(SHARD_ITER_STREAM_TYPE, reparsed.getStreamType());
+        Assert.assertEquals(PARTITION_HEX, reparsed.getPartitionId());
+        Assert.assertEquals(INTERNAL_TABLE, reparsed.getTableName());
+        Assert.assertEquals("CDC_" + TABLE_NAME, reparsed.getCdcObject());
+        Assert.assertEquals(DdbAdapterCdcUtils.getSequenceNumber(newTs, 
newOffset),
+            reparsed.getSeqNum());
+        Assert.assertEquals(newTs, reparsed.getTimestamp());
+        Assert.assertEquals(newOffset, reparsed.getOffset());
+    }
+
+    @Test
+    public void testShardIterator_lengthWithinAwsSpec_realisticInputs() {
+        String token = newIterator().toString();
+        Assert.assertTrue("token must fit AWS [1, 2048] spec, was len=" + 
token.length(),
+            token.length() >= 1 && token.length() <= 2048);
+    }
+
+    @Test
+    public void testShardIterator_lengthWithinAwsSpec_longTableName() {
+        StringBuilder longBare = new StringBuilder("T");
+        while (longBare.length() < 200) {
+            longBare.append("X");
+        }
+        String longBareStr = longBare.toString();
+        String longStreamArn =
+            "arn:aws:dynamodb:us-west-2:000000000000:table/" + longBareStr + 
"/stream/" + formatUtc(
+                CDC_INDEX_TS);
+        String longInternalStreamName =
+            "phoenix/cdc/stream/DDB." + longBareStr + "/CDC_" + longBareStr + 
"/" + CDC_INDEX_TS
+                + "/" + formatUtc(CDC_INDEX_TS);
+        PhoenixShardIterator big =
+            new PhoenixShardIterator(longStreamArn, longInternalStreamName, 
SHARD_ITER_STREAM_TYPE,
+                PARTITION_HEX, SEQ_21);
+        String token = big.toString();
+        Assert.assertTrue("token must fit AWS [1, 2048] spec, was len=" + 
token.length(),
+            token.length() <= 2048);
+        PhoenixShardIterator reparsed = new PhoenixShardIterator(token);
+        Assert.assertEquals(longStreamArn, reparsed.getStreamArn());
+        Assert.assertEquals("DDB." + longBare, reparsed.getTableName());
+    }
+}
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetShardIteratorIT.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetShardIteratorIT.java
index 962f936..e861f5e 100644
--- 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetShardIteratorIT.java
+++ 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetShardIteratorIT.java
@@ -50,11 +50,16 @@ import 
software.amazon.awssdk.services.dynamodb.model.StreamDescription;
 import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 
+import org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils;
+import org.apache.phoenix.ddb.utils.PhoenixShardIterator;
+
 import java.sql.DriverManager;
 import java.util.Map;
 
 import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.MAX_NUM_CHANGES_AT_TIMESTAMP;
 import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_DELIM;
+import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_NUM_PARTS;
+import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_VERSION;
 import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static 
software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
 import static 
software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
@@ -173,26 +178,30 @@ public class GetShardIteratorIT {
                 .shardId(shardId);
 
         String testSeqNum = "173837205000000420"; // 1738372050000+00420
-        String testSeqNumPlusOne = String.valueOf(Long.parseLong(testSeqNum) + 
1); // 1738372050000+00421
+        long testSeqNumLong = Long.parseLong(testSeqNum);
         //AT_SEQUENCE_NUMBER
         request.sequenceNumber(testSeqNum);
         request.shardIteratorType(AT_SEQUENCE_NUMBER);
         GetShardIteratorResponse result = 
phoenixDBStreamsClientV2.getShardIterator(request.build());
         validateShardIterator(result.shardIterator(), tableName, 
"CDC_"+tableName, "OLD_IMAGE", shardId);
-        Assert.assertTrue(result.shardIterator().contains(testSeqNum));
+        // The iterator's inner seqNum is the canonical 21-digit form of 
testSeqNum.
+        Assert.assertEquals(testSeqNumLong,
+                Long.parseLong(new 
PhoenixShardIterator(result.shardIterator()).getSeqNum()));
 
         //AFTER_SEQUENCE_NUMBER
         request.shardIteratorType(AFTER_SEQUENCE_NUMBER);
         result = phoenixDBStreamsClientV2.getShardIterator(request.build());
         validateShardIterator(result.shardIterator(), tableName, 
"CDC_"+tableName, "OLD_IMAGE", shardId);
-        Assert.assertTrue(result.shardIterator().contains(testSeqNumPlusOne));
+        Assert.assertEquals(testSeqNumLong + 1,
+                Long.parseLong(new 
PhoenixShardIterator(result.shardIterator()).getSeqNum()));
 
         //TRIM_HORIZON
         request.sequenceNumber(null);
         request.shardIteratorType(TRIM_HORIZON);
         result = phoenixDBStreamsClientV2.getShardIterator(request.build());
         validateShardIterator(result.shardIterator(), tableName, 
"CDC_"+tableName, "OLD_IMAGE", shardId);
-        Assert.assertTrue(result.shardIterator().contains(shardStartSeqNum));
+        Assert.assertEquals(Long.parseLong(shardStartSeqNum),
+                Long.parseLong(new 
PhoenixShardIterator(result.shardIterator()).getSeqNum()));
 
         //LATEST
         request.sequenceNumber(null);
@@ -200,18 +209,27 @@ public class GetShardIteratorIT {
         request.shardIteratorType(LATEST);
         result = phoenixDBStreamsClientV2.getShardIterator(request.build());
         validateShardIterator(result.shardIterator(), tableName, 
"CDC_"+tableName, "OLD_IMAGE", shardId);
-        String[] shardIter = 
result.shardIterator().split(SHARD_ITERATOR_DELIM);
         // shard iterator would be created after the current time we recorded 
here
-        Assert.assertTrue(Long.parseLong(shardIter[shardIter.length-1]) > 
currentTime * MAX_NUM_CHANGES_AT_TIMESTAMP);
+        long latestSeqNum = Long.parseLong(
+                new PhoenixShardIterator(result.shardIterator()).getSeqNum());
+        Assert.assertTrue(latestSeqNum > currentTime * 
MAX_NUM_CHANGES_AT_TIMESTAMP);
     }
 
     private void validateShardIterator(String shardIter, String tableName, 
String cdcObj,
                                        String streamType, String shardId) {
         LOGGER.info("Shard Iterator: " + shardIter);
-        Assert.assertTrue(shardIter.contains(tableName));
-        Assert.assertTrue(shardIter.contains(cdcObj));
-        Assert.assertTrue(shardIter.contains(streamType));
-        Assert.assertTrue(shardIter.contains(shardId));
+        Assert.assertTrue("must start with canonical streamArn, was: " + 
shardIter,
+            
shardIter.startsWith("arn:aws:dynamodb:us-west-2:000000000000:table/"
+                + tableName + "/stream/"));
+        String[] parts = 
shardIter.split(java.util.regex.Pattern.quote(SHARD_ITERATOR_DELIM), -1);
+        Assert.assertEquals(SHARD_ITERATOR_NUM_PARTS, parts.length);
+        Assert.assertEquals(SHARD_ITERATOR_VERSION, parts[1]);
+        PhoenixShardIterator parsed = new PhoenixShardIterator(shardIter);
+        Assert.assertEquals("DDB." + tableName, parsed.getTableName());
+        Assert.assertEquals(cdcObj, parsed.getCdcObject());
+        Assert.assertEquals(streamType, parsed.getStreamType());
+        Assert.assertEquals(DdbAdapterCdcUtils.partitionIdFromShardId(shardId),
+            parsed.getPartitionId());
     }
 
 }
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ListStreamsIT.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ListStreamsIT.java
index 9eebbf5..847ce78 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ListStreamsIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ListStreamsIT.java
@@ -44,7 +44,6 @@ import java.sql.DriverManager;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.TimeZone;
 
@@ -134,8 +133,8 @@ public class ListStreamsIT {
         Assert.assertEquals(tableName, phoenixStream.tableName());
         SimpleDateFormat df = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
         df.setTimeZone(TimeZone.getTimeZone("UTC"));
-        Date date = df.parse(phoenixStream.streamLabel());
-        
Assert.assertTrue(phoenixStream.streamArn().contains(String.valueOf(date.getTime())));
+        df.parse(phoenixStream.streamLabel());
+        Assert.assertTrue(phoenixStream.streamArn().endsWith("/stream/" + 
phoenixStream.streamLabel()));
     }
 
     @Test(timeout = 120000)
diff --git 
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
 
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
index 40614c5..24b4160 100644
--- 
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
+++ 
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
@@ -15,9 +15,15 @@ import java.security.NoSuchAlgorithmException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.format.ResolverStyle;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
@@ -34,13 +40,33 @@ public class DdbAdapterCdcUtils {
     public static final int OFFSET_LENGTH = 5;
     public static final int MAX_NUM_CHANGES_AT_TIMESTAMP = (int) Math.pow(10, 
OFFSET_LENGTH);
 
-    // 
shardIterator/<tableName>/<cdcObject>/<streamType>/<partitionID>/<startSeqNum>
-    public static final String SHARD_ITERATOR_FORMAT = 
"shardIterator/%s/%s/%s/%s/%s";
-    public static final String SHARD_ITERATOR_DELIM = "/";
-    public static final int SHARD_ITERATOR_NUM_PARTS = 6;
+    public static final String SHARD_ITERATOR_VERSION = "1";
+    public static final String SHARD_ITERATOR_DELIM = "|";
+    public static final int SHARD_ITERATOR_NUM_PARTS = 3;
+    // JSON key names for the inner state payload of shard iterator
+    public static final String SI_FIELD_STREAM_TYPE = "streamType";
+    public static final String SI_FIELD_PARTITION_ID = "partitionId";
+    public static final String SI_FIELD_SEQ_NUM = "seqNum";
     // phoenix/cdc/stream/{tableName}/{cdc object name}/{cdc index 
timestamp}/{creation datetime}
     public static final String STREAM_NAME_DELIM = "/";
     public static final int STREAM_NAME_NUM_PARTS = 7;
+    public static final String STREAM_NAME_PREFIX = "phoenix/cdc/stream/";
+    public static final String CDC_OBJECT_PREFIX = "CDC_";
+
+    public static final String STREAM_ARN_REGION = "us-west-2";
+    public static final String STREAM_ARN_ACCOUNT_ID = "000000000000";
+    public static final String STREAM_ARN_PREFIX =
+        "arn:aws:dynamodb:" + STREAM_ARN_REGION + ":" + STREAM_ARN_ACCOUNT_ID 
+ ":table/";
+    public static final String STREAM_ARN_INFIX = "/stream/";
+
+    // ShardId format: shardId-<partitionStartMs>-<32-char-hex-partition-id>
+    public static final String SHARD_ID_PREFIX = "shardId-";
+    private static final String SHARD_ID_DELIM = "-";
+    public static final String STREAM_LABEL_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss.SSS";
+    private static final String STREAM_LABEL_PARSE_PATTERN = 
"uuuu-MM-dd'T'HH:mm:ss.SSS";
+    private static final DateTimeFormatter STREAM_LABEL_FORMATTER =
+        DateTimeFormatter.ofPattern(STREAM_LABEL_PARSE_PATTERN, Locale.ROOT)
+            .withResolverStyle(ResolverStyle.STRICT).withZone(ZoneOffset.UTC);
 
     private static final String STREAM_NAME_QUERY
             = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME
@@ -145,6 +171,106 @@ public class DdbAdapterCdcUtils {
         return Long.parseLong(getStreamNameComponent(streamName, 5));
     }
 
+    /**
+     * Detect whether the given identifier is the AWS-shaped stream ARN
+     * rather than the internal stream name.
+     */
+    public static boolean isStreamArn(String s) {
+        return s != null && s.startsWith(STREAM_ARN_PREFIX);
+    }
+
+    /**
+     * Convert the internal stream name to the AWS-shaped ARN
+     * (e.g. 
arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000).
+     */
+    public static String toStreamArn(String streamName) {
+        String internalTableName = getTableNameFromStreamName(streamName);
+        String bareTableName = 
PhoenixUtils.getTableNameFromFullName(internalTableName, false);
+        String creationDateTime = getStreamLabel(streamName);
+        return STREAM_ARN_PREFIX + bareTableName + STREAM_ARN_INFIX + 
creationDateTime;
+    }
+
+    /**
+     * Convert the AWS-shaped stream ARN back to the internal stream name.
+     */
+    public static String fromStreamArn(String streamArn) {
+        if (!isStreamArn(streamArn)) {
+            throw new IllegalArgumentException("Not a synthetic stream ARN: " 
+ streamArn);
+        }
+        String body = streamArn.substring(STREAM_ARN_PREFIX.length());
+        int infixIdx = body.indexOf(STREAM_ARN_INFIX);
+        if (infixIdx < 0) {
+            throw new IllegalArgumentException("Stream ARN missing /stream/ 
segment: " + streamArn);
+        }
+        String arnTableName = body.substring(0, infixIdx);
+        String creationDateTime = body.substring(infixIdx + 
STREAM_ARN_INFIX.length());
+        long cdcIndexTimestamp = parseCreationDateTime(creationDateTime, 
streamArn);
+        String bareTableName = 
PhoenixUtils.getTableNameFromFullName(arnTableName, false);
+        String internalTableName = 
PhoenixUtils.getFullTableName(bareTableName, false);
+        return STREAM_NAME_PREFIX + internalTableName + STREAM_NAME_DELIM
+            + CDC_OBJECT_PREFIX + bareTableName + STREAM_NAME_DELIM
+            + cdcIndexTimestamp + STREAM_NAME_DELIM + creationDateTime;
+    }
+
+    public static String normalizeStreamName(String streamArnOrName) {
+        if (streamArnOrName == null || streamArnOrName.isEmpty()) {
+            throw new IllegalArgumentException("StreamArn is required");
+        }
+        return isStreamArn(streamArnOrName) ? fromStreamArn(streamArnOrName) : 
streamArnOrName;
+    }
+
+    /**
+     * Encode an internal Phoenix partition into the AWS-shaped {@code ShardId}
+     * {@code shardId-<partitionStartMs>-<partitionHex>}. Length stays in the 
AWS
+     * spec range [28, 65] for any positive epoch-millis and the 32-char HBase 
region
+     * encoded partition id.
+     *
+     * @param partitionStartMs epoch millis of the partition's start time
+     * @param partitionHex     HBase region encoded partition id (32-char 
lowercase hex)
+     */
+    public static String toShardId(long partitionStartMs, String partitionHex) 
{
+        return SHARD_ID_PREFIX + partitionStartMs + SHARD_ID_DELIM + 
partitionHex;
+    }
+
+    /**
+     * Detect whether the given identifier is the AWS-shaped {@code ShardId} 
rather than
+     * the raw HBase region encoded partition id.
+     */
+    public static boolean isShardId(String s) {
+        return s != null && s.startsWith(SHARD_ID_PREFIX);
+    }
+
+    /**
+     * Extract the bare HBase region encoded partition id from either the new
+     * AWS-shaped {@code ShardId} ({@code shardId-<ms>-<hex>}) or a raw 
partition
+     * hex string.
+     */
+    public static String partitionIdFromShardId(String shardIdOrPartitionHex) {
+        if (shardIdOrPartitionHex == null || shardIdOrPartitionHex.isEmpty()) {
+            throw new IllegalArgumentException("ShardId is required");
+        }
+        if (!isShardId(shardIdOrPartitionHex)) {
+            return shardIdOrPartitionHex;
+        }
+        String body = 
shardIdOrPartitionHex.substring(SHARD_ID_PREFIX.length());
+        int dashIdx = body.indexOf(SHARD_ID_DELIM);
+        if (dashIdx < 0 || dashIdx == body.length() - 1) {
+            throw new IllegalArgumentException(
+                "ShardId missing partition hex segment: " + 
shardIdOrPartitionHex);
+        }
+        return body.substring(dashIdx + 1);
+    }
+
+    private static long parseCreationDateTime(String creationDateTime, String 
streamArn) {
+        try {
+            return STREAM_LABEL_FORMATTER.parse(creationDateTime, 
Instant::from).toEpochMilli();
+        } catch (DateTimeParseException e) {
+            throw new IllegalArgumentException(
+                "Stream ARN label is not a UTC ISO timestamp (" + 
STREAM_LABEL_FORMAT + "): "
+                    + streamArn, e);
+        }
+    }
+
     /**
      * Return the stream type for the given table stored in the SCHEMA_VERSION 
column of the ptable.
      */
@@ -186,11 +312,28 @@ public class DdbAdapterCdcUtils {
     }
 
     /**
-     * Build a sequence number of the form <timestamp><offset>.
-     * offset should be 0-padded for OFFSET_LENGTH
+     * Build a 21-digit zero-padded sequence number (SequenceNumber min length 
21,
+     * max 40). The numeric value is identical to the form
+     * (timestamp * 10^OFFSET_LENGTH + offset)
      */
     public static String getSequenceNumber(long timestamp, int offset) {
-        return timestamp + String.format("%0" + OFFSET_LENGTH + "d", offset);
+        return String.format("%021d", timestamp * MAX_NUM_CHANGES_AT_TIMESTAMP 
+ offset);
+    }
+
+    public static long parseSequenceNumber(String s) {
+        if (s == null || s.isEmpty()) {
+            throw new IllegalArgumentException("SequenceNumber is required");
+        }
+        long val;
+        try {
+            val = Long.parseLong(s);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException("SequenceNumber is not numeric: 
" + s, e);
+        }
+        if (val < 0) {
+            throw new IllegalArgumentException("SequenceNumber must be 
non-negative: " + s);
+        }
+        return val;
     }
 
     /**
@@ -199,12 +342,13 @@ public class DdbAdapterCdcUtils {
      *
      * @param tableName the table name from the shard iterator
      * @param partitionId the partition (shard) ID
-     * @param sequenceNumber the per-event sequence number
+     * @param sequenceNumber the per-event sequence number (any padded length)
      * @return 32-char lowercase hex string
      */
     public static String getEventId(String tableName, String partitionId, 
String sequenceNumber) {
+        long canonicalSeq = parseSequenceNumber(sequenceNumber);
         try {
-            String input = tableName + "|" + partitionId + "|" + 
sequenceNumber;
+            String input = tableName + "|" + partitionId + "|" + canonicalSeq;
             MessageDigest md = MessageDigest.getInstance("MD5");
             byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
             return String.format("%032x", new BigInteger(1, digest));
diff --git 
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixShardIterator.java
 
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixShardIterator.java
index 69a5597..7b3d918 100644
--- 
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixShardIterator.java
+++ 
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixShardIterator.java
@@ -1,17 +1,40 @@
 package org.apache.phoenix.ddb.utils;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
 
 import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.OFFSET_LENGTH;
 import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_DELIM;
-import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_FORMAT;
 import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_NUM_PARTS;
+import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_VERSION;
+import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SI_FIELD_PARTITION_ID;
+import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SI_FIELD_SEQ_NUM;
+import static 
org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SI_FIELD_STREAM_TYPE;
 
 /**
  * Class to represent a shard iterator for Phoenix CDC queries.
- * Format: 
shardIterator-<tableName>-<cdcObject>-<streamType>-<partitionID>-<startSeqNum>
+ * The format:
+ * <pre>
+ *   &lt;streamArn&gt;|&lt;version&gt;|&lt;base64(JSON state)&gt;
+ * </pre>
  */
 public class PhoenixShardIterator {
 
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final Pattern DELIM_PATTERN =
+        Pattern.compile(Pattern.quote(SHARD_ITERATOR_DELIM));
+    private static final TypeReference<Map<String, String>> STATE_TYPE_REF =
+        new TypeReference<Map<String, String>>() {
+        };
+
+    private final String streamArn;
     private final String tableName;
     private final String cdcObject;
 
@@ -21,20 +44,67 @@ public class PhoenixShardIterator {
     private long timestamp;
     private int offset;
 
+    /**
+     * Build a fresh iterator from server-side state. Used by
+     * {@code GetShardIteratorService} when emitting a brand-new iterator.
+     *
+     * @param streamArn   stream ARN
+     * @param streamName  internal Phoenix stream name
+     * @param streamType  stream view type (NEW_IMAGE, OLD_IMAGE, KEYS_ONLY,
+     *                    NEW_AND_OLD_IMAGES)
+     * @param partitionId region encoded partition id
+     * @param seqNum      resume sequence number
+     */
+    public PhoenixShardIterator(String streamArn, String streamName, String 
streamType,
+        String partitionId, String seqNum) {
+        this.streamArn = streamArn;
+        this.tableName = 
DdbAdapterCdcUtils.getTableNameFromStreamName(streamName);
+        this.cdcObject = 
DdbAdapterCdcUtils.getCDCObjectNameFromStreamName(streamName);
+        this.streamType = streamType;
+        this.partitionId = partitionId;
+        this.seqNum = seqNum;
+        setTimestampAndOffset();
+    }
+
     public PhoenixShardIterator(String shardIterator) {
-        String[] shardIteratorComponents = 
shardIterator.split(SHARD_ITERATOR_DELIM);
-        if (shardIteratorComponents.length != SHARD_ITERATOR_NUM_PARTS) {
-            throw new IllegalArgumentException(shardIterator
-                    + ": Provided shard iterator is not of the right format.");
+        if (shardIterator == null || shardIterator.isEmpty()) {
+            throw new IllegalArgumentException("ShardIterator is required");
+        }
+        String[] parts = DELIM_PATTERN.split(shardIterator, -1);
+        if (parts.length != SHARD_ITERATOR_NUM_PARTS) {
+            throw new IllegalArgumentException(
+                "ShardIterator must be of the form 
<streamArn>|<version>|<base64-state>: "
+                    + shardIterator);
+        }
+        String parsedStreamArn = parts[0];
+        String version = parts[1];
+        String base64State = parts[2];
+        if (!SHARD_ITERATOR_VERSION.equals(version)) {
+            throw new IllegalArgumentException(
+                "Unsupported ShardIterator version: " + version + " (expected "
+                    + SHARD_ITERATOR_VERSION + ")");
+        }
+        String internalStreamName = 
DdbAdapterCdcUtils.fromStreamArn(parsedStreamArn);
+        this.streamArn = parsedStreamArn;
+        this.tableName = 
DdbAdapterCdcUtils.getTableNameFromStreamName(internalStreamName);
+        this.cdcObject = 
DdbAdapterCdcUtils.getCDCObjectNameFromStreamName(internalStreamName);
+
+        Map<String, String> state = decodeState(base64State, shardIterator);
+        this.streamType = state.get(SI_FIELD_STREAM_TYPE);
+        this.partitionId = state.get(SI_FIELD_PARTITION_ID);
+        this.seqNum = state.get(SI_FIELD_SEQ_NUM);
+        if (this.streamType == null || this.partitionId == null || this.seqNum 
== null) {
+            throw new IllegalArgumentException(
+                "ShardIterator state missing required fields (" + 
SI_FIELD_STREAM_TYPE + ", "
+                    + SI_FIELD_PARTITION_ID + ", " + SI_FIELD_SEQ_NUM + "): " 
+ shardIterator);
         }
-        this.tableName = shardIteratorComponents[1];
-        this.cdcObject = shardIteratorComponents[2];
-        this.streamType = shardIteratorComponents[3];
-        this.partitionId = shardIteratorComponents[4];
-        this.seqNum = shardIteratorComponents[5];
         setTimestampAndOffset();
     }
 
+    public String getStreamArn() {
+        return streamArn;
+    }
+
     public String getTableName() {
         return tableName;
     }
@@ -71,8 +141,36 @@ public class PhoenixShardIterator {
 
     @Override
     public String toString() {
-        return String.format(SHARD_ITERATOR_FORMAT,
-                tableName, cdcObject, streamType, partitionId, seqNum);
+        Map<String, String> state = new LinkedHashMap<>();
+        state.put(SI_FIELD_STREAM_TYPE, streamType);
+        state.put(SI_FIELD_PARTITION_ID, partitionId);
+        state.put(SI_FIELD_SEQ_NUM, seqNum);
+        byte[] jsonBytes;
+        try {
+            jsonBytes = OBJECT_MAPPER.writeValueAsBytes(state);
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to serialize ShardIterator 
state", e);
+        }
+        String base64State = Base64.getEncoder().encodeToString(jsonBytes);
+        return streamArn + SHARD_ITERATOR_DELIM + SHARD_ITERATOR_VERSION + 
SHARD_ITERATOR_DELIM
+            + base64State;
+    }
+
+    private static Map<String, String> decodeState(String base64State, String 
shardIterator) {
+        byte[] stateBytes;
+        try {
+            stateBytes = Base64.getDecoder().decode(base64State);
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                "ShardIterator base64 state is not decodable: " + 
shardIterator, e);
+        }
+        try {
+            return OBJECT_MAPPER.readValue(stateBytes, STATE_TYPE_REF);
+        } catch (IOException e) {
+            String decoded = new String(stateBytes, StandardCharsets.UTF_8);
+            throw new IllegalArgumentException("ShardIterator state is not 
valid JSON: " + decoded,
+                e);
+        }
     }
 
     private void setTimestampAndOffset() {

Reply via email to