This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0e42ce4447baed541698f5754fb5459749eee6cc Author: Andrea Cosentino <[email protected]> AuthorDate: Fri Jul 22 10:53:19 2022 +0200 CAMEL-18287 - Camel-AWS Eventbridge: Support operation putEvents to send custom event to Eventbridge --- .../aws2/eventbridge/EventbridgeConstants.java | 6 ++ .../aws2/eventbridge/EventbridgeOperations.java | 3 +- .../aws2/eventbridge/EventbridgeProducer.java | 79 ++++++++++++++++------ 3 files changed, 66 insertions(+), 22 deletions(-) diff --git a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeConstants.java b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeConstants.java index 028dca7a858..f7db6911567 100644 --- a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeConstants.java +++ b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeConstants.java @@ -36,4 +36,10 @@ public interface EventbridgeConstants { String TARGETS_IDS = "CamelAwsEventbridgeTargetsIds"; @Metadata(description = "The Amazon Resource Name (ARN) of the target resource.", javaType = "String") String TARGET_ARN = "CamelAwsEventbridgeTargetArn"; + @Metadata(description = "Comma separated list of Amazon Resource Names (ARN) of the resources related to Event", javaType = "String") + String EVENT_RESOURCES_ARN = "CamelAwsEventbridgeResourcesArn"; + @Metadata(description = "The source related to Event", javaType = "String") + String EVENT_SOURCE = "CamelAwsEventbridgeSource"; + @Metadata(description = "The detail type related to Event", javaType = "String") + String EVENT_DETAIL_TYPE = "CamelAwsEventbridgeDetailType"; } diff --git a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeOperations.java b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeOperations.java index 0b42b9f1de6..87a76ff862d 100644 --- a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeOperations.java +++ b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeOperations.java @@ -27,5 +27,6 @@ public enum EventbridgeOperations { describeRule, listRules, listTargetsByRule, - listRuleNamesByTarget + listRuleNamesByTarget, + putEvent } diff --git a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeProducer.java b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeProducer.java index cd71ae4b529..7e7c67da1be 100644 --- a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeProducer.java +++ b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeProducer.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -34,27 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.eventbridge.EventBridgeClient; -import software.amazon.awssdk.services.eventbridge.model.DeleteRuleRequest; -import software.amazon.awssdk.services.eventbridge.model.DeleteRuleResponse; -import software.amazon.awssdk.services.eventbridge.model.DescribeRuleRequest; -import software.amazon.awssdk.services.eventbridge.model.DescribeRuleResponse; -import software.amazon.awssdk.services.eventbridge.model.DisableRuleRequest; -import software.amazon.awssdk.services.eventbridge.model.DisableRuleResponse; -import software.amazon.awssdk.services.eventbridge.model.EnableRuleRequest; -import software.amazon.awssdk.services.eventbridge.model.EnableRuleResponse; -import software.amazon.awssdk.services.eventbridge.model.ListRuleNamesByTargetRequest; -import software.amazon.awssdk.services.eventbridge.model.ListRuleNamesByTargetResponse; -import software.amazon.awssdk.services.eventbridge.model.ListRulesRequest; -import software.amazon.awssdk.services.eventbridge.model.ListRulesResponse; -import software.amazon.awssdk.services.eventbridge.model.ListTargetsByRuleRequest; -import software.amazon.awssdk.services.eventbridge.model.ListTargetsByRuleResponse; -import software.amazon.awssdk.services.eventbridge.model.PutRuleRequest; -import software.amazon.awssdk.services.eventbridge.model.PutRuleResponse; -import software.amazon.awssdk.services.eventbridge.model.PutTargetsRequest; -import software.amazon.awssdk.services.eventbridge.model.PutTargetsResponse; -import software.amazon.awssdk.services.eventbridge.model.RemoveTargetsRequest; -import software.amazon.awssdk.services.eventbridge.model.RemoveTargetsResponse; -import software.amazon.awssdk.services.eventbridge.model.Target; +import software.amazon.awssdk.services.eventbridge.model.*; /** * A Producer which sends messages to the Amazon Eventbridge Service SDK v2 @@ -103,6 +86,9 @@ public class EventbridgeProducer extends DefaultProducer { case listRuleNamesByTarget: listRuleNamesByTarget(getEndpoint().getEventbridgeClient(), exchange); break; + case putEvent: + putEvent(getEndpoint().getEventbridgeClient(), exchange); + break; default: throw new IllegalArgumentException("Unsupported operation"); } @@ -482,6 +468,57 @@ public class EventbridgeProducer extends DefaultProducer { } } + private void putEvent(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException, IOException { + if (getConfiguration().isPojoRequest()) { + Object payload = exchange.getIn().getMandatoryBody(); + if (payload instanceof PutEventsRequest) { + PutEventsResponse result; + try { + result = eventbridgeClient.putEvents((PutEventsRequest) payload); + } catch (AwsServiceException ase) { + LOG.trace("PutEvents command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + } + } else { + PutEventsRequest.Builder builder = PutEventsRequest.builder(); + PutEventsRequestEntry.Builder entryBuilder = PutEventsRequestEntry.builder(); + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.EVENT_RESOURCES_ARN))) { + String resourcesArn = exchange.getIn().getHeader(EventbridgeConstants.EVENT_RESOURCES_ARN, String.class); + entryBuilder.resources(Stream.of(resourcesArn.split(",")).collect(Collectors.toList())); + } else { + throw new IllegalArgumentException("At least one resource ARN must be specified"); + } + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.EVENT_DETAIL_TYPE))) { + String detailType = exchange.getIn().getHeader(EventbridgeConstants.EVENT_DETAIL_TYPE, String.class); + entryBuilder.detailType(detailType); + } else { + throw new IllegalArgumentException("Detail Type must be specified"); + } + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.EVENT_SOURCE))) { + String source = exchange.getIn().getHeader(EventbridgeConstants.EVENT_SOURCE, String.class); + entryBuilder.source(source); + } else { + throw new IllegalArgumentException("Source must be specified"); + } + entryBuilder.eventBusName(getConfiguration().getEventbusName()); + entryBuilder.detail(exchange.getMessage().getMandatoryBody(String.class)); + + builder.entries(entryBuilder.build()); + PutEventsResponse result; + try { + result = eventbridgeClient.putEvents(builder.build()); + } catch (AwsServiceException ase) { + LOG.trace("Put Events command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + } + } + @Override public EventbridgeEndpoint getEndpoint() { return (EventbridgeEndpoint) super.getEndpoint();
