This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 6b16d2f Duplicates and table existence validation in BatchWriteItem
6b16d2f is described below
commit 6b16d2f84586baf76a3efcd457436a413fd9c6c0
Author: Palash Chauhan <[email protected]>
AuthorDate: Fri Jan 30 11:59:07 2026 -0800
Duplicates and table existence validation in BatchWriteItem
---
.../phoenix/ddb/service/BatchWriteItemService.java | 2 +-
.../phoenix/ddb/service/utils/ValidationUtil.java | 49 +++++-
.../org/apache/phoenix/ddb/BatchWriteItemIT.java | 174 ++++++++++++++++++---
.../java/org/apache/phoenix/ddb/ValidationIT.java | 5 -
4 files changed, 202 insertions(+), 28 deletions(-)
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/BatchWriteItemService.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/BatchWriteItemService.java
index 0531694..d5228de 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/BatchWriteItemService.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/BatchWriteItemService.java
@@ -17,8 +17,8 @@ public class BatchWriteItemService {
public static Map<String, Object> batchWriteItem(Map<String, Object>
request,
String connectionUrl) {
- ValidationUtil.validateBatchWriteItemRequest(request);
try (Connection connection =
ConnectionUtil.getConnection(connectionUrl)) {
+ ValidationUtil.validateBatchWriteItemRequest(connection, request);
connection.setAutoCommit(false);
Map<String, List<Map<String, Object>>> requestItems =
(Map<String, List<Map<String, Object>>>)
request.get(ApiMetadata.REQUEST_ITEMS);
diff --git
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ValidationUtil.java
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ValidationUtil.java
index 2eee72e..b3e219e 100644
---
a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ValidationUtil.java
+++
b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/utils/ValidationUtil.java
@@ -18,13 +18,20 @@
package org.apache.phoenix.ddb.service.utils;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.phoenix.ddb.rest.metrics.ApiOperation;
import org.apache.phoenix.ddb.service.ScanService;
import org.apache.phoenix.ddb.service.exceptions.ValidationException;
import org.apache.phoenix.ddb.utils.ApiMetadata;
+import org.apache.phoenix.ddb.utils.PhoenixUtils;
+import org.apache.phoenix.schema.PColumn;
/**
* Validation for various API requests.
@@ -34,16 +41,18 @@ public class ValidationUtil {
private static final int BATCH_WRITE_LIMIT = 25;
private static final int BATCH_GET_LIMIT = 100;
- public static void validateBatchWriteItemRequest(Map<String, Object>
request) {
+ public static void validateBatchWriteItemRequest(Connection conn,
Map<String, Object> request)
+ throws SQLException {
int numItems = 0;
Map<String, Object> requestItems = (Map<String, Object>)
request.get(ApiMetadata.REQUEST_ITEMS);
for (Map.Entry<String, Object> entry : requestItems.entrySet()) {
- List<Object> ops = (List<Object>) entry.getValue();
+ List<Map<String, Object>> ops = (List<Map<String, Object>>)
entry.getValue();
if (ops != null) {
numItems += ops.size();
if (numItems > BATCH_WRITE_LIMIT) {
- throw new ValidationException("Too many items requested
for the BatchGetItem call.");
+ throw new ValidationException("Too many items requested
for the BatchWriteItem call");
}
+ validateNonDuplicateKeys(conn, (String)entry.getKey(), ops);
}
}
}
@@ -57,7 +66,7 @@ public class ValidationUtil {
if (keys != null) {
numItems += keys.size();
if (numItems > BATCH_GET_LIMIT) {
- throw new ValidationException("Too many items requested
for the BatchGetItem call.");
+ throw new ValidationException("Too many items requested
for the BatchGetItem call");
}
}
}
@@ -218,4 +227,36 @@ public class ValidationUtil {
validateReturnValues(returnValue, apiOperation);
validateReturnValuesOnConditionCheckFailure(returnValuesOnConditionCheckFailure);
}
+
+ private static void validateNonDuplicateKeys(Connection conn, String
tableName,
+ List<Map<String, Object>> ops)
+ throws SQLException {
+ List<PColumn> pkCols = PhoenixUtils.getPKColumns(conn, tableName);
+ Set<Map<String, Object>> keys = new HashSet<>();
+ for (Map<String, Object> op : ops) {
+ Map<String, Object> key;
+ if (op.containsKey(ApiMetadata.PUT_REQUEST)) {
+ Map<String, Object> item = (Map<String, Object>)
+ ((Map<String, Object>)
op.get(ApiMetadata.PUT_REQUEST)).get(ApiMetadata.ITEM);
+ key = getKey(item, pkCols);
+ } else if (op.containsKey(ApiMetadata.DELETE_REQUEST)) {
+ key = (Map<String, Object>)
+ ((Map<String,
Object>)op.get(ApiMetadata.DELETE_REQUEST)).get(ApiMetadata.KEY);
+ } else {
+ throw new ValidationException("Unsupported request type for
BatchWriteItem");
+ }
+ if (!keys.add(key)) {
+ throw new ValidationException("Provided list of item keys
contains duplicates");
+ }
+ }
+ }
+
+ private static Map<String, Object> getKey(Map<String, Object> item,
List<PColumn> pkCols) {
+ Map<String,Object> key = new HashMap<>();
+ for (PColumn pkCol : pkCols) {
+ String pkName = pkCol.getName().toString();
+ key.put(pkName, item.get(pkName));
+ }
+ return key;
+ }
}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BatchWriteItemIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BatchWriteItemIT.java
index f707b2e..271c789 100644
---
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BatchWriteItemIT.java
+++
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BatchWriteItemIT.java
@@ -120,10 +120,10 @@ public class BatchWriteItemIT {
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem3()).build())
.build());
writeReqs.add(WriteRequest.builder().deleteRequest(
- DeleteRequest.builder().key(getKey(getItem1(), new String[]
{"PK1", "PK2"}))
+ DeleteRequest.builder().key(getKey(getItem1(), new String[]
{"pk1", "pk2"}))
.build()).build());
writeReqs.add(WriteRequest.builder().deleteRequest(
- DeleteRequest.builder().key(getKey(getItem2(), new String[]
{"PK1", "PK2"}))
+ DeleteRequest.builder().key(getKey(getItem2(), new String[]
{"pk1", "pk2"}))
.build()).build());
Map<String, List<WriteRequest>> requestItems = new HashMap<>();
requestItems.put(tableName, writeReqs);
@@ -156,7 +156,7 @@ public class BatchWriteItemIT {
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem3()).build())
.build());
writeReqs1.add(WriteRequest.builder().deleteRequest(
- DeleteRequest.builder().key(getKey(getItem4(), new String[]
{"PK1", "PK2"}))
+ DeleteRequest.builder().key(getKey(getItem4(), new String[]
{"pk1", "pk2"}))
.build()).build());
//table2
@@ -179,7 +179,6 @@ public class BatchWriteItemIT {
requestItems.put(tableName2, writeReqs2);
BatchWriteItemRequest request =
BatchWriteItemRequest.builder().requestItems(requestItems).build();
- ;
BatchWriteItemResponse dynamoResult =
dynamoDbClient.batchWriteItem(request);
BatchWriteItemResponse phoenixResult =
phoenixDBClientV2.batchWriteItem(request);
@@ -190,7 +189,7 @@ public class BatchWriteItemIT {
}
@Test
- public void testValidationException() {
+ public void testValidationExceptionTotalItemsLimit() {
String testname = testName.getMethodName();
String tableName1 = testname + "_1";
String tableName2 = testname + "_2";
@@ -204,7 +203,7 @@ public class BatchWriteItemIT {
.putRequest(PutRequest.builder().item(getNewItem1(i)).build()).build());
}
writeReqs1.add(WriteRequest.builder().deleteRequest(
- DeleteRequest.builder().key(getKey(getItem4(), new String[]
{"PK1", "PK2"}))
+ DeleteRequest.builder().key(getKey(getItem4(), new String[]
{"pk1", "pk2"}))
.build()).build());
writeReqs1.add(WriteRequest.builder()
.putRequest(PutRequest.builder().item(getNewItem1(27)).build()).build());
@@ -237,9 +236,148 @@ public class BatchWriteItemIT {
}
}
+ @Test
+ public void testValidationExceptionNonExistentTable() {
+ String testname = testName.getMethodName();
+ String tableName1 = testname + "_1";
+
+ List<WriteRequest> writeReqs1 = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ writeReqs1.add(WriteRequest.builder()
+
.putRequest(PutRequest.builder().item(getNewItem1(i)).build()).build());
+ }
+ writeReqs1.add(WriteRequest.builder().deleteRequest(
+ DeleteRequest.builder().key(getKey(getItem4(), new String[]
{"pk1", "pk2"}))
+ .build()).build());
+
+ Map<String, List<WriteRequest>> requestItems = new HashMap<>();
+ requestItems.put(tableName1, writeReqs1);
+ BatchWriteItemRequest request =
+
BatchWriteItemRequest.builder().requestItems(requestItems).build();
+
+ try {
+ dynamoDbClient.batchWriteItem(request);
+ } catch (DynamoDbException e) {
+ Assert.assertEquals(400, e.statusCode());
+ }
+ try {
+ phoenixDBClientV2.batchWriteItem(request);
+ } catch (DynamoDbException e) {
+ Assert.assertEquals(400, e.statusCode());
+ }
+ }
+
+ @Test
+ public void testValidationExceptionDuplicatePuts() {
+ String testname = testName.getMethodName();
+ String tableName1 = testname + "_1";
+ String tableName2 = testname + "_2";
+ createTable1(tableName1);
+ createTable2(tableName2);
+
+ //table1
+ List<WriteRequest> writeReqs1 = new ArrayList<>();
+ writeReqs1.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem2()).build())
+ .build());
+ writeReqs1.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem3()).build())
+ .build());
+ writeReqs1.add(WriteRequest.builder().deleteRequest(
+ DeleteRequest.builder().key(getKey(getItem4(), new String[]
{"pk1", "pk2"}))
+ .build()).build());
+
+ //table2, duplicate puts
+ List<WriteRequest> writeReqs2 = new ArrayList<>();
+ writeReqs2.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem1()).build())
+ .build());
+ writeReqs2.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem2()).build())
+ .build());
+ writeReqs2.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem3()).build())
+ .build());
+ writeReqs2.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem2()).build())
+ .build());
+ writeReqs2.add(WriteRequest.builder().deleteRequest(
+ DeleteRequest.builder().key(getKey(getItem4(), new String[]
{"COL1", "COL2"}))
+ .build()).build());
+ Map<String, List<WriteRequest>> requestItems = new HashMap<>();
+ requestItems.put(tableName1, writeReqs1);
+ requestItems.put(tableName2, writeReqs2);
+ BatchWriteItemRequest request =
+
BatchWriteItemRequest.builder().requestItems(requestItems).build();
+ try {
+ dynamoDbClient.batchWriteItem(request);
+ } catch (DynamoDbException e) {
+ Assert.assertEquals(400, e.statusCode());
+ }
+ try {
+ phoenixDBClientV2.batchWriteItem(request);
+ } catch (DynamoDbException e) {
+ Assert.assertEquals(400, e.statusCode());
+ }
+ }
+
+ @Test
+ public void testValidationExceptionDuplicatePutAndDelete() {
+ String testname = testName.getMethodName();
+ String tableName1 = testname + "_1";
+ String tableName2 = testname + "_2";
+ createTable1(tableName1);
+ createTable2(tableName2);
+
+ //table1
+ List<WriteRequest> writeReqs1 = new ArrayList<>();
+ writeReqs1.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem2()).build())
+ .build());
+ writeReqs1.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem3()).build())
+ .build());
+ writeReqs1.add(WriteRequest.builder().deleteRequest(
+ DeleteRequest.builder().key(getKey(getItem4(), new String[]
{"pk1", "pk2"}))
+ .build()).build());
+
+ //table2, duplicate put and delete
+ List<WriteRequest> writeReqs2 = new ArrayList<>();
+ writeReqs2.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem1()).build())
+ .build());
+ writeReqs2.add(
+
WriteRequest.builder().putRequest(PutRequest.builder().item(getItem2()).build())
+ .build());
+ writeReqs2.add(WriteRequest.builder().deleteRequest(
+ DeleteRequest.builder().key(getKey(getItem3(), new String[]
{"COL1", "COL2"}))
+ .build()).build());
+ writeReqs2.add(WriteRequest.builder().deleteRequest(
+ DeleteRequest.builder().key(getKey(getItem4(), new String[]
{"COL1", "COL2"}))
+ .build()).build());
+ writeReqs2.add(WriteRequest.builder().deleteRequest(
+ DeleteRequest.builder().key(getKey(getItem1(), new String[]
{"COL1", "COL2"}))
+ .build()).build());
+ Map<String, List<WriteRequest>> requestItems = new HashMap<>();
+ requestItems.put(tableName1, writeReqs1);
+ requestItems.put(tableName2, writeReqs2);
+ BatchWriteItemRequest request =
+
BatchWriteItemRequest.builder().requestItems(requestItems).build();
+ try {
+ dynamoDbClient.batchWriteItem(request);
+ } catch (DynamoDbException e) {
+ Assert.assertEquals(400, e.statusCode());
+ }
+ try {
+ phoenixDBClientV2.batchWriteItem(request);
+ } catch (DynamoDbException e) {
+ Assert.assertEquals(400, e.statusCode());
+ }
+ }
+
private void createTable1(String tableName) {
CreateTableRequest createTableRequest =
- DDLTestUtils.getCreateTableRequest(tableName, "PK1",
ScalarAttributeType.S, "PK2",
+ DDLTestUtils.getCreateTableRequest(tableName, "pk1",
ScalarAttributeType.S, "pk2",
ScalarAttributeType.N);
phoenixDBClientV2.createTable(createTableRequest);
dynamoDbClient.createTable(createTableRequest);
@@ -271,8 +409,8 @@ public class BatchWriteItemIT {
private Map<String, AttributeValue> getItem1() {
Map<String, AttributeValue> item = new HashMap<>();
- item.put("PK1", AttributeValue.builder().s("A").build());
- item.put("PK2", AttributeValue.builder().n("1").build());
+ item.put("pk1", AttributeValue.builder().s("A").build());
+ item.put("pk2", AttributeValue.builder().n("1").build());
item.put("COL1", AttributeValue.builder().n("1").build());
item.put("COL2", AttributeValue.builder().s("Title1").build());
return item;
@@ -280,8 +418,8 @@ public class BatchWriteItemIT {
private Map<String, AttributeValue> getItem2() {
Map<String, AttributeValue> item = new HashMap<>();
- item.put("PK1", AttributeValue.builder().s("B").build());
- item.put("PK2", AttributeValue.builder().n("2").build());
+ item.put("pk1", AttributeValue.builder().s("B").build());
+ item.put("pk2", AttributeValue.builder().n("2").build());
item.put("COL1", AttributeValue.builder().n("3").build());
item.put("COL2", AttributeValue.builder().s("Title2").build());
return item;
@@ -289,8 +427,8 @@ public class BatchWriteItemIT {
private Map<String, AttributeValue> getItem3() {
Map<String, AttributeValue> item = new HashMap<>();
- item.put("PK1", AttributeValue.builder().s("C").build());
- item.put("PK2", AttributeValue.builder().n("3").build());
+ item.put("pk1", AttributeValue.builder().s("C").build());
+ item.put("pk2", AttributeValue.builder().n("3").build());
item.put("COL1", AttributeValue.builder().n("4").build());
item.put("COL2", AttributeValue.builder().s("Title3").build());
return item;
@@ -298,8 +436,8 @@ public class BatchWriteItemIT {
private Map<String, AttributeValue> getItem4() {
Map<String, AttributeValue> item = new HashMap<>();
- item.put("PK1", AttributeValue.builder().s("D").build());
- item.put("PK2", AttributeValue.builder().n("4").build());
+ item.put("pk1", AttributeValue.builder().s("D").build());
+ item.put("pk2", AttributeValue.builder().n("4").build());
item.put("COL1", AttributeValue.builder().n("5").build());
item.put("COL2", AttributeValue.builder().s("Title4").build());
return item;
@@ -307,15 +445,15 @@ public class BatchWriteItemIT {
private Map<String, AttributeValue> getNewItem1(int i) {
Map<String, AttributeValue> item = getItem1();
- Integer pk2 = Integer.parseInt(item.get("PK2").n()) * i;
- item.put("PK2", AttributeValue.builder().n(pk2.toString()).build());
+ Integer pk2 = Integer.parseInt(item.get("pk2").n()) * i;
+ item.put("pk2", AttributeValue.builder().n(pk2.toString()).build());
return item;
}
private Map<String, AttributeValue> getNewItem2(int i) {
Map<String, AttributeValue> item = getItem2();
Integer pk2 = Integer.parseInt(item.get("COL1").n()) * i;
- item.put("PK2", AttributeValue.builder().n(pk2.toString()).build());
+ item.put("pk2", AttributeValue.builder().n(pk2.toString()).build());
return item;
}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ValidationIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ValidationIT.java
index af8fc10..b65f6bf 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ValidationIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ValidationIT.java
@@ -37,12 +37,7 @@ import
software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
-import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
-import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
-import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
-import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
-import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import java.sql.DriverManager;