This is an automated email from the ASF dual-hosted git repository.
virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 6e3b112 eventId for change stream record
6e3b112 is described below
commit 6e3b112a972dbf466ad4184abdf9f8ca4a5eb104
Author: Palash Chauhan <[email protected]>
AuthorDate: Tue May 12 10:50:22 2026 -0700
eventId for change stream record
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../phoenix/ddb/service/GetRecordsService.java | 10 ++++---
.../java/org/apache/phoenix/ddb/GetRecordsIT.java | 5 ++++
.../org/apache/phoenix/ddb/ReturnItemsLimitIT.java | 2 +-
.../java/org/apache/phoenix/ddb/TestUtils.java | 34 +++++++++++++++++++++-
.../org/apache/phoenix/ddb/utils/ApiMetadata.java | 1 +
.../phoenix/ddb/utils/DdbAdapterCdcUtils.java | 24 +++++++++++++++
6 files changed, 70 insertions(+), 6 deletions(-)
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
index 84ec289..4a49500 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
@@ -20,7 +20,6 @@ import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -88,8 +87,9 @@ public class GetRecordsService {
lastTs = ts;
lastOffset=0;
}
- record = getStreamRecord(rs, pIter.getStreamType(), pkCols,
- DdbAdapterCdcUtils.getSequenceNumber(lastTs,
lastOffset));
+ String seqNum = DdbAdapterCdcUtils.getSequenceNumber(lastTs,
lastOffset);
+ record = getStreamRecord(rs, pIter.getStreamType(), pkCols,
seqNum,
+ pIter.getTableName(), pIter.getPartitionId());
records.add(record);
count++;
bytesSize +=
@@ -156,7 +156,8 @@ public class GetRecordsService {
* rs --> timestamp, pk1, (pk2), cdcJson
*/
private static Map<String, Object> getStreamRecord(ResultSet rs, String
streamType, List<PColumn> pkCols,
- String seqNum) throws SQLException,
JsonProcessingException {
+ String seqNum, String tableName,
String partitionId)
+ throws SQLException, JsonProcessingException {
Map<String, Object> streamRecord = new HashMap<>();
streamRecord.put(ApiMetadata.STREAM_VIEW_TYPE, streamType);
streamRecord.put(ApiMetadata.SEQUENCE_NUMBER, seqNum);
@@ -211,6 +212,7 @@ public class GetRecordsService {
} else {
record.put(ApiMetadata.EVENT_NAME, "MODIFY");
}
+ record.put(ApiMetadata.EVENT_ID,
DdbAdapterCdcUtils.getEventId(tableName, partitionId, seqNum));
return record;
}
}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetRecordsIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetRecordsIT.java
index cd0a85c..5595070 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetRecordsIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetRecordsIT.java
@@ -486,5 +486,10 @@ public class GetRecordsIT {
Assert.assertEquals(ddbRecord.dynamodb().newImage(),
phoenixRecord.dynamodb().newImage());
Assert.assertTrue(ddbRecord.dynamodb().sizeBytes() > 0);
Assert.assertTrue(phoenixRecord.dynamodb().sizeBytes() > 0);
+ // eventID must be present and be a 32-char hex string
+ Assert.assertNotNull("eventID must be present",
phoenixRecord.eventID());
+ Assert.assertTrue("eventID must be 32-char hex",
+ phoenixRecord.eventID().matches("[0-9a-f]{32}"));
}
+
}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ReturnItemsLimitIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ReturnItemsLimitIT.java
index c6388f1..409acfc 100644
---
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ReturnItemsLimitIT.java
+++
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ReturnItemsLimitIT.java
@@ -508,7 +508,7 @@ public class ReturnItemsLimitIT {
// Verify that both services returned the same number of records
Assert.assertEquals("Phoenix and DynamoDB should return same number of
records",
ddbAllResponses.size(), phoenixAllResponses.size());
- TestUtils.validateRecords(ddbAllResponses, phoenixAllResponses);
+ TestUtils.validateRecords(phoenixAllResponses, ddbAllResponses);
}
/**
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
index 75cf914..f1a5fe7 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
@@ -23,8 +23,10 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -223,7 +225,7 @@ public class TestUtils {
}
/**
- * Validate change records.
+     * Validate change records. The first argument must be the Phoenix records, because the eventID checks run only against that list.
*/
public static void validateRecords(List<Record> phoenixRecords,
List<Record> dynamoRecords) {
Assert.assertEquals("Stream record counts should match between Phoenix
and DynamoDB",
@@ -250,6 +252,36 @@ public class TestUtils {
Assert.assertTrue("Phoenix record size should be greater than 0
for record " + i,
pr.dynamodb().sizeBytes() > 0);
}
+ assertEventIdsUnique(phoenixRecords);
+ }
+
+    /**
+     * Assert that every record carries a well-formed eventID, that records sharing a
+     * sequenceNumber (e.g. re-read via AT_SEQUENCE_NUMBER) carry the same eventID
+     * (deterministic), and that records with different sequenceNumbers carry different
+     * eventIDs (unique).
+     *
+     * <p>NOTE(review): records are grouped by sequenceNumber alone, while the server derives the
+     * eventID from table name, partition ID and sequenceNumber. If the list can mix shards whose
+     * sequence numbers coincide, the stability check would fail spuriously; confirm callers pass
+     * records from a single shard or that sequence numbers never repeat across shards.
+     *
+     * @param records the Phoenix stream records to check
+     */
+    public static void assertEventIdsUnique(List<Record> records) {
+        // Compiled once for the whole list; String.matches would recompile it for every record.
+        java.util.regex.Pattern hex32 = java.util.regex.Pattern.compile("[0-9a-f]{32}");
+        Map<String, String> seqToEventId = new HashMap<>();
+        Map<String, String> eventIdToSeq = new HashMap<>();
+        for (Record record : records) {
+            String eventId = record.eventID();
+            Assert.assertNotNull("eventID must be present", eventId);
+            Assert.assertTrue("eventID must be 32-char hex: " + eventId,
+                    hex32.matcher(eventId).matches());
+            String seqNum = record.dynamodb().sequenceNumber();
+
+            // Determinism: every record with this sequenceNumber must match the first one seen.
+            String firstEventId = seqToEventId.putIfAbsent(seqNum, eventId);
+            if (firstEventId != null) {
+                Assert.assertEquals("eventID must be stable for same sequenceNumber " + seqNum,
+                        firstEventId, eventId);
+            }
+
+            // Uniqueness: an eventID may belong to only one sequenceNumber. containsKey (rather
+            // than putIfAbsent) keeps a null sequenceNumber distinguishable from "absent".
+            if (eventIdToSeq.containsKey(eventId)) {
+                Assert.assertEquals("eventIDs must be unique across distinct events: " + eventId,
+                        eventIdToSeq.get(eventId), seqNum);
+            } else {
+                eventIdToSeq.put(eventId, seqNum);
+            }
+        }
     }
/**
diff --git
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/ApiMetadata.java
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/ApiMetadata.java
index ce102e1..9df9dfa 100644
---
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/ApiMetadata.java
+++
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/ApiMetadata.java
@@ -155,6 +155,7 @@ public class ApiMetadata {
public static final String OLD_IMAGE = "OldImage";
public static final String KEYS = "Keys";
public static final String SIZE_BYTES = "SizeBytes";
+ public static final String EVENT_ID = "eventID";
public static final String USER_IDENTITY = "userIdentity";
public static final String TYPE = "Type";
public static final String PRINCIPAL_ID = "PrincipalId";
diff --git
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
index e454be2..40614c5 100644
---
a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
+++
b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
@@ -8,6 +8,10 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.CDCUtil;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -189,6 +193,26 @@ public class DdbAdapterCdcUtils {
return timestamp + String.format("%0" + OFFSET_LENGTH + "d", offset);
}
+    /**
+     * Derive a deterministic eventID for a change stream record.
+     *
+     * <p>The ID is the lowercase hexadecimal MD5 digest of
+     * {@code tableName + "|" + partitionId + "|" + sequenceNumber}, so re-reading the same
+     * record from the same partition yields the same ID.
+     *
+     * @param tableName the table name from the shard iterator
+     * @param partitionId the partition (shard) ID
+     * @param sequenceNumber the per-event sequence number
+     * @return a 32-character lowercase hexadecimal string
+     * @throws RuntimeException if the MD5 algorithm is not available
+     */
+    public static String getEventId(String tableName, String partitionId, String sequenceNumber) {
+        final MessageDigest md5;
+        try {
+            md5 = MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not available", e);
+        }
+        String fingerprintSource = String.join("|", tableName, partitionId, sequenceNumber);
+        byte[] digestBytes = md5.digest(fingerprintSource.getBytes(StandardCharsets.UTF_8));
+        // Read the digest as an unsigned magnitude and zero-pad to all 128 bits (32 hex digits).
+        return String.format("%032x", new BigInteger(1, digestBytes));
+    }
+
/**
* Parse the provided stream name and return a particular component.
* @param streamName stream name