This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new b52830b560f CAMEL-20794: AWS2 Kinesis producer supports sending batch (#14618)
b52830b560f is described below
commit b52830b560fa64e7c9188f4105837f87d6aaa3f4
Author: Fan Yang <[email protected]>
AuthorDate: Mon Jun 24 17:17:56 2024 +0800
CAMEL-20794: AWS2 Kinesis producer supports sending batch (#14618)
* Kinesis producer supports sending batch
* Format code
* Set max batch size 500
---
.../component/aws2/kinesis/Kinesis2Producer.java | 96 +++++++++++++++++++---
.../kinesis/integration/KinesisProducerIT.java | 13 ++-
2 files changed, 97 insertions(+), 12 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index ce8903aa4ac..c51fe777379 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -16,6 +16,11 @@
*/
package org.apache.camel.component.aws2.kinesis;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.support.DefaultProducer;
@@ -23,9 +28,15 @@ import org.apache.camel.util.ObjectHelper;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
public class Kinesis2Producer extends DefaultProducer {
+ // Maximum number of records that can be sent in a single PutRecords request
+ private static final int MAX_BATCH_SIZE = 500;
+
private KinesisConnection connection;
public Kinesis2Producer(Kinesis2Endpoint endpoint) {
@@ -45,11 +56,80 @@ public class Kinesis2Producer extends DefaultProducer {
return (Kinesis2Endpoint) super.getEndpoint();
}
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ ObjectHelper.notNull(connection, "connection", this);
+ }
+
@Override
public void process(Exchange exchange) throws Exception {
+ Object body = exchange.getIn().getBody();
+ if (body instanceof Iterable) {
+ sendBatchRecords(exchange);
+ } else {
+ sendSingleRecord(exchange);
+ }
+ }
+
+ private void sendBatchRecords(Exchange exchange) {
+ Object partitionKey =
exchange.getIn().getHeader(Kinesis2Constants.PARTITION_KEY);
+ ensurePartitionKeyNotNull(partitionKey);
+ List<List<PutRecordsRequestEntry>> requestBatchList =
createRequestBatchList(exchange, partitionKey);
+ for (List<PutRecordsRequestEntry> requestBatch : requestBatchList) {
+ PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder()
+
.streamName(getEndpoint().getConfiguration().getStreamName())
+ .records(requestBatch)
+ .build();
+ PutRecordsResponse putRecordsResponse =
connection.getClient(getEndpoint()).putRecords(putRecordsRequest);
+ if (putRecordsResponse.failedRecordCount() > 0) {
+ throw new RuntimeException(
+ "Failed to send records " +
putRecordsResponse.failedRecordCount() + " of "
+ +
putRecordsResponse.records().size());
+ }
+ }
+ }
+
+ private List<List<PutRecordsRequestEntry>> createRequestBatchList(Exchange
exchange, Object partitionKey) {
+ List<List<PutRecordsRequestEntry>> requestBatchList = new
ArrayList<>();
+ List<PutRecordsRequestEntry> requestBatch = new
ArrayList<>(MAX_BATCH_SIZE);
+ for (Object record : exchange.getIn().getBody(Iterable.class)) {
+ SdkBytes sdkBytes;
+ if (record instanceof byte[] bytes) {
+ sdkBytes = SdkBytes.fromByteArray(bytes);
+ } else if (record instanceof ByteBuffer bf) {
+ sdkBytes = SdkBytes.fromByteBuffer(bf);
+ } else if (record instanceof InputStream is) {
+ sdkBytes = SdkBytes.fromInputStream(is);
+ } else if (record instanceof String str) {
+ sdkBytes = SdkBytes.fromUtf8String(str);
+ } else {
+ throw new IllegalArgumentException(
+ "Record type not supported. Must be byte[],
ByteBuffer, InputStream or UTF-8 String");
+ }
+
+ PutRecordsRequestEntry putRecordsRequestEntry =
PutRecordsRequestEntry.builder()
+ .data(sdkBytes)
+ .partitionKey(partitionKey.toString())
+ .build();
+ requestBatch.add(putRecordsRequestEntry);
+ if (requestBatch.size() == MAX_BATCH_SIZE) {
+ requestBatchList.add(requestBatch);
+ requestBatch = new ArrayList<>(MAX_BATCH_SIZE);
+ }
+ }
+ if (!requestBatch.isEmpty()) {
+ requestBatchList.add(requestBatch);
+ }
+
+ return requestBatchList;
+ }
+
+ private void sendSingleRecord(Exchange exchange) {
PutRecordRequest request = createRequest(exchange);
PutRecordResponse putRecordResult =
connection.getClient(getEndpoint()).putRecord(request);
- Message message = getMessageForResponse(exchange);
+ Message message = exchange.getMessage();
message.setHeader(Kinesis2Constants.SEQUENCE_NUMBER,
putRecordResult.sequenceNumber());
message.setHeader(Kinesis2Constants.SHARD_ID,
putRecordResult.shardId());
}
@@ -62,6 +142,7 @@ public class Kinesis2Producer extends DefaultProducer {
PutRecordRequest.Builder putRecordRequest = PutRecordRequest.builder();
putRecordRequest.data(SdkBytes.fromByteArray(body));
putRecordRequest.streamName(getEndpoint().getConfiguration().getStreamName());
+ ensurePartitionKeyNotNull(partitionKey);
putRecordRequest.partitionKey(partitionKey.toString());
if (sequenceNumber != null) {
putRecordRequest.sequenceNumberForOrdering(sequenceNumber.toString());
@@ -69,14 +150,9 @@ public class Kinesis2Producer extends DefaultProducer {
return putRecordRequest.build();
}
- public static Message getMessageForResponse(final Exchange exchange) {
- return exchange.getMessage();
- }
-
- @Override
- protected void doStart() throws Exception {
- super.doStart();
-
- ObjectHelper.notNull(connection, "connection", this);
+ private void ensurePartitionKeyNotNull(Object partitionKey) {
+ if (partitionKey == null) {
+ throw new IllegalArgumentException("Partition key must be
specified");
+ }
}
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
index 4b43ff03f40..8b303017571 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.aws2.kinesis.integration;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -105,14 +106,22 @@ public class KinesisProducerIT extends CamelTestSupport {
exchange.getIn().setBody("Kinesis Event 2.");
});
- List<Record> records;
+ template.send("direct:start", ExchangePattern.InOut, exchange -> {
+ exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY,
"partition-1");
+ exchange.getIn().setBody(Arrays.asList("Kinesis Event 3.",
"Kinesis Event 4.".getBytes(StandardCharsets.UTF_8)));
+ });
+
Awaitility.await().atMost(5, TimeUnit.SECONDS)
- .untilAsserted(() -> assertEquals(2, consumeMessages()));
+ .untilAsserted(() -> assertEquals(4, consumeMessages()));
assertEquals("Kinesis Event 1.",
recordList.get(0).data().asString(StandardCharsets.UTF_8));
assertEquals("partition-1", recordList.get(0).partitionKey());
assertEquals("Kinesis Event 2.",
recordList.get(1).data().asString(StandardCharsets.UTF_8));
assertEquals("partition-1", recordList.get(1).partitionKey());
+ assertEquals("Kinesis Event 3.",
recordList.get(2).data().asString(StandardCharsets.UTF_8));
+ assertEquals("partition-1", recordList.get(2).partitionKey());
+ assertEquals("Kinesis Event 4.",
recordList.get(3).data().asString(StandardCharsets.UTF_8));
+ assertEquals("partition-1", recordList.get(3).partitionKey());
}
@Override