This is an automated email from the ASF dual-hosted git repository.
virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 55473b3 Align DynamoDB Streams identifiers with AWS
55473b3 is described below
commit 55473b30773c251e3c319d75db500cf28ffef2b2
Author: Viraj Jasani <[email protected]>
AuthorDate: Mon May 18 17:09:03 2026 -0700
Align DynamoDB Streams identifiers with AWS
---
DDB_API_REFERENCE.md | 51 ++-
.../phoenix/ddb/service/DescribeStreamService.java | 111 +++--
.../phoenix/ddb/service/GetRecordsService.java | 15 +-
.../ddb/service/GetShardIteratorService.java | 44 +-
.../phoenix/ddb/service/ListStreamsService.java | 11 +-
.../ddb/service/utils/TableDescriptorUtils.java | 3 +-
.../java/org/apache/phoenix/ddb/DDLTestUtils.java | 6 +-
.../apache/phoenix/ddb/DdbAdapterCdcUtilsTest.java | 462 +++++++++++++++++++++
.../org/apache/phoenix/ddb/GetShardIteratorIT.java | 38 +-
.../java/org/apache/phoenix/ddb/ListStreamsIT.java | 5 +-
.../phoenix/ddb/utils/DdbAdapterCdcUtils.java | 162 +++++++-
.../phoenix/ddb/utils/PhoenixShardIterator.java | 124 +++++-
12 files changed, 925 insertions(+), 107 deletions(-)
diff --git a/DDB_API_REFERENCE.md b/DDB_API_REFERENCE.md
index a77158d..fba007e 100644
--- a/DDB_API_REFERENCE.md
+++ b/DDB_API_REFERENCE.md
@@ -583,8 +583,8 @@ Returns the full description of a table including its schema, indexes, stream co
"StreamEnabled": true,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
- "LatestStreamArn": "phoenix/cdc/stream/...",
- "LatestStreamLabel": "2024-01-15T10:30:00Z"
+ "LatestStreamArn": "arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000",
+ "LatestStreamLabel": "2024-01-15T10:30:00.000"
}
}
```
@@ -1406,16 +1406,22 @@ Returns a list of all streams, optionally filtered by table name.
"Streams": [
{
"TableName": "MyTable",
- "StreamArn": "phoenix/cdc/stream/MyTable/...",
- "StreamLabel": "2024-01-15T10:30:00Z"
+ "StreamArn": "arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000",
+ "StreamLabel": "2024-01-15T10:30:00.000"
}
],
- "LastEvaluatedStreamArn": "phoenix/cdc/stream/MyTable/..."
+ "LastEvaluatedStreamArn": "arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000"
}
```
`LastEvaluatedStreamArn` is present only when the result count equals the limit.
+The emitted `StreamArn` is an AWS-shaped synthetic ARN. Region (`us-west-2`) and account
+(`000000000000`) are fixed sentinels because phoenix-adapters can run anywhere; the label
+segment is the UTC ISO timestamp matching AWS DynamoDB Streams' StreamLabel format.
+For backward compatibility, every endpoint that accepts a stream identifier also accepts
+the legacy bare internal name `phoenix/cdc/stream/{table}/CDC_{table}/{ts}/{creationDt}`.
+
---
### 9.2 DescribeStream
@@ -1437,9 +1443,9 @@ Returns detailed information about a stream including its shards.
```json
{
"StreamDescription": {
- "StreamArn": "phoenix/cdc/stream/MyTable/...",
+ "StreamArn": "arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000",
"TableName": "MyTable",
- "StreamLabel": "2024-01-15T10:30:00Z",
+ "StreamLabel": "2024-01-15T10:30:00.000",
"StreamViewType": "NEW_AND_OLD_IMAGES",
"CreationRequestDateTime": 1700000000.000,
"KeySchema": [
@@ -1448,15 +1454,15 @@ Returns detailed information about a stream including its shards.
"StreamStatus": "ENABLED",
"Shards": [
{
- "ShardId": "partition-1",
- "ParentShardId": "parent-partition-0",
+ "ShardId": "shardId-1700000099999-1a2b3c4d5e6f7890abcdef0123456789",
+ "ParentShardId": "shardId-1700000000000-a1b2c3d4e5f60718293a4b5c6d7e8f90",
"SequenceNumberRange": {
- "StartingSequenceNumber": "170000000000000",
- "EndingSequenceNumber": "170100000099999"
+ "StartingSequenceNumber": "000170000009999900000",
+ "EndingSequenceNumber": "000170000010000099999"
}
}
],
- "LastEvaluatedShardId": "partition-1"
+ "LastEvaluatedShardId": "shardId-1700000099999-1a2b3c4d5e6f7890abcdef0123456789"
}
}
```
@@ -1464,6 +1470,8 @@ Returns detailed information about a stream including its shards.
- Shards are only listed when `StreamStatus` is `ENABLED`
- `EndingSequenceNumber` is only present for closed shards (after a split)
- `LastEvaluatedShardId` is present only when shard count equals the limit
+- `ShardId` and `ParentShardId` follow the AWS-shaped `shardId-<partitionStartMs>-<32-char-hex>` format (length 49-54, within the AWS-specified range of 28-65). `ParentShardId` is omitted when the parent partition has been TTL-pruned from `SYSTEM.CDC_STREAM`.
+- `StartingSequenceNumber` and `EndingSequenceNumber` are 21-digit zero-padded numeric strings (matching the AWS spec minimum length of 21). `BigInteger` and `Long.parseLong` both ignore leading zeros, so padded values parse to the same numbers as their unpadded forms and numeric ordering is preserved.
---
@@ -1495,11 +1503,18 @@ Gets a shard iterator for reading records from a specific position in a shard.
```json
{
- "ShardIterator": "shardIterator/tableName/cdcObject/streamType/shardId/12345"
+ "ShardIterator": "arn:aws:dynamodb:us-west-2:000000000000:table/myTable/stream/2024-01-15T10:30:00.000|1|eyJzdHJlYW1UeXBlIjoiTkVXX0lNQUdFIiwicGFydGl0aW9uSWQiOiIxYTJiM2M0ZDVlNmY3ODkwYWJjZGVmMDEyMzQ1Njc4OSIsInNlcU51bSI6IjAwMDE3MDAwMDAwMDAwMDAwMDAwMDAwIn0"
}
```
-The shard iterator is an encoded string containing: table name, CDC object name, stream type, shard ID, and starting sequence number.
+The shard iterator follows the DynamoDB Streams wire format
+`<streamArn>|<version>|<base64(JSON state)>`:
+- `<streamArn>` carries the table identity (table name + creation-time stream label).
+- `<version>` is the literal `"1"`; reserved for future inner-format evolution.
+- `<base64(JSON state)>` is a base64-encoded JSON object carrying the per-iterator
+  resume state: `{"streamType":"...","partitionId":"<32-hex>","seqNum":"<21-digit>"}`.
+
+Treat the value as opaque; clients pass it unchanged to subsequent `GetRecords` calls.
---
@@ -1525,7 +1540,7 @@ Reads change records from a shard using a shard iterator.
"eventName": "INSERT",
"dynamodb": {
"StreamViewType": "NEW_AND_OLD_IMAGES",
- "SequenceNumber": "170000000000001",
+ "SequenceNumber": "000170000000012300000",
"ApproximateCreationDateTime": 1700000000.123,
"Keys": {
"id": {"S": "user-123"}
@@ -1542,7 +1557,7 @@ Reads change records from a shard using a shard iterator.
"eventName": "MODIFY",
"dynamodb": {
"StreamViewType": "NEW_AND_OLD_IMAGES",
- "SequenceNumber": "170000000000002",
+ "SequenceNumber": "000170000000145600000",
"ApproximateCreationDateTime": 1700000001.456,
"Keys": {"id": {"S": "user-123"}},
"OldImage": {"id": {"S": "user-123"}, "name": {"S": "John Doe"}, "age": {"N": "30"}},
@@ -1554,7 +1569,7 @@ Reads change records from a shard using a shard iterator.
"eventName": "REMOVE",
"dynamodb": {
"StreamViewType": "NEW_AND_OLD_IMAGES",
- "SequenceNumber": "170000000000003",
+ "SequenceNumber": "000170000000278900000",
"ApproximateCreationDateTime": 1700000002.789,
"Keys": {"id": {"S": "user-456"}},
"OldImage": {"id": {"S": "user-456"}, "name": {"S": "Jane"}},
@@ -1566,7 +1581,7 @@ Reads change records from a shard using a shard iterator.
}
}
],
- "NextShardIterator": "shardIterator/tableName/cdcObject/streamType/shardId/170000000000004"
+ "NextShardIterator": "arn:aws:dynamodb:us-west-2:000000000000:table/myTable/stream/2024-01-15T10:30:00.000|1|eyJzdHJlYW1UeXBlIjoiTkVXX0lNQUdFIiwicGFydGl0aW9uSWQiOiIxYTJiM2M0ZDVlNmY3ODkwYWJjZGVmMDEyMzQ1Njc4OSIsInNlcU51bSI6IjAwMDE3MDAwMDAwMDAwMDAwMDA0In0"
}
```
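As a reading aid for the `<streamArn>|<version>|<base64(JSON state)>` layout documented above, here is a minimal Java sketch of an encoder and decoder. `ShardIteratorCodecSketch` and its methods are hypothetical, not the adapter's `PhoenixShardIterator`; it assumes unpadded URL-safe Base64 and state values that never need JSON escaping, which is why a flat regex is enough to read the fields back.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical codec for the "<streamArn>|<version>|<base64(JSON state)>" layout. */
final class ShardIteratorCodecSketch {
    static final String VERSION = "1";
    // Assumption: every state value is a plain token (type name, hex id, digits),
    // so a flat "key":"value" regex is enough to read the JSON back.
    private static final Pattern FIELD = Pattern.compile("\"(\\w+)\":\"([^\"]*)\"");

    private ShardIteratorCodecSketch() {}

    static String encode(String streamArn, String streamType, String partitionId,
            String seqNum) {
        String json = "{\"streamType\":\"" + streamType
                + "\",\"partitionId\":\"" + partitionId
                + "\",\"seqNum\":\"" + seqNum + "\"}";
        // Assumption: unpadded URL-safe Base64; the adapter may use another variant.
        String state = Base64.getUrlEncoder().withoutPadding()
                .encodeToString(json.getBytes(StandardCharsets.UTF_8));
        return streamArn + "|" + VERSION + "|" + state;
    }

    /** Returns streamArn plus the JSON fields; throws IllegalArgumentException if malformed. */
    static Map<String, String> decode(String iterator) {
        String[] parts = iterator == null ? new String[0] : iterator.split("\\|", 3);
        if (parts.length != 3 || !VERSION.equals(parts[1])) {
            throw new IllegalArgumentException("Unsupported shard iterator: " + iterator);
        }
        // Base64.Decoder.decode throws IllegalArgumentException on invalid input.
        String json = new String(Base64.getUrlDecoder().decode(parts[2]),
                StandardCharsets.UTF_8);
        Map<String, String> state = new LinkedHashMap<>();
        state.put("streamArn", parts[0]);
        Matcher m = FIELD.matcher(json);
        while (m.find()) {
            state.put(m.group(1), m.group(2));
        }
        return state;
    }
}
```

In this sketch all resume state travels inside the token, so decoding needs nothing but the string itself, and the `"1"` slot is the hook for changing the inner JSON later.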
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/DescribeStreamService.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/DescribeStreamService.java
index 9d5c139..f55acc3 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/DescribeStreamService.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/DescribeStreamService.java
@@ -18,8 +18,11 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.MAX_NUM_CHANGES_AT_TIMESTAMP;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
@@ -33,14 +36,23 @@ public class DescribeStreamService {
= "SELECT PARTITION_ID, PARENT_PARTITION_ID, PARTITION_START_TIME, PARTITION_END_TIME FROM "
+ SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = '%s' AND STREAM_NAME = '%s' ";
- public static Map<String, Object> describeStream(Map<String, Object> request, String connectionUrl) {
- String streamName = (String) request.get(ApiMetadata.STREAM_ARN);
+ private static final String PARENT_PARTITION_START_TIMES_QUERY
+ = "SELECT PARTITION_ID, PARTITION_START_TIME FROM "
+ + SYSTEM_CDC_STREAM_NAME
+ + " WHERE TABLE_NAME = '%s' AND STREAM_NAME = '%s' AND PARTITION_ID IN (%s)";
+
+ public static Map<String, Object> describeStream(Map<String, Object> request,
+ String connectionUrl) {
+ String streamArnInput = (String) request.get(ApiMetadata.STREAM_ARN);
+ String streamName = DdbAdapterCdcUtils.normalizeStreamName(streamArnInput);
+ String streamArn = DdbAdapterCdcUtils.isStreamArn(streamArnInput) ?
+ streamArnInput : DdbAdapterCdcUtils.toStreamArn(streamName);
String exclusiveStartShardId = (String) request.get(ApiMetadata.EXCLUSIVE_START_SHARD_ID);
Integer limit = (Integer) request.getOrDefault(ApiMetadata.LIMIT, MAX_LIMIT);
String tableName = DdbAdapterCdcUtils.getTableNameFromStreamName(streamName);
Map<String, Object> streamDesc;
try (Connection conn = ConnectionUtil.getConnection(connectionUrl)) {
- streamDesc = getStreamDescriptionObject(conn, tableName, streamName);
+ streamDesc = getStreamDescriptionObject(conn, tableName, streamName, streamArn);
String streamStatus = DdbAdapterCdcUtils.getStreamStatus(conn, tableName, streamName);
streamDesc.put(ApiMetadata.STREAM_STATUS, streamStatus);
// query partitions only if stream is ENABLED
@@ -48,19 +60,55 @@ public class DescribeStreamService {
StringBuilder sb = new StringBuilder(String.format(DESCRIBE_STREAM_QUERY, tableName, streamName));
if (!StringUtils.isEmpty(exclusiveStartShardId)) {
sb.append(" AND PARTITION_ID > '");
- sb.append(exclusiveStartShardId);
+ sb.append(DdbAdapterCdcUtils.partitionIdFromShardId(exclusiveStartShardId));
sb.append("'");
}
sb.append(" LIMIT ");
sb.append(limit);
LOGGER.debug("Describe Stream Query: {}", sb);
- List<Map<String, Object>> shards = new ArrayList<>();
- String lastEvaluatedShardId = null;
+
+ List<Map<String, Object>> rawShards = new ArrayList<>();
+ Map<String, Long> partitionStartTimes = new HashMap<>();
+ Set<String> parentIdsNeeded = new HashSet<>();
ResultSet rs = conn.createStatement().executeQuery(sb.toString());
int count = 0;
while (rs.next()) {
count++;
- Map<String, Object> shard = getShardMetadata(rs);
+ Map<String, Object> raw = new HashMap<>();
+ String partitionId = rs.getString(1);
+ String parentPartitionId = rs.getString(2);
+ long partitionStartTime = rs.getLong(3);
+ long partitionEndTime = rs.getLong(4);
+ raw.put("partitionId", partitionId);
+ raw.put("parentPartitionId", parentPartitionId);
+ raw.put("partitionStartTime", partitionStartTime);
+ raw.put("partitionEndTime", partitionEndTime);
+ rawShards.add(raw);
+ partitionStartTimes.put(partitionId, partitionStartTime);
+ if (parentPartitionId != null) {
+ parentIdsNeeded.add(parentPartitionId);
+ }
+ }
+ // Parents already in the current page don't need a second query.
+ parentIdsNeeded.removeAll(partitionStartTimes.keySet());
+
+ if (!parentIdsNeeded.isEmpty()) {
+ String inClause = parentIdsNeeded.stream()
+ .map(id -> "'" + id + "'")
+ .collect(Collectors.joining(","));
+ String parentQuery = String.format(PARENT_PARTITION_START_TIMES_QUERY,
+ tableName, streamName, inClause);
+ LOGGER.debug("Parent Partition Start Times Query: {}", parentQuery);
+ ResultSet prs = conn.createStatement().executeQuery(parentQuery);
+ while (prs.next()) {
+ partitionStartTimes.put(prs.getString(1), prs.getLong(2));
+ }
+ }
+
+ List<Map<String, Object>> shards = new ArrayList<>();
+ String lastEvaluatedShardId = null;
+ for (Map<String, Object> raw : rawShards) {
+ Map<String, Object> shard = buildShardMetadata(raw, partitionStartTimes);
shards.add(shard);
lastEvaluatedShardId = (String) shard.get(ApiMetadata.SHARD_ID);
}
@@ -81,14 +129,12 @@ public class DescribeStreamService {
* Return a StreamDescription object for the given tableName and streamName.
* Populate all attributes except the list of the shards.
*/
- private static Map<String, Object> getStreamDescriptionObject(Connection conn,
- String tableName,
- String streamName)
- throws SQLException {
+ private static Map<String, Object> getStreamDescriptionObject(Connection conn, String tableName,
+ String streamName, String streamArn) throws SQLException {
PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
PTable table = pconn.getTable(tableName);
Map<String, Object> streamDesc = new HashMap<>();
- streamDesc.put(ApiMetadata.STREAM_ARN, streamName);
+ streamDesc.put(ApiMetadata.STREAM_ARN, streamArn);
streamDesc.put(ApiMetadata.TABLE_NAME, PhoenixUtils.getTableNameFromFullName(tableName, false));
long creationTS = DdbAdapterCdcUtils.getCDCIndexTimestampFromStreamName(streamName);
streamDesc.put(ApiMetadata.STREAM_LABEL, DdbAdapterCdcUtils.getStreamLabel(streamName));
@@ -100,23 +146,38 @@ public class DescribeStreamService {
}
/**
- * Build a Shard object using a ResultSet cursor from a query on SYSTEM.CDC_STREAM.
+ * Build a Shard response object from a buffered partition row and a precomputed map
+ * of {@code partitionId -> partitionStartTime} (including any parents pulled from the
+ * batch parent lookup).
*/
- private static Map<String, Object> getShardMetadata(ResultSet rs) throws SQLException {
- // rs --> id, parentId, startTime, endTime
+ private static Map<String, Object> buildShardMetadata(Map<String, Object> raw,
+ Map<String, Long> partitionStartTimes) {
+ String partitionId = (String) raw.get("partitionId");
+ String parentPartitionId = (String) raw.get("parentPartitionId");
+ long partitionStartTime = (Long) raw.get("partitionStartTime");
+ long partitionEndTime = (Long) raw.get("partitionEndTime");
+
Map<String, Object> shard = new HashMap<>();
- // shard id
- shard.put(ApiMetadata.SHARD_ID, rs.getString(1));
- // parent shard id
- if (rs.getString(2) != null) {
- shard.put(ApiMetadata.PARENT_SHARD_ID, rs.getString(2));
+ shard.put(ApiMetadata.SHARD_ID,
+ DdbAdapterCdcUtils.toShardId(partitionStartTime, partitionId));
+ if (parentPartitionId != null) {
+ Long parentStartTime = partitionStartTimes.get(parentPartitionId);
+ if (parentStartTime != null) {
+ shard.put(ApiMetadata.PARENT_SHARD_ID,
+ DdbAdapterCdcUtils.toShardId(parentStartTime, parentPartitionId));
+ } else {
+ LOGGER.info("Parent partition {} for partition {} is no longer present in "
+ + "SYSTEM.CDC_STREAM (likely TTLed); omitting ParentShardId "
+ + "from the response.", parentPartitionId, partitionId);
+ }
}
- // start sequence number
Map<String, Object> seqNumRange = new HashMap<>();
- seqNumRange.put(ApiMetadata.STARTING_SEQUENCE_NUMBER, String.valueOf(rs.getLong(3) * MAX_NUM_CHANGES_AT_TIMESTAMP));
- // end sequence number
- if (rs.getLong(4) > 0) {
- seqNumRange.put(ApiMetadata.ENDING_SEQUENCE_NUMBER, String.valueOf(((rs.getLong(4)+1) * MAX_NUM_CHANGES_AT_TIMESTAMP) - 1));
+ seqNumRange.put(ApiMetadata.STARTING_SEQUENCE_NUMBER,
+ DdbAdapterCdcUtils.getSequenceNumber(partitionStartTime, 0));
+ if (partitionEndTime > 0) {
+ seqNumRange.put(ApiMetadata.ENDING_SEQUENCE_NUMBER,
+ DdbAdapterCdcUtils.getSequenceNumber(partitionEndTime,
+ MAX_NUM_CHANGES_AT_TIMESTAMP - 1));
}
shard.put(ApiMetadata.SEQUENCE_NUMBER_RANGE, seqNumRange);
return shard;
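The shard-id change above boils down to a reversible string mapping plus a parent lookup that may come up empty. A minimal sketch follows; `ShardIdSketch` is hypothetical, the method names only mirror `toShardId`/`partitionIdFromShardId` from the diff, and passing a bare legacy partition id through unchanged is an assumption about back-compat. The `partitionStartTimes` map stands in for what the batched `PARENT_PARTITION_START_TIMES_QUERY` fills in.

```java
import java.util.Map;

/** Hypothetical sketch of the shardId-<partitionStartMs>-<partitionId> mapping. */
final class ShardIdSketch {
    private static final String PREFIX = "shardId-";

    private ShardIdSketch() {}

    static String toShardId(long partitionStartMs, String partitionId) {
        return PREFIX + partitionStartMs + "-" + partitionId;
    }

    /** Recovers the partition id; a bare (legacy) partition id is returned unchanged. */
    static String partitionIdFromShardId(String shardId) {
        if (shardId == null || !shardId.startsWith(PREFIX)) {
            return shardId; // assumption: legacy callers may still pass raw partition ids
        }
        int dash = shardId.indexOf('-', PREFIX.length());
        if (dash <= PREFIX.length() || dash == shardId.length() - 1) {
            throw new IllegalArgumentException("Malformed ShardId: " + shardId);
        }
        return shardId.substring(dash + 1);
    }

    /**
     * ParentShardId needs the parent's own start time. Returns null when the parent row
     * is gone (e.g. TTL-pruned), so the caller simply omits ParentShardId.
     */
    static String parentShardId(String parentPartitionId, Map<String, Long> partitionStartTimes) {
        if (parentPartitionId == null) {
            return null;
        }
        Long parentStart = partitionStartTimes.get(parentPartitionId);
        return parentStart == null ? null : toShardId(parentStart, parentPartitionId);
    }
}
```

Embedding the start time is why the parent's row must be fetched at all: a parent id alone is not enough to rebuild its `ShardId`.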
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
index f41ebb1..919cbe9 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.phoenix.ddb.ConnectionUtil;
import org.apache.phoenix.ddb.service.exceptions.PhoenixServiceException;
+import org.apache.phoenix.ddb.service.exceptions.ValidationException;
import org.apache.phoenix.ddb.utils.ApiMetadata;
import org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils;
import org.apache.phoenix.ddb.utils.PhoenixShardIterator;
@@ -57,8 +58,7 @@ public class GetRecordsService {
* return null for nextShardIterator if there are more records to return.
*/
public static Map<String, Object> getRecords(Map<String, Object> request, String connectionUrl) {
- PhoenixShardIterator pIter
- = new PhoenixShardIterator((String) request.get(ApiMetadata.SHARD_ITERATOR));
+ PhoenixShardIterator pIter = parseShardIterator(request);
Integer requestLimit = (Integer) request.get(ApiMetadata.LIMIT);
List<Map<String, Object>> records = new ArrayList<>();
long lastTs = pIter.getTimestamp();
@@ -218,4 +218,15 @@ public class GetRecordsService {
record.put(ApiMetadata.AWS_REGION, ApiMetadata.AWS_REGION_VALUE);
return record;
}
+
+ private static PhoenixShardIterator parseShardIterator(Map<String, Object> request) {
+ String shardIterator = (String) request.get(ApiMetadata.SHARD_ITERATOR);
+ try {
+ return new PhoenixShardIterator(shardIterator);
+ } catch (RuntimeException e) {
+ throw new ValidationException(
+ "Invalid ShardIterator: " + shardIterator + ", error message: " + (
+ e.getMessage() == null ? "" : e.getMessage()));
+ }
+ }
}
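The `SequenceNumber` values that GetRecords emits follow a timestamp-times-constant scheme, zero-padded to 21 digits. A minimal sketch follows; `SequenceNumberSketch` is hypothetical, and `MAX_NUM_CHANGES_AT_TIMESTAMP = 100_000` is an assumption inferred from the documented examples (e.g. `ApproximateCreationDateTime` 1700000000.123 pairs with `000170000000012300000`), not read from `DdbAdapterCdcUtils`.

```java
import java.util.Locale;

/** Hypothetical sketch: seq = timestampMs * MAX + offset, zero-padded to 21 digits. */
final class SequenceNumberSketch {
    // Assumed value; consistent with the documented example sequence numbers.
    static final long MAX_NUM_CHANGES_AT_TIMESTAMP = 100_000L;

    private SequenceNumberSketch() {}

    static String getSequenceNumber(long timestampMs, long offset) {
        if (offset < 0 || offset >= MAX_NUM_CHANGES_AT_TIMESTAMP) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        // Fixed width makes String ordering agree with numeric ordering.
        // Locale.ROOT guarantees ASCII digits regardless of the JVM default locale.
        return String.format(Locale.ROOT, "%021d",
                timestampMs * MAX_NUM_CHANGES_AT_TIMESTAMP + offset);
    }

    /** Long.parseLong ignores leading zeros, so padded and unpadded forms parse equally. */
    static long parseSequenceNumber(String seqNum) {
        return Long.parseLong(seqNum);
    }

    static long timestampOf(String seqNum) {
        return parseSequenceNumber(seqNum) / MAX_NUM_CHANGES_AT_TIMESTAMP;
    }
}
```

Under that assumption, `getSequenceNumber(1700000000123L, 0)` yields `000170000000012300000`, and a closed shard ending at 1700000100000 ms gets `getSequenceNumber(1700000100000L, 99_999)`, i.e. `000170000010000099999`, matching the `EndingSequenceNumber` in the DescribeStream example.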
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetShardIteratorService.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetShardIteratorService.java
index 423a0cf..19992e1 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetShardIteratorService.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetShardIteratorService.java
@@ -5,6 +5,7 @@ import org.apache.phoenix.ddb.service.exceptions.PhoenixServiceException;
import org.apache.phoenix.ddb.service.exceptions.ValidationException;
import org.apache.phoenix.ddb.utils.ApiMetadata;
import org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils;
+import org.apache.phoenix.ddb.utils.PhoenixShardIterator;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import java.sql.Connection;
@@ -12,26 +13,28 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.MAX_NUM_CHANGES_AT_TIMESTAMP;
-import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_FORMAT;
-
public class GetShardIteratorService {
public static Map<String, Object> getShardIterator(Map<String, Object> request,
String connectionUrl) {
Map<String, Object> result = new HashMap<>();
try (Connection conn = ConnectionUtil.getConnection(connectionUrl)) {
- String streamArn = (String) request.get(ApiMetadata.STREAM_ARN);
- String shardId = (String) request.get(ApiMetadata.SHARD_ID);
+ String streamArnInput = (String) request.get(ApiMetadata.STREAM_ARN);
+ String streamName = DdbAdapterCdcUtils.normalizeStreamName(streamArnInput);
+ String partitionId = DdbAdapterCdcUtils.partitionIdFromShardId(
+ (String) request.get(ApiMetadata.SHARD_ID));
String seqNum = (String) request.get(ApiMetadata.SEQUENCE_NUMBER);
String shardIterType = (String) request.get(ApiMetadata.SHARD_ITERATOR_TYPE);
- String tableName = DdbAdapterCdcUtils.getTableNameFromStreamName(streamArn);
- String cdcObj = DdbAdapterCdcUtils.getCDCObjectNameFromStreamName(streamArn);
- String startSeqNum = getStartingSequenceNumber(conn, tableName, streamArn, shardId,
- seqNum, shardIterType);
+ String tableName = DdbAdapterCdcUtils.getTableNameFromStreamName(streamName);
+ String startSeqNum =
+ getStartingSequenceNumber(conn, tableName, streamName, partitionId, seqNum,
+ shardIterType);
String streamType = DdbAdapterCdcUtils.getStreamType(conn, tableName);
- result.put(ApiMetadata.SHARD_ITERATOR, String.format(SHARD_ITERATOR_FORMAT, tableName,
- cdcObj, streamType, shardId, startSeqNum));
+ String streamArn = DdbAdapterCdcUtils.isStreamArn(streamArnInput)
+ ? streamArnInput : DdbAdapterCdcUtils.toStreamArn(streamName);
+ PhoenixShardIterator pIter = new PhoenixShardIterator(streamArn, streamName,
+ streamType, partitionId, startSeqNum);
+ result.put(ApiMetadata.SHARD_ITERATOR, pIter.toString());
} catch (SQLException e) {
throw new PhoenixServiceException(e);
}
@@ -39,27 +42,30 @@ public class GetShardIteratorService {
}
private static String getStartingSequenceNumber(Connection conn, String tableName,
- String streamName, String shardId,
+ String streamName, String partitionId,
String seqNum, String type)
throws SQLException {
- String startSeqNum = null;
+ String startSeqNum;
switch (type) {
case "AT_SEQUENCE_NUMBER" :
- startSeqNum = seqNum;
+ startSeqNum =
+ String.format("%021d", DdbAdapterCdcUtils.parseSequenceNumber(seqNum));
break;
case "AFTER_SEQUENCE_NUMBER":
- startSeqNum = String.valueOf(Long.parseLong(seqNum) + 1);
+ startSeqNum =
+ String.format("%021d", DdbAdapterCdcUtils.parseSequenceNumber(seqNum) + 1);
break;
case "LATEST":
// new records only i.e. use current time.
- startSeqNum = String.valueOf(EnvironmentEdgeManager.currentTimeMillis()
- * MAX_NUM_CHANGES_AT_TIMESTAMP);
+ startSeqNum =
+ DdbAdapterCdcUtils.getSequenceNumber(EnvironmentEdgeManager.currentTimeMillis(),
+ 0);
break;
case "TRIM_HORIZON":
// Oldest available sequence number in the shard, we will use shard's start sequence number
long partitionStartTime = DdbAdapterCdcUtils.getPartitionStartTime(
- conn, tableName, streamName, shardId);
- startSeqNum = String.valueOf(partitionStartTime * MAX_NUM_CHANGES_AT_TIMESTAMP);
+ conn, tableName, streamName, partitionId);
+ startSeqNum = DdbAdapterCdcUtils.getSequenceNumber(partitionStartTime, 0);
break;
default:
throw new ValidationException("Invalid shard iterator type: " + type);
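The `ShardIteratorType` switch above can be read as a pure function once the clock and the partition lookup are lifted out. The sketch below is hypothetical: `StartingSequenceSketch` takes `nowMs` and `partitionStartMs` as parameters in place of `EnvironmentEdgeManager` and the `SYSTEM.CDC_STREAM` query, and it assumes `MAX_NUM_CHANGES_AT_TIMESTAMP = 100_000`.

```java
import java.util.Locale;

/** Hypothetical, database-free sketch of the ShardIteratorType -> start sequence mapping. */
final class StartingSequenceSketch {
    static final long MAX_NUM_CHANGES_AT_TIMESTAMP = 100_000L; // assumed value

    private StartingSequenceSketch() {}

    private static String pad(long value) {
        return String.format(Locale.ROOT, "%021d", value);
    }

    static String startingSequenceNumber(String type, String seqNum,
            long partitionStartMs, long nowMs) {
        if (type == null) {
            throw new IllegalArgumentException("ShardIteratorType is required");
        }
        switch (type) {
            case "AT_SEQUENCE_NUMBER":
                // Re-pad whatever the client sent; parseLong accepts padded or unpadded.
                return pad(Long.parseLong(seqNum));
            case "AFTER_SEQUENCE_NUMBER":
                return pad(Long.parseLong(seqNum) + 1);
            case "LATEST":
                return pad(nowMs * MAX_NUM_CHANGES_AT_TIMESTAMP); // new records only
            case "TRIM_HORIZON":
                return pad(partitionStartMs * MAX_NUM_CHANGES_AT_TIMESTAMP); // oldest in shard
            default:
                throw new IllegalArgumentException("Invalid shard iterator type: " + type);
        }
    }
}
```

Injecting `nowMs` keeps `LATEST` deterministic under test, which is the role `EnvironmentEdgeManager` plays in the real service.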
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ListStreamsService.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ListStreamsService.java
index cc81946..5ae0ac2 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ListStreamsService.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/ListStreamsService.java
@@ -44,25 +44,26 @@ public class ListStreamsService {
}
if (!StringUtils.isEmpty(exclusiveStartStreamArn)) {
query.append(" AND STREAM_NAME > '")
- .append(exclusiveStartStreamArn)
+ .append(DdbAdapterCdcUtils.normalizeStreamName(exclusiveStartStreamArn))
.append("'");
}
int limit = (int) request.getOrDefault(ApiMetadata.LIMIT, MAX_LIMIT);
query.append(" LIMIT ").append(limit);
ResultSet rs = connection.createStatement().executeQuery(query.toString());
- String lastStreamArn = null;
+ String lastStreamName = null;
while (rs.next()) {
String tableName = rs.getString(1);
String streamName = rs.getString(2);
Map<String, Object> stream = new HashMap<>();
stream.put(ApiMetadata.TABLE_NAME, PhoenixUtils.getTableNameFromFullName(tableName, false));
- stream.put(ApiMetadata.STREAM_ARN, streamName);
+ stream.put(ApiMetadata.STREAM_ARN, DdbAdapterCdcUtils.toStreamArn(streamName));
stream.put(ApiMetadata.STREAM_LABEL, DdbAdapterCdcUtils.getStreamLabel(streamName));
streams.add(stream);
- lastStreamArn = streamName;
+ lastStreamName = streamName;
}
result.put(ApiMetadata.STREAMS, streams);
- result.put(ApiMetadata.LAST_EVALUATED_STREAM_ARN, lastStreamArn);
+ result.put(ApiMetadata.LAST_EVALUATED_STREAM_ARN,
+ lastStreamName == null ? null : DdbAdapterCdcUtils.toStreamArn(lastStreamName));
} catch (SQLException e) {
throw new PhoenixServiceException(e);
}
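ListStreams now emits ARNs but still paginates with `STREAM_NAME > '...'` on internal names, so every incoming identifier is normalized back first. The sketch below is hypothetical (`StreamArnSketch` is not the adapter's `DdbAdapterCdcUtils`); it assumes the internal form used by `DdbAdapterCdcUtilsTest`'s fixture, `phoenix/cdc/stream/DDB.<table>/CDC_<table>/<tsMillis>/<label>`, and uses `java.time` strict parsing to reject impossible labels the way those tests expect.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;

/** Hypothetical sketch of the internal stream name <-> AWS-shaped stream ARN mapping. */
final class StreamArnSketch {
    static final String ARN_PREFIX = "arn:aws:dynamodb:us-west-2:000000000000:table/";
    static final String INTERNAL_PREFIX = "phoenix/cdc/stream/";
    static final String DDB_SCHEMA = "DDB."; // assumed from the unit-test fixture
    private static final String STREAM_SEGMENT = "/stream/";
    // STRICT with 'uuuu' rejects impossible dates such as 2024-02-30 or hour 25.
    private static final DateTimeFormatter LABEL = DateTimeFormatter
            .ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSS")
            .withResolverStyle(ResolverStyle.STRICT);

    private StreamArnSketch() {}

    static boolean isStreamArn(String s) {
        return s != null && s.startsWith(ARN_PREFIX);
    }

    static String toStreamArn(String internalName) {
        // parts: [DDB.<table>, CDC_<table>, <tsMillis>, <label>]
        String[] parts = internalName.substring(INTERNAL_PREFIX.length()).split("/");
        return ARN_PREFIX + stripSchema(parts[0]) + STREAM_SEGMENT + parts[3];
    }

    static String fromStreamArn(String arn) {
        if (!isStreamArn(arn)) {
            throw new IllegalArgumentException("Not a stream ARN: " + arn);
        }
        String rest = arn.substring(ARN_PREFIX.length());
        int idx = rest.indexOf(STREAM_SEGMENT);
        if (idx <= 0) {
            throw new IllegalArgumentException("Missing /stream/ segment: " + arn);
        }
        // Accept a "DDB."-prefixed table segment as well as the bare table name.
        String table = stripSchema(rest.substring(0, idx));
        String label = rest.substring(idx + STREAM_SEGMENT.length());
        long tsMillis;
        try {
            tsMillis = LocalDateTime.parse(label, LABEL).toInstant(ZoneOffset.UTC).toEpochMilli();
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Malformed stream label: " + label, e);
        }
        return INTERNAL_PREFIX + DDB_SCHEMA + table + "/CDC_" + table + "/" + tsMillis + "/" + label;
    }

    /** Endpoints accept either form; everything downstream works on the internal name. */
    static String normalizeStreamName(String streamArnOrName) {
        return isStreamArn(streamArnOrName) ? fromStreamArn(streamArnOrName) : streamArnOrName;
    }

    private static String stripSchema(String table) {
        return table.startsWith(DDB_SCHEMA) ? table.substring(DDB_SCHEMA.length()) : table;
    }
}
```

Because the label round-trips verbatim and the timestamp is recomputed from it, `fromStreamArn(toStreamArn(name))` returns the original name whenever the label is the UTC rendering of the embedded timestamp.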
diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/TableDescriptorUtils.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/TableDescriptorUtils.java
index 0bd363d..d02b05c 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/TableDescriptorUtils.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/TableDescriptorUtils.java
@@ -217,7 +217,8 @@ public class TableDescriptorUtils {
String streamName = DdbAdapterCdcUtils.getEnabledStreamName(pconn, table.getName().getString());
if (streamName != null && table.getSchemaVersion() != null) {
- tableDescription.put(ApiMetadata.LATEST_STREAM_ARN, streamName);
+ tableDescription.put(ApiMetadata.LATEST_STREAM_ARN,
+ DdbAdapterCdcUtils.toStreamArn(streamName));
tableDescription.put(ApiMetadata.LATEST_STREAM_LABEL, DdbAdapterCdcUtils.getStreamLabel(streamName));
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DDLTestUtils.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DDLTestUtils.java
index 6824a1f..1c68237 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DDLTestUtils.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DDLTestUtils.java
@@ -267,7 +267,9 @@ public class DDLTestUtils {
String streamType)
throws SQLException, ParseException {
String tableName = td.tableName();
- Assert.assertTrue(td.latestStreamArn().startsWith("phoenix/cdc/stream/"));
+ Assert.assertTrue(td.latestStreamArn().startsWith(
+ "arn:aws:dynamodb:us-west-2:000000000000:table/"));
+ Assert.assertTrue(td.latestStreamArn().contains("/stream/"));
Assert.assertTrue(td.latestStreamArn().contains(tableName));
PTable dataTable = pconn.getTable(PhoenixUtils.getFullTableName(tableName, false));
@@ -285,7 +287,7 @@ public class DDLTestUtils {
df.setTimeZone(TimeZone.getTimeZone("UTC"));
Date date = df.parse(td.latestStreamLabel());
Assert.assertEquals(String.valueOf(cdcIndex.getTimeStamp()), String.valueOf(date.getTime()));
- Assert.assertTrue(td.latestStreamArn().contains(String.valueOf(cdcIndex.getTimeStamp())));
+ Assert.assertTrue(td.latestStreamArn().endsWith("/stream/" + td.latestStreamLabel()));
}
private static CreateTableRequest.Builder addGlobalIndexToRequest(CreateTableRequest.Builder request, String indexName,
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DdbAdapterCdcUtilsTest.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DdbAdapterCdcUtilsTest.java
new file mode 100644
index 0000000..3a31da6
--- /dev/null
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/DdbAdapterCdcUtilsTest.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Base64;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils;
+import org.apache.phoenix.ddb.utils.PhoenixShardIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DdbAdapterCdcUtilsTest {
+
+ private static final long CDC_INDEX_TS = 1700000000000L;
+ private static final String TABLE_NAME = "MY_TABLE";
+ private static final String INTERNAL_TABLE = "DDB." + TABLE_NAME;
+
+ private static String formatUtc(long ts) {
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ df.setTimeZone(TimeZone.getTimeZone("Etc/UTC"));
+ return df.format(new Date(ts));
+ }
+
+ private static String internalStreamName(long ts) {
+ return "phoenix/cdc/stream/" + INTERNAL_TABLE + "/CDC_" + TABLE_NAME + "/" + ts + "/"
+ + formatUtc(ts);
+ }
+
+ @Test
+ public void testToStreamArn_emitsAwsShape() {
+ String arn = DdbAdapterCdcUtils.toStreamArn(internalStreamName(CDC_INDEX_TS));
+ Assert.assertEquals(
+ "arn:aws:dynamodb:us-west-2:000000000000:table/" + TABLE_NAME + "/stream/" + formatUtc(
+ CDC_INDEX_TS), arn);
+ }
+
+ @Test
+ public void testToStreamArn_stripsInternalDdbPrefix() {
+ String arn = DdbAdapterCdcUtils.toStreamArn(internalStreamName(CDC_INDEX_TS));
+ Assert.assertFalse(arn.contains("DDB." + TABLE_NAME));
+ Assert.assertTrue(arn.contains(":table/" + TABLE_NAME + "/stream/"));
+ }
+
+ @Test
+ public void testFromStreamArn_addsInternalDdbPrefix() {
+ String arn =
+ "arn:aws:dynamodb:us-west-2:000000000000:table/" + TABLE_NAME + "/stream/" + formatUtc(
+ CDC_INDEX_TS);
+ Assert.assertEquals(internalStreamName(CDC_INDEX_TS),
+ DdbAdapterCdcUtils.fromStreamArn(arn));
+ }
+
+ @Test
+ public void testFromStreamArn_idempotentOnLegacyArnWithDdbPrefix() {
+ String legacyArn =
+ "arn:aws:dynamodb:us-west-2:000000000000:table/" + INTERNAL_TABLE + "/stream/"
+ + formatUtc(CDC_INDEX_TS);
+ Assert.assertEquals(internalStreamName(CDC_INDEX_TS),
+ DdbAdapterCdcUtils.fromStreamArn(legacyArn));
+ }
+
+ @Test
+ public void testNormalizeThenToStreamArn_canonicalizesDdbPrefixedArn() {
+ String prefixedArn =
+ "arn:aws:dynamodb:us-west-2:000000000000:table/" + INTERNAL_TABLE + "/stream/"
+ + formatUtc(CDC_INDEX_TS);
+ String canonicalArn =
+ "arn:aws:dynamodb:us-west-2:000000000000:table/" + TABLE_NAME + "/stream/" + formatUtc(
+ CDC_INDEX_TS);
+ String normalized = DdbAdapterCdcUtils.normalizeStreamName(prefixedArn);
+ Assert.assertEquals(canonicalArn, DdbAdapterCdcUtils.toStreamArn(normalized));
+ }
+
+ @Test
+ public void testFromStreamArn_reconstructsInternalName() {
+ String internal = internalStreamName(CDC_INDEX_TS);
+ String arn = DdbAdapterCdcUtils.toStreamArn(internal);
+ Assert.assertEquals(internal, DdbAdapterCdcUtils.fromStreamArn(arn));
+ }
+
+ @Test
+ public void testRoundTrip_byteIdentical() {
+ for (long ts : new long[] {0L, 1L, 1000L, 1700000000000L, 1777440690689L, 4102444800000L}) {
+ String internal = internalStreamName(ts);
+ Assert.assertEquals("ts=" + ts, internal,
+ DdbAdapterCdcUtils.fromStreamArn(DdbAdapterCdcUtils.toStreamArn(internal)));
+ }
+ }
+
+ @Test
+ public void testIsStreamArn() {
+ Assert.assertTrue(DdbAdapterCdcUtils.isStreamArn(
+ "arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/2024-01-15T10:30:00.000"));
+ Assert.assertFalse(DdbAdapterCdcUtils.isStreamArn(internalStreamName(CDC_INDEX_TS)));
+ Assert.assertFalse(DdbAdapterCdcUtils.isStreamArn(null));
+ Assert.assertFalse(DdbAdapterCdcUtils.isStreamArn(""));
+ }
+
+ @Test
+ public void testNormalizeStreamName_acceptsBothFormats() {
+ String internal = internalStreamName(CDC_INDEX_TS);
+ String arn = DdbAdapterCdcUtils.toStreamArn(internal);
+ Assert.assertEquals(internal, DdbAdapterCdcUtils.normalizeStreamName(internal));
+ Assert.assertEquals(internal, DdbAdapterCdcUtils.normalizeStreamName(arn));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromStreamArn_rejectsNonArn() {
+ DdbAdapterCdcUtils.fromStreamArn(internalStreamName(CDC_INDEX_TS));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromStreamArn_rejectsMissingStreamSegment() {
+ DdbAdapterCdcUtils.fromStreamArn(
+ "arn:aws:dynamodb:us-west-2:000000000000:table/T-without-stream-segment");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromStreamArn_rejectsMalformedLabel() {
+ DdbAdapterCdcUtils.fromStreamArn(
+ "arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/not-an-iso-timestamp");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromStreamArn_rejectsImpossibleDayInMonth() {
+ DdbAdapterCdcUtils.fromStreamArn(
+ "arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/2024-02-30T00:00:00.000");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromStreamArn_rejectsFeb29InNonLeapYear() {
+ DdbAdapterCdcUtils.fromStreamArn(
+ "arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/2023-02-29T00:00:00.000");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromStreamArn_rejectsImpossibleHour() {
+ DdbAdapterCdcUtils.fromStreamArn(
+ "arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/2024-01-15T25:00:00.000");
+ }
+
+ @Test
+ public void testToStreamArn_preservesTableName() {
+ String arn = DdbAdapterCdcUtils.toStreamArn(internalStreamName(CDC_INDEX_TS));
+ Assert.assertTrue(arn.contains(":table/" + TABLE_NAME + "/stream/"));
+ }
+
+ @Test
+ public void testGetStreamLabel_unchangedAfterRoundTrip() {
+ String internal = internalStreamName(CDC_INDEX_TS);
+ String arn = DdbAdapterCdcUtils.toStreamArn(internal);
+ Assert.assertEquals(DdbAdapterCdcUtils.getStreamLabel(internal),
+ DdbAdapterCdcUtils.getStreamLabel(DdbAdapterCdcUtils.fromStreamArn(arn)));
+ }
+
+ private static final String PARTITION_HEX = "1a2b3c4d5e6f7890abcdef0123456789";
+ private static final long PARTITION_START_MS = 1700000000000L;
+
+ @Test
+ public void testToShardId_emitsAwsShape() {
+ Assert.assertEquals("shardId-" + PARTITION_START_MS + "-" + PARTITION_HEX,
+ DdbAdapterCdcUtils.toShardId(PARTITION_START_MS, PARTITION_HEX));
+ }
+
+ @Test
+ public void testToShardId_lengthWithinAwsSpec() {
+ String shardId = DdbAdapterCdcUtils.toShardId(PARTITION_START_MS, PARTITION_HEX);
+ Assert.assertTrue("len=" + shardId.length(),
+ shardId.length() >= 28 && shardId.length() <= 65);
+ }
+
+ @Test
+ public void testIsShardId_detectsBothFormats() {
+ Assert.assertTrue(
+ DdbAdapterCdcUtils.isShardId("shardId-" + PARTITION_START_MS + "-" + PARTITION_HEX));
+ Assert.assertFalse(DdbAdapterCdcUtils.isShardId(PARTITION_HEX));
+ Assert.assertFalse(DdbAdapterCdcUtils.isShardId(null));
+ Assert.assertFalse(DdbAdapterCdcUtils.isShardId(""));
+ }
+
+ @Test
+ public void testPartitionIdFromShardId_extractsBareHexFromNewShape() {
+ String shardId = DdbAdapterCdcUtils.toShardId(PARTITION_START_MS, PARTITION_HEX);
+ Assert.assertEquals(PARTITION_HEX, DdbAdapterCdcUtils.partitionIdFromShardId(shardId));
+ }
+
+ @Test
+ public void testPartitionIdFromShardId_passesThroughLegacyHex() {
+ Assert.assertEquals(PARTITION_HEX,
+ DdbAdapterCdcUtils.partitionIdFromShardId(PARTITION_HEX));
+ }
+
+ @Test
+ public void testPartitionIdFromShardId_idempotent() {
+ String shardId = DdbAdapterCdcUtils.toShardId(PARTITION_START_MS, PARTITION_HEX);
+ String once = DdbAdapterCdcUtils.partitionIdFromShardId(shardId);
+ Assert.assertEquals(once, DdbAdapterCdcUtils.partitionIdFromShardId(once));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPartitionIdFromShardId_rejectsNull() {
+ DdbAdapterCdcUtils.partitionIdFromShardId(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPartitionIdFromShardId_rejectsEmpty() {
+ DdbAdapterCdcUtils.partitionIdFromShardId("");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPartitionIdFromShardId_rejectsMissingHexSegment() {
+ DdbAdapterCdcUtils.partitionIdFromShardId("shardId-" + PARTITION_START_MS);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPartitionIdFromShardId_rejectsTrailingDashWithoutHex() {
+ DdbAdapterCdcUtils.partitionIdFromShardId("shardId-" + PARTITION_START_MS + "-");
+ }
+
+ private static final long SEQ_TS = 1738372050000L;
+ private static final int SEQ_OFFSET = 420;
+ private static final long SEQ_NUMERIC = 173837205000000420L;
+ private static final String SEQ_18 = "173837205000000420";
+ private static final String SEQ_21 = "000173837205000000420";
+
+ @Test
+ public void testGetSequenceNumber_emits21DigitZeroPadded() {
+ String seq = DdbAdapterCdcUtils.getSequenceNumber(SEQ_TS, SEQ_OFFSET);
+ Assert.assertEquals("len=" + seq.length(), 21, seq.length());
+ Assert.assertEquals(SEQ_21, seq);
+ }
+
+ @Test
+ public void testGetSequenceNumber_numericValueUnchangedFromLegacyConcat() {
+ String legacyConcat =
+ SEQ_TS + String.format("%0" + DdbAdapterCdcUtils.OFFSET_LENGTH + "d", SEQ_OFFSET);
+ String paddedNew = DdbAdapterCdcUtils.getSequenceNumber(SEQ_TS, SEQ_OFFSET);
+ Assert.assertEquals(Long.parseLong(legacyConcat), Long.parseLong(paddedNew));
+ }
+
+ @Test
+ public void testParseSequenceNumber_acceptsBothFormatsIdentically() {
+ Assert.assertEquals(SEQ_NUMERIC, DdbAdapterCdcUtils.parseSequenceNumber(SEQ_18));
+ Assert.assertEquals(SEQ_NUMERIC, DdbAdapterCdcUtils.parseSequenceNumber(SEQ_21));
+ Assert.assertEquals(SEQ_NUMERIC, DdbAdapterCdcUtils.parseSequenceNumber(
+ DdbAdapterCdcUtils.getSequenceNumber(SEQ_TS, SEQ_OFFSET)));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testParseSequenceNumber_rejectsNull() {
+ DdbAdapterCdcUtils.parseSequenceNumber(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testParseSequenceNumber_rejectsEmpty() {
+ DdbAdapterCdcUtils.parseSequenceNumber("");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testParseSequenceNumber_rejectsNonNumeric() {
+ DdbAdapterCdcUtils.parseSequenceNumber("abc");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testParseSequenceNumber_rejectsNegative() {
+ DdbAdapterCdcUtils.parseSequenceNumber("-1");
+ }
+
+ @Test
+ public void testGetEventId_stableAcrossSequenceNumberFormatChange() {
+ String idLegacy = DdbAdapterCdcUtils.getEventId(TABLE_NAME, PARTITION_HEX, SEQ_18);
+ String idNew = DdbAdapterCdcUtils.getEventId(TABLE_NAME, PARTITION_HEX, SEQ_21);
+ Assert.assertEquals(idLegacy, idNew);
+ Assert.assertEquals(32, idLegacy.length());
+ Assert.assertTrue(idLegacy.matches("[0-9a-f]{32}"));
+ }
+
+ @Test
+ public void testGetEventId_matchesPreWiCRawHashOnLegacySeqNum() {
+ String expected;
+ try {
+ String input = TABLE_NAME + "|" + PARTITION_HEX + "|" + SEQ_18;
+ java.security.MessageDigest md = java.security.MessageDigest.getInstance("MD5");
+ byte[] digest = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ expected = String.format("%032x", new java.math.BigInteger(1, digest));
+ } catch (java.security.NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ Assert.assertEquals(expected,
+ DdbAdapterCdcUtils.getEventId(TABLE_NAME, PARTITION_HEX, SEQ_18));
+ Assert.assertEquals(expected,
+ DdbAdapterCdcUtils.getEventId(TABLE_NAME, PARTITION_HEX, SEQ_21));
+ }
+
+ private static final String SHARD_ITER_STREAM_TYPE = "NEW_AND_OLD_IMAGES";
+
+ private static String canonicalStreamArn() {
+ return DdbAdapterCdcUtils.toStreamArn(internalStreamName(CDC_INDEX_TS));
+ }
+
+ private static PhoenixShardIterator newIterator() {
+ return new PhoenixShardIterator(canonicalStreamArn(), internalStreamName(CDC_INDEX_TS),
+ SHARD_ITER_STREAM_TYPE, PARTITION_HEX, SEQ_21);
+ }
+
+ @Test
+ public void testShardIterator_toString_emitsAwsShape() {
+ String token = newIterator().toString();
+ Assert.assertTrue("must start with canonical streamArn: " + token,
+ token.startsWith(canonicalStreamArn() + "|"));
+ String[] parts = token.split("\\|", -1);
+ Assert.assertEquals("expected 3 outer parts: " + token, 3, parts.length);
+ Assert.assertEquals("version sentinel must be \"1\": " + token, "1", parts[1]);
+ String json = new String(Base64.getDecoder().decode(parts[2]), StandardCharsets.UTF_8);
+ Assert.assertTrue(json.contains("\"streamType\":\"" + SHARD_ITER_STREAM_TYPE + "\""));
+ Assert.assertTrue(json.contains("\"partitionId\":\"" + PARTITION_HEX + "\""));
+ Assert.assertTrue(json.contains("\"seqNum\":\"" + SEQ_21 + "\""));
+ }
+
+ @Test
+ public void testShardIterator_roundTrip_recoversAllFields() {
+ PhoenixShardIterator original = newIterator();
+ PhoenixShardIterator parsed = new PhoenixShardIterator(original.toString());
+ Assert.assertEquals(canonicalStreamArn(), parsed.getStreamArn());
+ Assert.assertEquals(SHARD_ITER_STREAM_TYPE, parsed.getStreamType());
+ Assert.assertEquals(PARTITION_HEX, parsed.getPartitionId());
+ Assert.assertEquals(SEQ_21, parsed.getSeqNum());
+ Assert.assertEquals(original.getTimestamp(), parsed.getTimestamp());
+ Assert.assertEquals(original.getOffset(), parsed.getOffset());
+ }
+
+ @Test
+ public void testShardIterator_roundTrip_derivesTableAndCdcObjectFromStreamArn() {
+ PhoenixShardIterator parsed = new PhoenixShardIterator(newIterator().toString());
+ Assert.assertEquals(INTERNAL_TABLE, parsed.getTableName());
+ Assert.assertEquals("CDC_" + TABLE_NAME, parsed.getCdcObject());
+ }
+
+ @Test
+ public void testShardIterator_roundTrip_byteIdenticalToString() {
+ String first = newIterator().toString();
+ String second = new PhoenixShardIterator(first).toString();
+ Assert.assertEquals(first, second);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testShardIterator_parse_rejectsNull() {
+ new PhoenixShardIterator((String) null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testShardIterator_parse_rejectsEmpty() {
+ new PhoenixShardIterator("");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testShardIterator_parse_rejectsMalformedOuter() {
+ new PhoenixShardIterator(canonicalStreamArn() + "|1");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testShardIterator_parse_rejectsUnknownVersion() {
+ String validInnerBase64 = newIterator().toString().split("\\|", -1)[2];
+ new PhoenixShardIterator(canonicalStreamArn() + "|9|" + validInnerBase64);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testShardIterator_parse_rejectsCorruptBase64() {
+ new PhoenixShardIterator(canonicalStreamArn() + "|1|!!!not-base64!!!");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testShardIterator_parse_rejectsMalformedJson() {
+ String notJson = Base64.getEncoder().withoutPadding()
+ .encodeToString("not-json".getBytes(StandardCharsets.UTF_8));
+ new PhoenixShardIterator(canonicalStreamArn() + "|1|" + notJson);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testShardIterator_parse_rejectsMissingStateField() {
+ String partialState = Base64.getEncoder().withoutPadding()
+ .encodeToString("{\"streamType\":\"NEW_IMAGE\"}".getBytes(StandardCharsets.UTF_8));
+ new PhoenixShardIterator(canonicalStreamArn() + "|1|" + partialState);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testShardIterator_parse_rejectsLegacySlashFormat() {
+ new PhoenixShardIterator(
+ "shardIterator/" + INTERNAL_TABLE + "/CDC_" + TABLE_NAME + "/NEW_IMAGE/" + PARTITION_HEX
+ + "/" + SEQ_21);
+ }
+
+ @Test
+ public void testShardIterator_setNewSeqNum_preservesAllOtherFieldsThroughRoundTrip() {
+ PhoenixShardIterator pIter = newIterator();
+ long newTs = SEQ_TS + 1000;
+ int newOffset = 5;
+ pIter.setNewSeqNum(newTs, newOffset);
+ Assert.assertEquals(DdbAdapterCdcUtils.getSequenceNumber(newTs, newOffset),
+ pIter.getSeqNum());
+ PhoenixShardIterator reparsed = new PhoenixShardIterator(pIter.toString());
+ Assert.assertEquals(canonicalStreamArn(), reparsed.getStreamArn());
+ Assert.assertEquals(SHARD_ITER_STREAM_TYPE, reparsed.getStreamType());
+ Assert.assertEquals(PARTITION_HEX, reparsed.getPartitionId());
+ Assert.assertEquals(INTERNAL_TABLE, reparsed.getTableName());
+ Assert.assertEquals("CDC_" + TABLE_NAME, reparsed.getCdcObject());
+ Assert.assertEquals(DdbAdapterCdcUtils.getSequenceNumber(newTs, newOffset),
+ reparsed.getSeqNum());
+ Assert.assertEquals(newTs, reparsed.getTimestamp());
+ Assert.assertEquals(newOffset, reparsed.getOffset());
+ }
+
+ @Test
+ public void testShardIterator_lengthWithinAwsSpec_realisticInputs() {
+ String token = newIterator().toString();
+ Assert.assertTrue("token must fit AWS [1, 2048] spec, was len=" + token.length(),
+ token.length() >= 1 && token.length() <= 2048);
+ }
+
+ @Test
+ public void testShardIterator_lengthWithinAwsSpec_longTableName() {
+ StringBuilder longBare = new StringBuilder("T");
+ while (longBare.length() < 200) {
+ longBare.append("X");
+ }
+ String longBareStr = longBare.toString();
+ String longStreamArn =
+ "arn:aws:dynamodb:us-west-2:000000000000:table/" + longBareStr + "/stream/" + formatUtc(
+ CDC_INDEX_TS);
+ String longInternalStreamName =
+ "phoenix/cdc/stream/DDB." + longBareStr + "/CDC_" + longBareStr + "/" + CDC_INDEX_TS
+ + "/" + formatUtc(CDC_INDEX_TS);
+ PhoenixShardIterator big =
+ new PhoenixShardIterator(longStreamArn, longInternalStreamName, SHARD_ITER_STREAM_TYPE,
+ PARTITION_HEX, SEQ_21);
+ String token = big.toString();
+ Assert.assertTrue("token must fit AWS [1, 2048] spec, was len=" + token.length(),
+ token.length() <= 2048);
+ PhoenixShardIterator reparsed = new PhoenixShardIterator(token);
+ Assert.assertEquals(longStreamArn, reparsed.getStreamArn());
+ Assert.assertEquals("DDB." + longBare, reparsed.getTableName());
+ }
+}
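The ShardId tests above pin down a simple `shardId-<partitionStartMs>-<partitionHex>` shape. As a rough standalone illustration (the class `ShardIdSketch` is hypothetical and only mirrors the logic of `toShardId` and `partitionIdFromShardId` in `DdbAdapterCdcUtils`), encoding is plain concatenation, and decoding takes everything after the first `-` that follows the prefix. That split assumes `partitionStartMs` is non-negative; a negative value would put a `-` inside the millis segment.

```java
// Hypothetical standalone sketch mirroring DdbAdapterCdcUtils.toShardId and
// partitionIdFromShardId; not the adapter's own class.
public class ShardIdSketch {
    public static final String PREFIX = "shardId-";

    public static String toShardId(long partitionStartMs, String partitionHex) {
        return PREFIX + partitionStartMs + "-" + partitionHex;
    }

    public static String partitionIdFromShardId(String id) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("ShardId is required");
        }
        if (!id.startsWith(PREFIX)) {
            return id; // legacy bare partition hex passes through unchanged
        }
        String body = id.substring(PREFIX.length());
        int dash = body.indexOf('-'); // assumes non-negative millis (no '-' inside it)
        if (dash < 0 || dash == body.length() - 1) {
            throw new IllegalArgumentException("ShardId missing partition hex segment: " + id);
        }
        return body.substring(dash + 1);
    }

    public static void main(String[] args) {
        String hex = "1a2b3c4d5e6f7890abcdef0123456789";
        String id = toShardId(1700000000000L, hex);
        System.out.println(id);          // shardId-1700000000000-1a2b3c4d5e6f7890abcdef0123456789
        System.out.println(id.length()); // 54
        System.out.println(partitionIdFromShardId(id).equals(hex));  // true
        System.out.println(partitionIdFromShardId(hex).equals(hex)); // true
    }
}
```

With a 32-character hex id, the encoded length ranges from 42 characters (single-digit millis) to 60 (the 19 digits of `Long.MAX_VALUE`), which sits inside the [28, 65] bound the tests check.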
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetShardIteratorIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetShardIteratorIT.java
index 962f936..e861f5e 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetShardIteratorIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetShardIteratorIT.java
@@ -50,11 +50,16 @@ import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+import org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils;
+import org.apache.phoenix.ddb.utils.PhoenixShardIterator;
+
import java.sql.DriverManager;
import java.util.Map;
import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.MAX_NUM_CHANGES_AT_TIMESTAMP;
import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_DELIM;
+import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_NUM_PARTS;
+import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_VERSION;
import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import static software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
import static software.amazon.awssdk.services.dynamodb.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
@@ -173,26 +178,30 @@ public class GetShardIteratorIT {
.shardId(shardId);
String testSeqNum = "173837205000000420"; // 1738372050000+00420
- String testSeqNumPlusOne = String.valueOf(Long.parseLong(testSeqNum) + 1); // 1738372050000+00421
+ long testSeqNumLong = Long.parseLong(testSeqNum);
//AT_SEQUENCE_NUMBER
request.sequenceNumber(testSeqNum);
request.shardIteratorType(AT_SEQUENCE_NUMBER);
GetShardIteratorResponse result = phoenixDBStreamsClientV2.getShardIterator(request.build());
validateShardIterator(result.shardIterator(), tableName, "CDC_"+tableName, "OLD_IMAGE", shardId);
- Assert.assertTrue(result.shardIterator().contains(testSeqNum));
+ // The iterator's inner seqNum is the canonical 21-digit form of testSeqNum.
+ Assert.assertEquals(testSeqNumLong,
+ Long.parseLong(new PhoenixShardIterator(result.shardIterator()).getSeqNum()));
//AFTER_SEQUENCE_NUMBER
request.shardIteratorType(AFTER_SEQUENCE_NUMBER);
result = phoenixDBStreamsClientV2.getShardIterator(request.build());
validateShardIterator(result.shardIterator(), tableName, "CDC_"+tableName, "OLD_IMAGE", shardId);
- Assert.assertTrue(result.shardIterator().contains(testSeqNumPlusOne));
+ Assert.assertEquals(testSeqNumLong + 1,
+ Long.parseLong(new PhoenixShardIterator(result.shardIterator()).getSeqNum()));
//TRIM_HORIZON
request.sequenceNumber(null);
request.shardIteratorType(TRIM_HORIZON);
result = phoenixDBStreamsClientV2.getShardIterator(request.build());
validateShardIterator(result.shardIterator(), tableName, "CDC_"+tableName, "OLD_IMAGE", shardId);
- Assert.assertTrue(result.shardIterator().contains(shardStartSeqNum));
+ Assert.assertEquals(Long.parseLong(shardStartSeqNum),
+ Long.parseLong(new PhoenixShardIterator(result.shardIterator()).getSeqNum()));
//LATEST
request.sequenceNumber(null);
@@ -200,18 +209,27 @@ public class GetShardIteratorIT {
request.shardIteratorType(LATEST);
result = phoenixDBStreamsClientV2.getShardIterator(request.build());
validateShardIterator(result.shardIterator(), tableName, "CDC_"+tableName, "OLD_IMAGE", shardId);
- String[] shardIter = result.shardIterator().split(SHARD_ITERATOR_DELIM);
// shard iterator would be created after the current time we recorded here
- Assert.assertTrue(Long.parseLong(shardIter[shardIter.length-1]) > currentTime * MAX_NUM_CHANGES_AT_TIMESTAMP);
+ long latestSeqNum = Long.parseLong(
+ new PhoenixShardIterator(result.shardIterator()).getSeqNum());
+ Assert.assertTrue(latestSeqNum > currentTime * MAX_NUM_CHANGES_AT_TIMESTAMP);
}
private void validateShardIterator(String shardIter, String tableName,
String cdcObj,
String streamType, String shardId) {
LOGGER.info("Shard Iterator: " + shardIter);
- Assert.assertTrue(shardIter.contains(tableName));
- Assert.assertTrue(shardIter.contains(cdcObj));
- Assert.assertTrue(shardIter.contains(streamType));
- Assert.assertTrue(shardIter.contains(shardId));
+ Assert.assertTrue("must start with canonical streamArn, was: " + shardIter,
+ shardIter.startsWith("arn:aws:dynamodb:us-west-2:000000000000:table/"
+ + tableName + "/stream/"));
+ String[] parts = shardIter.split(java.util.regex.Pattern.quote(SHARD_ITERATOR_DELIM), -1);
+ Assert.assertEquals(SHARD_ITERATOR_NUM_PARTS, parts.length);
+ Assert.assertEquals(SHARD_ITERATOR_VERSION, parts[1]);
+ PhoenixShardIterator parsed = new PhoenixShardIterator(shardIter);
+ Assert.assertEquals("DDB." + tableName, parsed.getTableName());
+ Assert.assertEquals(cdcObj, parsed.getCdcObject());
+ Assert.assertEquals(streamType, parsed.getStreamType());
+ Assert.assertEquals(DdbAdapterCdcUtils.partitionIdFromShardId(shardId),
+ parsed.getPartitionId());
}
}
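`validateShardIterator` splits the token with `Pattern.quote(SHARD_ITERATOR_DELIM)` and a limit of `-1`. Both details matter for the new `<streamArn>|<version>|<base64(JSON state)>` envelope: `|` is a regex metacharacter, and the default limit silently drops trailing empty fields. A minimal JDK-only sketch of that split, assuming a hypothetical `ShardIteratorEnvelopeSketch` class (it does not decode the JSON state the way `PhoenixShardIterator` does with Jackson):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Pattern;

// Hypothetical sketch of splitting a <streamArn>|<version>|<base64(JSON state)> token.
public class ShardIteratorEnvelopeSketch {
    public static final String DELIM = "|";
    // Quote the delimiter: an unquoted "|" is regex alternation of two empty patterns.
    private static final Pattern DELIM_PATTERN = Pattern.compile(Pattern.quote(DELIM));

    public static String[] splitEnvelope(String token) {
        // Limit -1 keeps trailing empty fields, so "arn|1|" gives 3 parts instead of 2.
        String[] parts = DELIM_PATTERN.split(token, -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected <streamArn>|<version>|<state>: " + token);
        }
        return parts;
    }

    public static void main(String[] args) {
        String arn = "arn:aws:dynamodb:us-west-2:000000000000:table/T/stream/2024-01-15T10:30:00.000";
        String state = Base64.getEncoder().withoutPadding()
                .encodeToString("{\"streamType\":\"NEW_IMAGE\"}".getBytes(StandardCharsets.UTF_8));
        String[] parts = splitEnvelope(arn + DELIM + "1" + DELIM + state);

        System.out.println(parts[0].equals(arn)); // true
        System.out.println(parts[1]);             // 1
        // The basic decoder accepts the unpadded encoding produced above.
        System.out.println(new String(Base64.getDecoder().decode(parts[2]),
                StandardCharsets.UTF_8));         // {"streamType":"NEW_IMAGE"}
        // Unquoted split on "|" breaks between every character instead.
        System.out.println("a|b".split("|").length); // 3 ("a", "|", "b")
    }
}
```

One reason the old `/` delimiter no longer works: the ARN contains `/`, and so does the standard Base64 alphabet, whereas `|` appears in neither.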
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ListStreamsIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ListStreamsIT.java
index 9eebbf5..847ce78 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ListStreamsIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ListStreamsIT.java
@@ -44,7 +44,6 @@ import java.sql.DriverManager;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
import java.util.TimeZone;
@@ -134,8 +133,8 @@ public class ListStreamsIT {
Assert.assertEquals(tableName, phoenixStream.tableName());
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
df.setTimeZone(TimeZone.getTimeZone("UTC"));
- Date date = df.parse(phoenixStream.streamLabel());
- Assert.assertTrue(phoenixStream.streamArn().contains(String.valueOf(date.getTime())));
+ df.parse(phoenixStream.streamLabel());
+ Assert.assertTrue(phoenixStream.streamArn().endsWith("/stream/" + phoenixStream.streamLabel()));
}
@Test(timeout = 120000)
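The updated `ListStreamsIT` check still parses the label with `SimpleDateFormat`, which is lenient by default and would quietly roll an impossible date such as `2024-02-30` into March. The `fromStreamArn` path added to `DdbAdapterCdcUtils` instead resolves labels with a `java.time` formatter in `ResolverStyle.STRICT` mode over the `uuuu` pattern. The sketch below (the `StreamLabelSketch` class is hypothetical) reproduces that formatter configuration to show the round trip and the rejection. It also suggests why the diff keeps a separate `uuuu` parse pattern: under `STRICT`, a `yyyy` (year-of-era) field with no era present cannot be resolved to a date.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;
import java.util.Locale;

// Hypothetical sketch reproducing the STRICT, UTC stream-label formatter.
public class StreamLabelSketch {
    // 'uuuu' is the proleptic year; STRICT rejects impossible fields instead of rolling them.
    private static final DateTimeFormatter LABEL = DateTimeFormatter
            .ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSS", Locale.ROOT)
            .withResolverStyle(ResolverStyle.STRICT)
            .withZone(ZoneOffset.UTC);

    public static long parseLabel(String label) {
        return LABEL.parse(label, Instant::from).toEpochMilli();
    }

    public static String formatLabel(long epochMs) {
        return LABEL.format(Instant.ofEpochMilli(epochMs));
    }

    public static void main(String[] args) {
        long ms = parseLabel("2024-01-15T10:30:00.000");
        System.out.println(ms);              // 1705314600000
        System.out.println(formatLabel(ms)); // 2024-01-15T10:30:00.000
        try {
            parseLabel("2024-02-30T00:00:00.000");
        } catch (DateTimeParseException e) {
            System.out.println("rejected");  // STRICT does not roll Feb 30 into March
        }
    }
}
```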
diff --git a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
index 40614c5..24b4160 100644
--- a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
+++ b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
@@ -15,9 +15,15 @@ import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.format.ResolverStyle;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
@@ -34,13 +40,33 @@ public class DdbAdapterCdcUtils {
public static final int OFFSET_LENGTH = 5;
public static final int MAX_NUM_CHANGES_AT_TIMESTAMP = (int) Math.pow(10, OFFSET_LENGTH);
- // shardIterator/<tableName>/<cdcObject>/<streamType>/<partitionID>/<startSeqNum>
- public static final String SHARD_ITERATOR_FORMAT = "shardIterator/%s/%s/%s/%s/%s";
- public static final String SHARD_ITERATOR_DELIM = "/";
- public static final int SHARD_ITERATOR_NUM_PARTS = 6;
+ public static final String SHARD_ITERATOR_VERSION = "1";
+ public static final String SHARD_ITERATOR_DELIM = "|";
+ public static final int SHARD_ITERATOR_NUM_PARTS = 3;
+ // JSON key names for the inner state payload of the shard iterator
+ public static final String SI_FIELD_STREAM_TYPE = "streamType";
+ public static final String SI_FIELD_PARTITION_ID = "partitionId";
+ public static final String SI_FIELD_SEQ_NUM = "seqNum";
// phoenix/cdc/stream/{tableName}/{cdc object name}/{cdc index timestamp}/{creation datetime}
public static final String STREAM_NAME_DELIM = "/";
public static final int STREAM_NAME_NUM_PARTS = 7;
+ public static final String STREAM_NAME_PREFIX = "phoenix/cdc/stream/";
+ public static final String CDC_OBJECT_PREFIX = "CDC_";
+
+ public static final String STREAM_ARN_REGION = "us-west-2";
+ public static final String STREAM_ARN_ACCOUNT_ID = "000000000000";
+ public static final String STREAM_ARN_PREFIX =
+ "arn:aws:dynamodb:" + STREAM_ARN_REGION + ":" + STREAM_ARN_ACCOUNT_ID + ":table/";
+ public static final String STREAM_ARN_INFIX = "/stream/";
+
+ // ShardId format: shardId-<partitionStartMs>-<32-char-hex-partition-id>
+ public static final String SHARD_ID_PREFIX = "shardId-";
+ private static final String SHARD_ID_DELIM = "-";
+ public static final String STREAM_LABEL_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+ private static final String STREAM_LABEL_PARSE_PATTERN = "uuuu-MM-dd'T'HH:mm:ss.SSS";
+ private static final DateTimeFormatter STREAM_LABEL_FORMATTER =
+ DateTimeFormatter.ofPattern(STREAM_LABEL_PARSE_PATTERN, Locale.ROOT)
+ .withResolverStyle(ResolverStyle.STRICT).withZone(ZoneOffset.UTC);
private static final String STREAM_NAME_QUERY
= "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME
@@ -145,6 +171,106 @@ public class DdbAdapterCdcUtils {
return Long.parseLong(getStreamNameComponent(streamName, 5));
}
+ /**
+ * Detect whether the given identifier is the AWS-shaped stream ARN
+ * rather than the internal stream name.
+ */
+ public static boolean isStreamArn(String s) {
+ return s != null && s.startsWith(STREAM_ARN_PREFIX);
+ }
+
+ /**
+ * Convert the internal stream name to the AWS-shaped ARN
+ * (e.g. arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/2024-01-15T10:30:00.000).
+ */
+ public static String toStreamArn(String streamName) {
+ String internalTableName = getTableNameFromStreamName(streamName);
+ String bareTableName = PhoenixUtils.getTableNameFromFullName(internalTableName, false);
+ String creationDateTime = getStreamLabel(streamName);
+ return STREAM_ARN_PREFIX + bareTableName + STREAM_ARN_INFIX + creationDateTime;
+ }
+
+ /**
+ * Convert the AWS-shaped stream ARN back to the internal stream name.
+ */
+ public static String fromStreamArn(String streamArn) {
+ if (!isStreamArn(streamArn)) {
+ throw new IllegalArgumentException("Not a synthetic stream ARN: " + streamArn);
+ }
+ String body = streamArn.substring(STREAM_ARN_PREFIX.length());
+ int infixIdx = body.indexOf(STREAM_ARN_INFIX);
+ if (infixIdx < 0) {
+ throw new IllegalArgumentException("Stream ARN missing /stream/ segment: " + streamArn);
+ }
+ String arnTableName = body.substring(0, infixIdx);
+ String creationDateTime = body.substring(infixIdx + STREAM_ARN_INFIX.length());
+ long cdcIndexTimestamp = parseCreationDateTime(creationDateTime, streamArn);
+ String bareTableName = PhoenixUtils.getTableNameFromFullName(arnTableName, false);
+ String internalTableName = PhoenixUtils.getFullTableName(bareTableName, false);
+ return STREAM_NAME_PREFIX + internalTableName + STREAM_NAME_DELIM
+ + CDC_OBJECT_PREFIX + bareTableName + STREAM_NAME_DELIM
+ + cdcIndexTimestamp + STREAM_NAME_DELIM + creationDateTime;
+ }
+
+ public static String normalizeStreamName(String streamArnOrName) {
+ if (streamArnOrName == null || streamArnOrName.isEmpty()) {
+ throw new IllegalArgumentException("StreamArn is required");
+ }
+ return isStreamArn(streamArnOrName) ? fromStreamArn(streamArnOrName) : streamArnOrName;
+ }
+
+ /**
+ * Encode an internal Phoenix partition into an AWS-shaped {@code ShardId} of the form
+ * {@code shardId-<partitionStartMs>-<partitionHex>}. The length stays within the AWS
+ * spec range [28, 65] for any positive epoch-millis value combined with the 32-char
+ * HBase region encoded partition id.
+ *
+ * @param partitionStartMs epoch millis of the partition's start time
+ * @param partitionHex HBase region encoded partition id (32-char lowercase hex)
+ */
+ public static String toShardId(long partitionStartMs, String partitionHex) {
+ return SHARD_ID_PREFIX + partitionStartMs + SHARD_ID_DELIM + partitionHex;
+ }
+
+ /**
+ * Detect whether the given identifier is the AWS-shaped {@code ShardId} rather than
+ * the raw HBase region encoded partition id.
+ */
+ public static boolean isShardId(String s) {
+ return s != null && s.startsWith(SHARD_ID_PREFIX);
+ }
+
+ /**
+ * Extract the bare HBase region encoded partition id from either the new
+ * AWS-shaped {@code ShardId} ({@code shardId-<ms>-<hex>}) or a raw partition
+ * hex string.
+ */
+ public static String partitionIdFromShardId(String shardIdOrPartitionHex) {
+ if (shardIdOrPartitionHex == null || shardIdOrPartitionHex.isEmpty()) {
+ throw new IllegalArgumentException("ShardId is required");
+ }
+ if (!isShardId(shardIdOrPartitionHex)) {
+ return shardIdOrPartitionHex;
+ }
+ String body = shardIdOrPartitionHex.substring(SHARD_ID_PREFIX.length());
+ int dashIdx = body.indexOf(SHARD_ID_DELIM);
+ if (dashIdx < 0 || dashIdx == body.length() - 1) {
+ throw new IllegalArgumentException(
+ "ShardId missing partition hex segment: " + shardIdOrPartitionHex);
+ }
+ return body.substring(dashIdx + 1);
+ }
+
+ private static long parseCreationDateTime(String creationDateTime, String streamArn) {
+ try {
+ return STREAM_LABEL_FORMATTER.parse(creationDateTime, Instant::from).toEpochMilli();
+ } catch (DateTimeParseException e) {
+ throw new IllegalArgumentException(
+ "Stream ARN label is not a UTC ISO timestamp (" + STREAM_LABEL_FORMAT + "): "
+ + streamArn, e);
+ }
+ }
+
/**
* Return the stream type for the given table stored in the SCHEMA_VERSION column of the ptable.
*/
@@ -186,11 +312,28 @@ public class DdbAdapterCdcUtils {
}
/**
- * Build a sequence number of the form <timestamp><offset>.
- * offset should be 0-padded for OFFSET_LENGTH
+ * Build a 21-digit zero-padded sequence number (AWS SequenceNumber: min length 21,
+ * max 40). Its numeric value equals the legacy <timestamp><offset> concatenation,
+ * i.e. (timestamp * 10^OFFSET_LENGTH + offset).
*/
public static String getSequenceNumber(long timestamp, int offset) {
- return timestamp + String.format("%0" + OFFSET_LENGTH + "d", offset);
+ return String.format("%021d", timestamp * MAX_NUM_CHANGES_AT_TIMESTAMP + offset);
+ }
+
+ public static long parseSequenceNumber(String s) {
+ if (s == null || s.isEmpty()) {
+ throw new IllegalArgumentException("SequenceNumber is required");
+ }
+ long val;
+ try {
+ val = Long.parseLong(s);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("SequenceNumber is not numeric: " + s, e);
+ }
+ if (val < 0) {
+ throw new IllegalArgumentException("SequenceNumber must be non-negative: " + s);
+ }
+ return val;
}
/**
@@ -199,12 +342,13 @@ public class DdbAdapterCdcUtils {
*
* @param tableName the table name from the shard iterator
* @param partitionId the partition (shard) ID
- * @param sequenceNumber the per-event sequence number
+ * @param sequenceNumber the per-event sequence number (any padded length)
* @return 32-char lowercase hex string
*/
public static String getEventId(String tableName, String partitionId, String sequenceNumber) {
+ long canonicalSeq = parseSequenceNumber(sequenceNumber);
try {
- String input = tableName + "|" + partitionId + "|" + sequenceNumber;
+ String input = tableName + "|" + partitionId + "|" + canonicalSeq;
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
return String.format("%032x", new BigInteger(1, digest));
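The new `getSequenceNumber` changes only the string form of the sequence number, not its value: it is still `timestamp * 10^OFFSET_LENGTH + offset`, now zero-padded to 21 digits. Because `getEventId` now hashes the parsed `long` rather than the raw string, legacy unpadded and new padded tokens yield the same event id. A minimal sketch, assuming `OFFSET_LENGTH = 5` as in the diff; the class `SequenceNumberSketch` and its `legacySequenceNumber` helper are hypothetical names used only for comparison:

```java
// Hypothetical sketch; constants mirror DdbAdapterCdcUtils (OFFSET_LENGTH = 5).
public class SequenceNumberSketch {
    public static final int OFFSET_LENGTH = 5;
    public static final int MAX_NUM_CHANGES_AT_TIMESTAMP = 100_000; // 10^OFFSET_LENGTH

    // New form: the value timestamp * 10^5 + offset, left-padded with zeros to 21 digits.
    public static String getSequenceNumber(long timestamp, int offset) {
        return String.format("%021d", timestamp * MAX_NUM_CHANGES_AT_TIMESTAMP + offset);
    }

    // Legacy form: decimal timestamp followed by the offset zero-padded to 5 digits.
    public static String legacySequenceNumber(long timestamp, int offset) {
        return timestamp + String.format("%0" + OFFSET_LENGTH + "d", offset);
    }

    public static void main(String[] args) {
        String padded = getSequenceNumber(1738372050000L, 420);
        String legacy = legacySequenceNumber(1738372050000L, 420);
        System.out.println(padded); // 000173837205000000420
        System.out.println(legacy); // 173837205000000420
        // Long.parseLong ignores leading zeros, so both forms parse to the same long.
        System.out.println(Long.parseLong(padded) == Long.parseLong(legacy)); // true
    }
}
```

Since the value must fit in a signed 64-bit `long`, this holds for epoch-millis timestamps up to about 9.2e13 (roughly the year 4892). `Long.MAX_VALUE` has 19 digits, so the 21-digit padding always leaves at least two leading zeros.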
diff --git a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixShardIterator.java b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixShardIterator.java
index 69a5597..7b3d918 100644
--- a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixShardIterator.java
+++ b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/PhoenixShardIterator.java
@@ -1,17 +1,40 @@
package org.apache.phoenix.ddb.utils;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.OFFSET_LENGTH;
import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_DELIM;
-import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_FORMAT;
import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_NUM_PARTS;
+import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SHARD_ITERATOR_VERSION;
+import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SI_FIELD_PARTITION_ID;
+import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SI_FIELD_SEQ_NUM;
+import static org.apache.phoenix.ddb.utils.DdbAdapterCdcUtils.SI_FIELD_STREAM_TYPE;
/**
* Class to represent a shard iterator for Phoenix CDC queries.
- * Format: shardIterator-<tableName>-<cdcObject>-<streamType>-<partitionID>-<startSeqNum>
+ * The format:
+ * <pre>
+ * <streamArn>|<version>|<base64(JSON state)>
+ * </pre>
*/
public class PhoenixShardIterator {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final Pattern DELIM_PATTERN =
+ Pattern.compile(Pattern.quote(SHARD_ITERATOR_DELIM));
+ private static final TypeReference<Map<String, String>> STATE_TYPE_REF =
+ new TypeReference<Map<String, String>>() {
+ };
+
+ private final String streamArn;
private final String tableName;
private final String cdcObject;
@@ -21,20 +44,67 @@ public class PhoenixShardIterator {
private long timestamp;
private int offset;
+ /**
+ * Build a fresh iterator from server-side state. Used by
+ * {@code GetShardIteratorService} when emitting a brand-new iterator.
+ *
+ * @param streamArn stream ARN
+ * @param streamName internal Phoenix stream name
+ * @param streamType stream view type (NEW_IMAGE, OLD_IMAGE, KEYS_ONLY,
+ * NEW_AND_OLD_IMAGES)
+ * @param partitionId region encoded partition id
+ * @param seqNum resume sequence number
+ */
+ public PhoenixShardIterator(String streamArn, String streamName, String streamType,
+ String partitionId, String seqNum) {
+ this.streamArn = streamArn;
+ this.tableName = DdbAdapterCdcUtils.getTableNameFromStreamName(streamName);
+ this.cdcObject = DdbAdapterCdcUtils.getCDCObjectNameFromStreamName(streamName);
+ this.streamType = streamType;
+ this.partitionId = partitionId;
+ this.seqNum = seqNum;
+ setTimestampAndOffset();
+ }
+
public PhoenixShardIterator(String shardIterator) {
- String[] shardIteratorComponents = shardIterator.split(SHARD_ITERATOR_DELIM);
- if (shardIteratorComponents.length != SHARD_ITERATOR_NUM_PARTS) {
- throw new IllegalArgumentException(shardIterator
- + ": Provided shard iterator is not of the right format.");
+ if (shardIterator == null || shardIterator.isEmpty()) {
+ throw new IllegalArgumentException("ShardIterator is required");
+ }
+ String[] parts = DELIM_PATTERN.split(shardIterator, -1);
+ if (parts.length != SHARD_ITERATOR_NUM_PARTS) {
+ throw new IllegalArgumentException(
+ "ShardIterator must be of the form <streamArn>|<version>|<base64-state>: "
+ + shardIterator);
+ }
+ String parsedStreamArn = parts[0];
+ String version = parts[1];
+ String base64State = parts[2];
+ if (!SHARD_ITERATOR_VERSION.equals(version)) {
+ throw new IllegalArgumentException(
+ "Unsupported ShardIterator version: " + version + " (expected "
+ + SHARD_ITERATOR_VERSION + ")");
+ }
+ String internalStreamName = DdbAdapterCdcUtils.fromStreamArn(parsedStreamArn);
+ this.streamArn = parsedStreamArn;
+ this.tableName = DdbAdapterCdcUtils.getTableNameFromStreamName(internalStreamName);
+ this.cdcObject = DdbAdapterCdcUtils.getCDCObjectNameFromStreamName(internalStreamName);
+
+ Map<String, String> state = decodeState(base64State, shardIterator);
+ this.streamType = state.get(SI_FIELD_STREAM_TYPE);
+ this.partitionId = state.get(SI_FIELD_PARTITION_ID);
+ this.seqNum = state.get(SI_FIELD_SEQ_NUM);
+ if (this.streamType == null || this.partitionId == null || this.seqNum == null) {
+ throw new IllegalArgumentException(
+ "ShardIterator state missing required fields (" + SI_FIELD_STREAM_TYPE + ", "
+ + SI_FIELD_PARTITION_ID + ", " + SI_FIELD_SEQ_NUM + "): "
+ shardIterator);
}
- this.tableName = shardIteratorComponents[1];
- this.cdcObject = shardIteratorComponents[2];
- this.streamType = shardIteratorComponents[3];
- this.partitionId = shardIteratorComponents[4];
- this.seqNum = shardIteratorComponents[5];
setTimestampAndOffset();
}
+ public String getStreamArn() {
+ return streamArn;
+ }
+
public String getTableName() {
return tableName;
}
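The parsing constructor above and the `toString()` encoder in the next hunk together define a simple envelope: the stream ARN, a version tag, and a Base64-encoded state blob joined by the delimiter. The sketch below reproduces only that envelope as a self-contained round trip. It is not the commit's implementation: the class `ShardIteratorEnvelope` is hypothetical, the delimiter `|` is inferred from the Javadoc, the version value `"1"` is a placeholder (the real `SHARD_ITERATOR_VERSION` is not shown), and the JSON state is treated as an opaque string rather than parsed with Jackson as the diff does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Pattern;

// Hypothetical stand-in for the envelope handling in PhoenixShardIterator.
// The state is kept as an opaque JSON string; no JSON parsing is done here.
final class ShardIteratorEnvelope {
    static final String DELIM = "|";   // assumed delimiter value
    static final String VERSION = "1"; // placeholder version tag
    private static final Pattern DELIM_PATTERN = Pattern.compile(Pattern.quote(DELIM));

    final String streamArn;
    final String stateJson;

    ShardIteratorEnvelope(String streamArn, String stateJson) {
        this.streamArn = streamArn;
        this.stateJson = stateJson;
    }

    // <streamArn>|<version>|<base64(state)>
    String encode() {
        String base64State = Base64.getEncoder()
                .encodeToString(stateJson.getBytes(StandardCharsets.UTF_8));
        return streamArn + DELIM + VERSION + DELIM + base64State;
    }

    static ShardIteratorEnvelope decode(String shardIterator) {
        if (shardIterator == null || shardIterator.isEmpty()) {
            throw new IllegalArgumentException("ShardIterator is required");
        }
        // Limit -1 keeps trailing empty parts so extra delimiters are counted.
        String[] parts = DELIM_PATTERN.split(shardIterator, -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected <streamArn>|<version>|<base64-state>: " + shardIterator);
        }
        if (!VERSION.equals(parts[1])) {
            throw new IllegalArgumentException("Unsupported ShardIterator version: " + parts[1]);
        }
        byte[] stateBytes;
        try {
            stateBytes = Base64.getDecoder().decode(parts[2]);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Base64 state is not decodable: " + shardIterator, e);
        }
        return new ShardIteratorEnvelope(parts[0], new String(stateBytes, StandardCharsets.UTF_8));
    }
}
```

Splitting on `|` is safe for the sample ARN shape shown in `DDB_API_REFERENCE.md` (`arn:aws:dynamodb:us-west-2:000000000000:table/MyTable/stream/...`) because neither that ARN nor the standard Base64 alphabet contains `|`. The version segment gives the format room to evolve: an iterator minted under a different version is rejected up front instead of being misread.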
@@ -71,8 +141,36 @@ public class PhoenixShardIterator {
@Override
public String toString() {
- return String.format(SHARD_ITERATOR_FORMAT,
- tableName, cdcObject, streamType, partitionId, seqNum);
+ Map<String, String> state = new LinkedHashMap<>();
+ state.put(SI_FIELD_STREAM_TYPE, streamType);
+ state.put(SI_FIELD_PARTITION_ID, partitionId);
+ state.put(SI_FIELD_SEQ_NUM, seqNum);
+ byte[] jsonBytes;
+ try {
+ jsonBytes = OBJECT_MAPPER.writeValueAsBytes(state);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to serialize ShardIterator state", e);
+ }
+ String base64State = Base64.getEncoder().encodeToString(jsonBytes);
+ return streamArn + SHARD_ITERATOR_DELIM + SHARD_ITERATOR_VERSION + SHARD_ITERATOR_DELIM
+ + base64State;
+ }
+
+ private static Map<String, String> decodeState(String base64State, String shardIterator) {
+ byte[] stateBytes;
+ try {
+ stateBytes = Base64.getDecoder().decode(base64State);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "ShardIterator base64 state is not decodable: " + shardIterator, e);
+ }
+ try {
+ return OBJECT_MAPPER.readValue(stateBytes, STATE_TYPE_REF);
+ } catch (IOException e) {
+ String decoded = new String(stateBytes, StandardCharsets.UTF_8);
+ throw new IllegalArgumentException("ShardIterator state is not valid JSON: " + decoded,
+ e);
+ }
}
private void setTimestampAndOffset() {