This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch CAMEL-20543-2
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8e84bc3ab5410879ed581cae910af9cd909bad0a
Author: Andrea Cosentino <[email protected]>
AuthorDate: Tue Mar 12 09:31:33 2024 +0100

    CAMEL-20543 - Camel-AWS-Bedrock-Agent: Support more operations on the 
producer side
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 .../aws2/bedrock/agent/BedrockAgentConstants.java  |  4 ++
 .../aws2/bedrock/agent/BedrockAgentOperations.java |  4 +-
 .../aws2/bedrock/agent/BedrockAgentProducer.java   | 69 ++++++++++++++++++++--
 .../agent/integration/BedrockAgentProducerIT.java  |  1 -
 4 files changed, 72 insertions(+), 6 deletions(-)

diff --git 
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentConstants.java
 
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentConstants.java
index fb8ed36ba53..2461c482e10 100644
--- 
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentConstants.java
+++ 
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentConstants.java
@@ -40,4 +40,8 @@ public interface BedrockAgentConstants {
     @Metadata(description = "The header contains the status of the ingestion 
job",
               javaType = "String", label = "consumer")
     String INGESTION_JOB_STATUS = "CamelAwsBedrockAgentIngestionJobStatus";
+
+    @Metadata(description = "The header contains the id of the ingestion job",
+            javaType = "String", label = "producer")
+    String INGESTION_JOB_ID = "CamelAwsBedrockAgentIngestionJobId";
 }
diff --git 
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
 
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
index 57710e6951e..4fb08645cfe 100644
--- 
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
+++ 
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
@@ -20,5 +20,7 @@ public enum BedrockAgentOperations {
 
     startIngestionJob,
 
-    listIngestionJobs
+    listIngestionJobs,
+
+    getIngestionJob
 }
diff --git 
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
 
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
index 4ba7a381003..d74b9593258 100644
--- 
a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
+++ 
b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
@@ -27,10 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.services.bedrockagent.BedrockAgentClient;
-import 
software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsRequest;
-import 
software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsResponse;
-import 
software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobRequest;
-import 
software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobResponse;
+import software.amazon.awssdk.services.bedrockagent.model.*;
 
 /**
  * A Producer which sends messages to the Amazon Bedrock Agent Service <a 
href="http://aws.amazon.com/bedrock/";>AWS
@@ -54,6 +51,9 @@ public class BedrockAgentProducer extends DefaultProducer {
             case listIngestionJobs:
                 listIngestionJobs(getEndpoint().getBedrockAgentClient(), 
exchange);
                 break;
+            case getIngestionJob:
+                getIngestionJob(getEndpoint().getBedrockAgentClient(), 
exchange);
+                break;
             default:
                 throw new IllegalArgumentException("Unsupported operation");
         }
@@ -176,6 +176,62 @@ public class BedrockAgentProducer extends DefaultProducer {
         }
     }
 
+    private void getIngestionJob(BedrockAgentClient bedrockAgentClient, 
Exchange exchange)
+            throws InvalidPayloadException {
+        if (getConfiguration().isPojoRequest()) {
+            Object payload = exchange.getMessage().getMandatoryBody();
+            if (payload instanceof GetIngestionJobRequest) {
+                GetIngestionJobResponse result;
+                try {
+                    result = 
bedrockAgentClient.getIngestionJob((GetIngestionJobRequest) payload);
+                } catch (AwsServiceException ase) {
+                    LOG.trace("Get Ingestion Job command returned the error 
code {}", ase.awsErrorDetails().errorCode());
+                    throw ase;
+                }
+                Message message = getMessageForResponse(exchange);
+                prepareGetIngestionJobResponse(result, message);
+            }
+        } else {
+            String knowledgeBaseId;
+            String dataSourceId;
+            String ingestionJobId;
+            GetIngestionJobRequest.Builder builder = 
GetIngestionJobRequest.builder();
+            if (ObjectHelper.isEmpty(getConfiguration().getKnowledgeBaseId())) 
{
+                if 
(ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID)))
 {
+                    knowledgeBaseId = 
exchange.getMessage().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, 
String.class);
+                } else {
+                    throw new IllegalArgumentException("KnowledgeBaseId must 
be specified");
+                }
+            } else {
+                knowledgeBaseId = getConfiguration().getKnowledgeBaseId();
+            }
+            if (ObjectHelper.isEmpty(getConfiguration().getDataSourceId())) {
+                if 
(ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.DATASOURCE_ID)))
 {
+                    dataSourceId = 
exchange.getMessage().getHeader(BedrockAgentConstants.DATASOURCE_ID, 
String.class);
+                } else {
+                    throw new IllegalArgumentException("DataSourceId must be 
specified");
+                }
+            } else {
+                dataSourceId = getConfiguration().getDataSourceId();
+            }
+            if (ObjectHelper.isEmpty(getConfiguration().getIngestionJobId())) {
+                if 
(ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.INGESTION_JOB_ID)))
 {
+                    ingestionJobId = 
exchange.getMessage().getHeader(BedrockAgentConstants.INGESTION_JOB_ID, 
String.class);
+                } else {
+                    throw new IllegalArgumentException("IngestionJobId must be 
specified");
+                }
+            } else {
+                ingestionJobId = getConfiguration().getIngestionJobId();
+            }
+            builder.knowledgeBaseId(knowledgeBaseId);
+            builder.dataSourceId(dataSourceId);
+            builder.ingestionJobId(ingestionJobId);
+            GetIngestionJobResponse output = 
bedrockAgentClient.getIngestionJob(builder.build());
+            Message message = getMessageForResponse(exchange);
+            prepareGetIngestionJobResponse(output, message);
+        }
+    }
+
     private void prepareIngestionJobResponse(StartIngestionJobResponse result, 
Message message) {
         message.setBody(result.ingestionJob().ingestionJobId());
     }
@@ -186,6 +242,11 @@ public class BedrockAgentProducer extends DefaultProducer {
         }
     }
 
+    private void prepareGetIngestionJobResponse(GetIngestionJobResponse 
result, Message message) {
+            message.setBody(result.ingestionJob().ingestionJobId());
+            message.setHeader(BedrockAgentConstants.INGESTION_JOB_STATUS, 
result.ingestionJob().status());
+    }
+
     public static Message getMessageForResponse(final Exchange exchange) {
         return exchange.getMessage();
     }
diff --git 
a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
 
b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
index 2de238a7fb4..8d04c59668b 100644
--- 
a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
+++ 
b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
@@ -76,7 +76,6 @@ class BedrockAgentProducerIT extends CamelTestSupport {
                         .to(result);
                 from("direct:list_ingestion_jobs")
                         
.to("aws-bedrock-agent:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}}&region=us-east-1&operation=listIngestionJobs")
-                        .log("${body}")
                         .to(result);
             }
         };

Reply via email to