This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-20417 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4c01bb5e7728b49cea6ec5d125720d40e2849855 Author: Andrea Cosentino <[email protected]> AuthorDate: Tue Feb 20 12:07:48 2024 +0100 CAMEL-20417 - AWS DDBStreams CloudEvent Transformer Signed-off-by: Andrea Cosentino <[email protected]> --- .../apache/camel/catalog/transformers.properties | 1 + .../aws2-ddbstream-application-cloudevents.json | 14 ++++++ components/camel-aws/camel-aws2-ddb/pom.xml | 8 ++++ .../org/apache/camel/transformer.properties | 2 +- .../aws2-ddbstream-application-cloudevents | 2 + .../aws2-ddbstream-application-cloudevents.json | 14 ++++++ .../aws2/ddbstream/Ddb2StreamConstants.java | 33 +++++++++++++ .../aws2/ddbstream/Ddb2StreamConsumer.java | 4 +- .../Ddb2StreamCloudEventDataTypeTransformer.java | 56 ++++++++++++++++++++++ .../org/apache/camel/spring/xml/errorHandler.json | 4 +- 10 files changed, 135 insertions(+), 3 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties index 67d1f0b7960..251aeaf2a31 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties @@ -8,6 +8,7 @@ avro-x-java-object avro-x-struct aws-cloudtrail-application-cloudevents aws2-ddb-application-json +aws2-ddbstream-application-cloudevents aws2-kinesis-application-cloudevents aws2-s3-application-cloudevents aws2-sqs-application-cloudevents diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddbstream-application-cloudevents.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddbstream-application-cloudevents.json new file mode 100644 index 00000000000..1b82dc7275d --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddbstream-application-cloudevents.json @@ -0,0 +1,14 @@ +{ + "transformer": { + "kind": "transformer", + "name": "aws2-ddbstream:application-cloudevents", + "title": "Aws2 Ddbstream (Application Cloudevents)", + "description": "Adds CloudEvent headers to the Camel message with AWS Dynamo DB Streams get records response details", + "deprecated": false, + "javaType": "org.apache.camel.component.aws2.ddbstream.transform.Ddb2StreamCloudEventDataTypeTransformer", + "groupId": "org.apache.camel", + "artifactId": "camel-aws2-ddb", + "version": "4.5.0-SNAPSHOT" + } +} + diff --git a/components/camel-aws/camel-aws2-ddb/pom.xml b/components/camel-aws/camel-aws2-ddb/pom.xml index 07087268e8c..b844fc76112 100644 --- a/components/camel-aws/camel-aws2-ddb/pom.xml +++ b/components/camel-aws/camel-aws2-ddb/pom.xml @@ -63,6 +63,14 @@ <optional>true</optional> </dependency> + <!-- optional CloudEvent support --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-cloudevents</artifactId> + <scope>provided</scope> + <optional>true</optional> + </dependency> + <!-- for testing --> <dependency> <groupId>org.apache.camel</groupId> diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties index 6b39b49a85c..558eff168c7 100644 --- a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties +++ b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties @@ -1,5 +1,5 @@ # Generated by camel build tools - do NOT edit this file! -transformers=aws2-ddb:application-json +transformers=aws2-ddb:application-json aws2-ddbstream:application-cloudevents groupId=org.apache.camel artifactId=camel-aws2-ddb version=4.5.0-SNAPSHOT diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddbstream-application-cloudevents b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddbstream-application-cloudevents new file mode 100644 index 00000000000..38e6ec7333c --- /dev/null +++ b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddbstream-application-cloudevents @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.aws2.ddbstream.transform.Ddb2StreamCloudEventDataTypeTransformer diff --git a/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddbstream-application-cloudevents.json b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddbstream-application-cloudevents.json new file mode 100644 index 00000000000..1b82dc7275d --- /dev/null +++ b/components/camel-aws/camel-aws2-ddb/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddbstream-application-cloudevents.json @@ -0,0 +1,14 @@ +{ + "transformer": { + "kind": "transformer", + "name": "aws2-ddbstream:application-cloudevents", + "title": "Aws2 Ddbstream (Application Cloudevents)", + "description": "Adds CloudEvent headers to the Camel message with AWS Dynamo DB Streams get records response details", + "deprecated": false, + "javaType": "org.apache.camel.component.aws2.ddbstream.transform.Ddb2StreamCloudEventDataTypeTransformer", + "groupId": "org.apache.camel", + "artifactId": "camel-aws2-ddb", + "version": "4.5.0-SNAPSHOT" + } +} + diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConstants.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConstants.java new file mode 100644 index 00000000000..74dd4a68202 --- /dev/null +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConstants.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws2.ddbstream; + +import org.apache.camel.spi.Metadata; + +/** + * Constants used in Camel AWS DynamoDB Streams component + */ +public interface Ddb2StreamConstants { + @Metadata(label = "consumer", + description = "The Amazon Web Services service from which the stream record originated. For DynamoDB Streams, this is aws:dynamodb.", + javaType = "String") + String EVENT_SOURCE = "CamelAwsDdbStreamEventSource"; + @Metadata(label = "consumer", + description = "A globally unique identifier for the event that was recorded in this stream record.", + javaType = "String") + String EVENT_ID = "CamelAwsDdbStreamEventId"; +} diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java index 9e3aa959d0a..19c0e184af8 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java @@ -126,7 +126,9 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer { protected Exchange createExchange(Record record) { Exchange ex = createExchange(true); - ex.getIn().setBody(record, Record.class); + ex.getMessage().setBody(record, Record.class); + ex.getMessage().setHeader(Ddb2StreamConstants.EVENT_SOURCE, record.eventSource()); + ex.getMessage().setHeader(Ddb2StreamConstants.EVENT_ID, record.eventID()); return ex; } diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/transform/Ddb2StreamCloudEventDataTypeTransformer.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/transform/Ddb2StreamCloudEventDataTypeTransformer.java new file mode 100644 index 00000000000..4952822e0f8 --- /dev/null +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/transform/Ddb2StreamCloudEventDataTypeTransformer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.aws2.ddbstream.transform; + +import java.util.Map; + +import org.apache.camel.Message; +import org.apache.camel.component.aws2.ddbstream.Ddb2StreamConstants; +import org.apache.camel.component.cloudevents.CloudEvent; +import org.apache.camel.component.cloudevents.CloudEvents; +import org.apache.camel.spi.DataType; +import org.apache.camel.spi.DataTypeTransformer; +import org.apache.camel.spi.Transformer; + +/** + * Data type transformer converts AWS Dynamo DB Streams get records response to CloudEvent v1_0 data format. The data + * type sets Camel specific CloudEvent headers with values extracted from AWS Dynamo DB Streams get records. + */ +@DataTypeTransformer(name = "aws2-ddbstream:application-cloudevents", + description = "Adds CloudEvent headers to the Camel message with AWS Dynamo DB Streams get records response details") +public class Ddb2StreamCloudEventDataTypeTransformer extends Transformer { + + @Override + public void transform(Message message, DataType fromType, DataType toType) { + final Map<String, Object> headers = message.getHeaders(); + + CloudEvent cloudEvent = CloudEvents.v1_0; + headers.putIfAbsent(CloudEvent.CAMEL_CLOUD_EVENT_ID, message.getExchange().getExchangeId()); + headers.putIfAbsent(CloudEvent.CAMEL_CLOUD_EVENT_VERSION, cloudEvent.version()); + headers.put(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, "org.apache.camel.event.aws.ddbstream.getRecords"); + + if (message.getHeaders().containsKey(Ddb2StreamConstants.EVENT_SOURCE)) { + headers.put(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, + "aws.s3.ddbstream." + message.getHeader(Ddb2StreamConstants.EVENT_SOURCE, String.class)); + } + + headers.put(CloudEvent.CAMEL_CLOUD_EVENT_SUBJECT, message.getHeader(Ddb2StreamConstants.EVENT_ID, String.class)); + headers.put(CloudEvent.CAMEL_CLOUD_EVENT_TIME, cloudEvent.getEventTime(message.getExchange())); + headers.put(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE, CloudEvent.APPLICATION_OCTET_STREAM_MIME_TYPE); + } +} diff --git a/components/camel-spring-xml/src/generated/resources/org/apache/camel/spring/xml/errorHandler.json b/components/camel-spring-xml/src/generated/resources/org/apache/camel/spring/xml/errorHandler.json index 3cc70a7186e..c6ea3ea7fc2 100644 --- a/components/camel-spring-xml/src/generated/resources/org/apache/camel/spring/xml/errorHandler.json +++ b/components/camel-spring-xml/src/generated/resources/org/apache/camel/spring/xml/errorHandler.json @@ -32,6 +32,8 @@ "redeliveryPolicy": { "index": 17, "kind": "element", "displayName": "Redelivery Policy", "required": false, "type": "object", "javaType": "org.apache.camel.spring.xml.CamelRedeliveryPolicyFactoryBean", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the redelivery settings" } }, "exchangeProperties": { - "CamelExceptionCaught": { "index": 0, "kind": "exchangeProperty", "displayName": "Exception Caught", "required": false, "javaType": "java.lang.Exception", "deprecated": false, "autowired": false, "secret": false, "description": "Stores the caught exception due to a processing error of the current Exchange" } + "CamelExceptionCaught": { "index": 0, "kind": "exchangeProperty", "displayName": "Exception Caught", "label": "producer", "required": false, "javaType": "java.lang.Exception", "deprecated": false, "autowired": false, "secret": false, "description": "Stores the caught exception due to a processing error of the current Exchange" }, + "CamelFailureEndpoint": { "index": 1, "kind": "exchangeProperty", "displayName": "Failure Endpoint", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "autowired": false, "secret": false, "description": "Endpoint URI where the Exchange failed during processing" }, + "CamelFailureRouteId": { "index": 2, "kind": "exchangeProperty", "displayName": "Failure Route Id", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "autowired": false, "secret": false, "description": "Route ID where the Exchange failed during processing" } } }
