This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-20543-2 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8e84bc3ab5410879ed581cae910af9cd909bad0a Author: Andrea Cosentino <[email protected]> AuthorDate: Tue Mar 12 09:31:33 2024 +0100 CAMEL-20543 - Camel-AWS-Bedrock-Agent: Support more operations on the producer side Signed-off-by: Andrea Cosentino <[email protected]> --- .../aws2/bedrock/agent/BedrockAgentConstants.java | 4 ++ .../aws2/bedrock/agent/BedrockAgentOperations.java | 4 +- .../aws2/bedrock/agent/BedrockAgentProducer.java | 69 ++++++++++++++++++++-- .../agent/integration/BedrockAgentProducerIT.java | 1 - 4 files changed, 72 insertions(+), 6 deletions(-) diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentConstants.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentConstants.java index fb8ed36ba53..2461c482e10 100644 --- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentConstants.java +++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentConstants.java @@ -40,4 +40,8 @@ public interface BedrockAgentConstants { @Metadata(description = "The header contains the status of the ingestion job", javaType = "String", label = "consumer") String INGESTION_JOB_STATUS = "CamelAwsBedrockAgentIngestionJobStatus"; + + @Metadata(description = "The header contains the id of the ingestion job", + javaType = "String", label = "producer") + String INGESTION_JOB_ID = "CamelAwsBedrockAgentIngestionJobId"; } diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java index 57710e6951e..4fb08645cfe 100644 --- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java +++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java @@ -20,5 +20,7 @@ public enum BedrockAgentOperations { startIngestionJob, - listIngestionJobs + listIngestionJobs, + + getIngestionJob } diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java index 4ba7a381003..d74b9593258 100644 --- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java +++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java @@ -27,10 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.bedrockagent.BedrockAgentClient; -import software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsRequest; -import software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsResponse; -import software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobRequest; -import software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobResponse; +import software.amazon.awssdk.services.bedrockagent.model.*; /** * A Producer which sends messages to the Amazon Bedrock Agent Service <a href="http://aws.amazon.com/bedrock/">AWS @@ -54,6 +51,9 @@ public class BedrockAgentProducer extends DefaultProducer { case listIngestionJobs: listIngestionJobs(getEndpoint().getBedrockAgentClient(), exchange); break; + case getIngestionJob: + getIngestionJob(getEndpoint().getBedrockAgentClient(), exchange); + break; default: throw new IllegalArgumentException("Unsupported operation"); } @@ -176,6 +176,62 @@ public class BedrockAgentProducer extends DefaultProducer { } } + private void getIngestionJob(BedrockAgentClient bedrockAgentClient, Exchange exchange) + throws InvalidPayloadException { + if (getConfiguration().isPojoRequest()) { + Object payload = exchange.getMessage().getMandatoryBody(); + if (payload instanceof GetIngestionJobRequest) { + GetIngestionJobResponse result; + try { + result = bedrockAgentClient.getIngestionJob((GetIngestionJobRequest) payload); + } catch (AwsServiceException ase) { + LOG.trace("Get Ingestion Job command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + prepareGetIngestionJobResponse(result, message); + } + } else { + String knowledgeBaseId; + String dataSourceId; + String ingestionJobId; + GetIngestionJobRequest.Builder builder = GetIngestionJobRequest.builder(); + if (ObjectHelper.isEmpty(getConfiguration().getKnowledgeBaseId())) { + if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID))) { + knowledgeBaseId = exchange.getMessage().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, String.class); + } else { + throw new IllegalArgumentException("KnowledgeBaseId must be specified"); + } + } else { + knowledgeBaseId = getConfiguration().getKnowledgeBaseId(); + } + if (ObjectHelper.isEmpty(getConfiguration().getDataSourceId())) { + if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.DATASOURCE_ID))) { + dataSourceId = exchange.getMessage().getHeader(BedrockAgentConstants.DATASOURCE_ID, String.class); + } else { + throw new IllegalArgumentException("DataSourceId must be specified"); + } + } else { + dataSourceId = getConfiguration().getDataSourceId(); + } + if (ObjectHelper.isEmpty(getConfiguration().getIngestionJobId())) { + if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.INGESTION_JOB_ID))) { + ingestionJobId = exchange.getMessage().getHeader(BedrockAgentConstants.INGESTION_JOB_ID, String.class); + } else { + throw new IllegalArgumentException("IngestionJobId must be specified"); + } + } else { + ingestionJobId = getConfiguration().getIngestionJobId(); + } + builder.knowledgeBaseId(knowledgeBaseId); + builder.dataSourceId(dataSourceId); + builder.ingestionJobId(ingestionJobId); + GetIngestionJobResponse output = bedrockAgentClient.getIngestionJob(builder.build()); + Message message = getMessageForResponse(exchange); + prepareGetIngestionJobResponse(output, message); + } + } + private void prepareIngestionJobResponse(StartIngestionJobResponse result, Message message) { message.setBody(result.ingestionJob().ingestionJobId()); } @@ -186,6 +242,11 @@ public class BedrockAgentProducer extends DefaultProducer { } } + private void prepareGetIngestionJobResponse(GetIngestionJobResponse result, Message message) { + message.setBody(result.ingestionJob().ingestionJobId()); + message.setHeader(BedrockAgentConstants.INGESTION_JOB_STATUS, result.ingestionJob().status()); + } + public static Message getMessageForResponse(final Exchange exchange) { return exchange.getMessage(); } diff --git a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java index 2de238a7fb4..8d04c59668b 100644 --- a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java +++ b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java @@ -76,7 +76,6 @@ class BedrockAgentProducerIT extends CamelTestSupport { .to(result); from("direct:list_ingestion_jobs") .to("aws-bedrock-agent:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}}®ion=us-east-1&operation=listIngestionJobs") - .log("${body}") .to(result); } };
