This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e623033f3 Increase azure-eventhubs test coverage
7e623033f3 is described below

commit 7e623033f3a582a1f49251df1f0fe50cb40798dc
Author: James Netherton <[email protected]>
AuthorDate: Thu Aug 22 07:12:58 2024 +0100

    Increase azure-eventhubs test coverage
    
    Fixes #6367
---
 .../azure/azure-eventhubs/pom.xml                  |  25 +-
 .../azure/eventhubs/it/AzureCredentialsHelper.java |  88 +++++
 .../eventhubs/it/AzureEventhubsProducers.java      |  57 +++
 .../azure/eventhubs/it/AzureEventhubsResource.java | 120 ++++--
 .../azure/eventhubs/it/AzureEventhubsRoutes.java   | 192 +++++++++-
 .../eventhubs/it/InMemoryCheckpointStore.java      | 134 +++++++
 .../src/main/resources/application.properties      |  22 ++
 .../azure/eventhubs/it/AzureEventhubsIT.java       |   2 +-
 .../azure/eventhubs/it/AzureEventhubsTest.java     | 405 ++++++++++++++++++++-
 integration-test-groups/azure/azure-resources.sh   |   2 +-
 10 files changed, 976 insertions(+), 71 deletions(-)

diff --git a/integration-test-groups/azure/azure-eventhubs/pom.xml b/integration-test-groups/azure/azure-eventhubs/pom.xml
index 35015cc397..099adb9a07 100644
--- a/integration-test-groups/azure/azure-eventhubs/pom.xml
+++ b/integration-test-groups/azure/azure-eventhubs/pom.xml
@@ -33,15 +33,15 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
-            <artifactId>camel-quarkus-azure-eventhubs</artifactId>
+            <artifactId>camel-quarkus-direct</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
-            <artifactId>camel-quarkus-mock</artifactId>
+            <artifactId>camel-quarkus-azure-eventhubs</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.quarkus</groupId>
-            <artifactId>quarkus-quartz</artifactId>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-mock</artifactId>
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
@@ -51,6 +51,10 @@
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-resteasy-jsonb</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-integration-test-support</artifactId>
+        </dependency>
 
         <!-- test dependencies -->
         <dependency>
@@ -107,6 +111,19 @@
             </activation>
             <dependencies>
                 <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
+                <dependency>
+                    <groupId>org.apache.camel.quarkus</groupId>
+                    <artifactId>camel-quarkus-direct-deployment</artifactId>
+                    <version>${project.version}</version>
+                    <type>pom</type>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>*</groupId>
+                            <artifactId>*</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.camel.quarkus</groupId>
                     <artifactId>camel-quarkus-azure-eventhubs-deployment</artifactId>
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureCredentialsHelper.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureCredentialsHelper.java
new file mode 100644
index 0000000000..df9687c78a
--- /dev/null
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureCredentialsHelper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.azure.eventhubs.it;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.azure.core.amqp.implementation.ConnectionStringProperties;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+public final class AzureCredentialsHelper {
+    private AzureCredentialsHelper() {
+        // Utility class
+    }
+
+    public static boolean isMinimumConfigurationAvailable() {
+        Config config = ConfigProvider.getConfig();
+        if (isMockBackEnd()) {
+            return false;
+        }
+        Optional<String> storageAccountName = config.getOptionalValue("azure.storage.account-name", String.class);
+        Optional<String> storageAccountKey = config.getOptionalValue("azure.storage.account-key", String.class);
+        Optional<String> connectionString = config.getOptionalValue("azure.event.hubs.connection.string", String.class);
+        return storageAccountName.isPresent() && storageAccountKey.isPresent() && connectionString.isPresent();
+    }
+
+    public static boolean isAzureIdentityCredentialsAvailable() {
+        Config config = ConfigProvider.getConfig();
+        if (isMockBackEnd()) {
+            return false;
+        }
+
+        Optional<String> clientId = config.getOptionalValue("azure.client.id", String.class);
+        Optional<String> tenantId = config.getOptionalValue("azure.tenant.id", String.class);
+        Optional<String> username = config.getOptionalValue("azure.username", String.class);
+        Optional<String> password = config.getOptionalValue("azure.password", String.class);
+        Optional<String> clientSecret = config.getOptionalValue("azure.client.secret", String.class);
+        Optional<String> clientCertificate = config.getOptionalValue("azure.client.certificate.path", String.class);
+        Optional<String> clientCertificatePassword = config.getOptionalValue("azure.client.certificate.password", String.class);
+        return (clientId.isPresent() && tenantId.isPresent() &&
+                (username.isPresent() || password.isPresent() || clientCertificate.isPresent() || clientSecret.isPresent()
+                        || clientCertificatePassword.isPresent()));
+    }
+
+    public static boolean isSharedAccessKeyAvailable() {
+        Config config = ConfigProvider.getConfig();
+        if (isMockBackEnd()) {
+            return false;
+        }
+        return config.getOptionalValue("azure.event.hubs.shared.access.name", String.class).isPresent()
+                && config.getOptionalValue("azure.event.hubs.shared.access.key", String.class).isPresent();
+    }
+
+    public static boolean isMockBackEnd() {
+        Config config = ConfigProvider.getConfig();
+        return config.getOptionalValue("camel.quarkus.start.mock.backend", Boolean.class).orElse(true);
+    }
+
+    public static Map<String, String> parseConnectionString(String connectionString) {
+        Map<String, String> properties = new HashMap<>();
+        ConnectionStringProperties stringProperties = new ConnectionStringProperties(connectionString);
+        properties.put("Endpoint", stringProperties.getEndpoint().toString());
+        properties.put("EntityPath", stringProperties.getEntityPath());
+        properties.put("SharedAccessKey", stringProperties.getSharedAccessKeyName());
+        properties.put("SharedAccessKeyValue", stringProperties.getSharedAccessKey());
+
+        String host = stringProperties.getEndpoint().getHost();
+        properties.put("Namespace", host.substring(0, host.indexOf('.')));
+
+        return properties;
+    }
+}
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsProducers.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsProducers.java
new file mode 100644
index 0000000000..82830c6dbd
--- /dev/null
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsProducers.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.azure.eventhubs.it;
+
+import java.util.Optional;
+
+import com.azure.core.amqp.implementation.ConnectionStringProperties;
+import com.azure.core.credential.TokenCredential;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
+import com.azure.messaging.eventhubs.implementation.ClientConstants;
+import com.azure.messaging.eventhubs.implementation.EventHubSharedKeyCredential;
+import jakarta.inject.Named;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+public class AzureEventhubsProducers {
+    @ConfigProperty(name = "azure.event.hubs.connection.string")
+    Optional<String> connectionString;
+
+    @Named("connectionStringTokenCredential")
+    TokenCredential tokenCredential() {
+        if (connectionString.isPresent()) {
+            ConnectionStringProperties properties = new ConnectionStringProperties(connectionString.get());
+            TokenCredential tokenCredential;
+            if (properties.getSharedAccessSignature() == null) {
+                tokenCredential = new EventHubSharedKeyCredential(properties.getSharedAccessKeyName(),
+                        properties.getSharedAccessKey(), ClientConstants.TOKEN_VALIDITY);
+            } else {
+                tokenCredential = new EventHubSharedKeyCredential(properties.getSharedAccessSignature());
+            }
+            return tokenCredential;
+        }
+        return null;
+    }
+
+    @Named("eventHubClient")
+    EventHubProducerAsyncClient eventHubClient() {
+        return connectionString.map(connection -> new EventHubClientBuilder()
+                .connectionString(connection)
+                .buildAsyncProducerClient())
+                .orElse(null);
+    }
+}
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java
index 740d24f9cf..c69dd53710 100644
--- a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java
@@ -17,77 +17,133 @@
 package org.apache.camel.quarkus.component.azure.eventhubs.it;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
-import io.quarkus.scheduler.Scheduled;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.Consumes;
 import jakarta.ws.rs.GET;
 import jakarta.ws.rs.POST;
 import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
 import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response;
 import org.apache.camel.CamelContext;
-import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.azure.eventhubs.EventHubsConstants;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.logging.Logger;
 
 @Path("/azure-eventhubs")
 @ApplicationScoped
 public class AzureEventhubsResource {
+    private static final Logger LOG = Logger.getLogger(AzureEventhubsResource.class);
 
     @Inject
     ProducerTemplate producerTemplate;
 
-    @Inject
-    ConsumerTemplate consumerTemplate;
-
     @Inject
     CamelContext context;
 
-    @ConfigProperty(name = "azure.event.hubs.connection.string")
-    Optional<String> connectionString;
-
-    private volatile String message;
-    private int counter = 0;
-
-    /**
-     * For some reason if we send just a single message, it is not always received by the consumer.
-     * Sending multiple messages seems to be more reliable.
-     */
-    @Scheduled(every = "1s")
-    void schedule() {
-        if (message != null) {
-            final String endpointUri = "azure-eventhubs:?connectionString=RAW(" + connectionString.get() + ")";
-            producerTemplate.sendBody(endpointUri, message + (counter++));
+    @Path("/receive-event")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public Map<String, Object> receiveEvent(@QueryParam("endpointUri") String endpointUri, String match) {
+        final MockEndpoint mockEndpoint = context.getEndpoint(endpointUri, MockEndpoint.class);
+        List<Exchange> receivedExchanges = mockEndpoint.getReceivedExchanges();
+
+        Optional<Exchange> optionalExchange = receivedExchanges.stream()
+                .filter(exchange -> exchange.getMessage().getBody(String.class).equals(match))
+                .findFirst();
+
+        if (optionalExchange.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        Exchange exchange = optionalExchange.get();
+        Message message = exchange.getMessage();
+        return Map.of(
+                "body", message.getBody(String.class),
+                "headers", message.getHeaders());
+    }
+
+    @Path("/send-event/{partitionId}")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response sendEvent(
+            @PathParam("partitionId") String partitionId,
+            @QueryParam("endpointUri") String endpointUri,
+            String message) throws Exception {
+
+        if (ObjectHelper.isEmpty(endpointUri)) {
+            endpointUri = "direct:sendEvent";
         }
+
+        LOG.infof("Producing event to endpoint uri: %s", endpointUri);
+
+        producerTemplate.sendBodyAndHeader(endpointUri, message, EventHubsConstants.PARTITION_ID, partitionId);
+        return Response.created(new URI("https://camel.apache.org/")).build();
     }
 
     @Path("/receive-events")
     @GET
     @Produces(MediaType.APPLICATION_JSON)
-    public List<String> receiveEvents() throws Exception {
+    public List<Map<String, Object>> receiveEvents(@QueryParam("endpointUri") String endpointUri, List<String> matches) {
+        final MockEndpoint mockEndpoint = context.getEndpoint(endpointUri, MockEndpoint.class);
+        List<Exchange> receivedExchanges = mockEndpoint.getReceivedExchanges();
+
+        List<Exchange> exchanges = receivedExchanges.stream()
+                .filter(exchange -> matches.contains(exchange.getMessage().getBody(String.class)))
+                .toList();
+
+        if (exchanges.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<Map<String, Object>> result = new ArrayList<>();
+        for (Exchange exchange : exchanges) {
+            Message message = exchange.getMessage();
+            result.add(Map.of(
+                    "body", message.getBody(String.class),
+                    "headers", message.getHeaders()));
+        }
 
-        final MockEndpoint mockEndpoint = context.getEndpoint("mock:azure-consumed", MockEndpoint.class);
-        return mockEndpoint.getReceivedExchanges().stream()
-                .map(Exchange::getMessage)
-                .map(m -> m.getBody(String.class))
-                .collect(Collectors.toList());
+        return result;
     }
 
-    @Path("/send-events")
+    @Path("/send-events/{partitionId}")
     @POST
+    @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.TEXT_PLAIN)
-    @Consumes(MediaType.TEXT_PLAIN)
-    public Response sendEvents(String body) throws Exception {
-        this.message = body; // start sending the messages via schedule()
+    public Response sendEvents(@PathParam("partitionId") String partitionId, List<String> messages) throws Exception {
+        producerTemplate.sendBodyAndHeader("direct:sendEvent", messages, EventHubsConstants.PARTITION_ID, partitionId);
         return Response.created(new URI("https://camel.apache.org/")).build();
     }
 
+    @Path("/route/{routeId}/start")
+    @POST
+    public void startRoute(@PathParam("routeId") String routeId) throws Exception {
+        LOG.infof("Starting route: %s", routeId);
+        context.getRouteController().startRoute(routeId);
+        // A random jitter value is applied in the Event Hubs client before its message listener is active.
+        // In addition, claiming ownership of partitions seems to take an indeterminate amount of time.
+        // Therefore, we need to wait until it's safe to produce events
+        Thread.sleep(5000);
+    }
+
+    @Path("/route/{routeId}/stop")
+    @POST
+    public void stopRoute(@PathParam("routeId") String routeId) throws Exception {
+        context.getRouteController().stopRoute(routeId);
+    }
 }
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java
index 3a787c8478..d9b611a61a 100644
--- a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java
@@ -16,15 +16,21 @@
  */
 package org.apache.camel.quarkus.component.azure.eventhubs.it;
 
+import java.util.Map;
 import java.util.Optional;
 
 import com.azure.core.amqp.AmqpTransportType;
+import com.azure.core.credential.TokenCredential;
+import com.azure.messaging.eventhubs.models.EventPosition;
 import jakarta.enterprise.context.ApplicationScoped;
-import org.apache.camel.builder.RouteBuilder;
+import jakarta.inject.Inject;
+import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
+import org.apache.camel.component.azure.eventhubs.CredentialType;
+import org.apache.camel.quarkus.test.mock.backend.MockBackendUtils;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 
 @ApplicationScoped
-public class AzureEventhubsRoutes extends RouteBuilder {
+public class AzureEventhubsRoutes extends EndpointRouteBuilder {
 
     @ConfigProperty(name = "azure.storage.account-name")
     String azureStorageAccountName;
@@ -38,22 +44,178 @@ public class AzureEventhubsRoutes extends RouteBuilder {
     @ConfigProperty(name = "azure.event.hubs.blob.container.name")
     Optional<String> azureBlobContainerName;
 
-    @ConfigProperty(name = "camel.quarkus.start.mock.backend", defaultValue = "true")
-    boolean startMockBackend;
+    @Inject
+    TokenCredential tokenCredential;
 
     @Override
-    public void configure() throws Exception {
-        if (connectionString.isPresent() && azureBlobContainerName.isPresent()) {
-            from("azure-eventhubs:?connectionString=RAW(" + connectionString.get()
-                    + ")&blobAccountName=RAW(" + azureStorageAccountName
-                    + ")&blobAccessKey=RAW(" + azureStorageAccountKey
-                    + ")&blobContainerName=RAW(" + azureBlobContainerName.get() + ")&amqpTransportType="
-                    + AmqpTransportType.AMQP)
-                    .to("mock:azure-consumed");
-        } else if (!startMockBackend) {
+    public void configure() {
+        if (!MockBackendUtils.startMockBackend() && !AzureCredentialsHelper.isMinimumConfigurationAvailable()) {
             throw new IllegalStateException(
-                    "azure.event.hubs.connection.string and azure.event.hubs.blob.container.name must be set when camel.quarkus.start.mock.backend == false");
+                    "Configuration properties azure.event.hubs.connection.string, azure.event.hubs.blob.container.name & azure.storage.account-key must be set when camel.quarkus.start.mock.backend == false");
         }
-    }
 
+        if (AzureCredentialsHelper.isMinimumConfigurationAvailable()) {
+            Map<String, String> connectionProperties = AzureCredentialsHelper.parseConnectionString(connectionString.get());
+            String eventHubsPath = "%s/%s".formatted(connectionProperties.get("Namespace"),
+                    connectionProperties.get("EntityPath"));
+
+            // Consumes EventHub messages and routes them based on which partition they are associated with
+            from(azureEventhubs("")
+                    .connectionString(connectionString.get())
+                    .blobAccountName(azureStorageAccountName)
+                    .blobAccessKey(azureStorageAccountKey)
+                    .blobContainerName(azureBlobContainerName.get()))
+                    .routeId("eventhubs-consumer")
+                    .autoStartup(false)
+                    .log("Consumed event payload ${body} from partition ${header.CamelAzureEventHubsPartitionId}")
+                    .choice()
+                    .when(simple("${header.CamelAzureEventHubsPartitionId} == 0"))
+                    .to("mock:partition-0-results")
+                    .when(simple("${header.CamelAzureEventHubsPartitionId} == 1"))
+                    .to("mock:partition-1-results")
+                    .otherwise()
+                    .log("Message received from unexpected partition id ${header.CamelAzureEventHubsPartitionId}");
+
+            // Consumes events from partition 2 with InMemoryCheckpointStore
+            from(azureEventhubs("")
+                    .connectionString(connectionString.get())
+                    .checkpointStore(new InMemoryCheckpointStore()))
+                    .routeId("eventhubs-consumer-custom-checkpoint-store")
+                    .autoStartup(false)
+                    .log("Consumed event payload ${body} from partition ${header.CamelAzureEventHubsPartitionId} with InMemoryCheckpointStore")
+                    .choice()
+                    .when(simple("${header.CamelAzureEventHubsPartitionId} == 2"))
+                    .to("mock:partition-2-initial-results")
+                    .otherwise()
+                    .log("Message received from unexpected partition id ${header.CamelAzureEventHubsPartitionId}");
+
+            // Reads all events sent to partition 2 from the beginning
+            from(azureEventhubs("")
+                    .connectionString(connectionString.get())
+                    .checkpointStore(new InMemoryCheckpointStore())
+                    .eventPosition(Map.of("2", EventPosition.earliest())))
+                    .routeId("eventhubs-consumer-with-event-position")
+                    .autoStartup(false)
+                    .log("Consumed event payload ${body} from partition ${header.CamelAzureEventHubsPartitionId} from EventPosition.earliest")
+                    .choice()
+                    .when(simple("${header.CamelAzureEventHubsPartitionId} == 2"))
+                    .to("mock:partition-2-event-position-results")
+                    .otherwise()
+                    .log("Message received from unexpected partition id ${header.CamelAzureEventHubsPartitionId}");
+
+            // Consumes events from partition 3 using a custom TokenCredential
+            from(azureEventhubs(eventHubsPath)
+                    .credentialType(CredentialType.TOKEN_CREDENTIAL)
+                    .tokenCredential(tokenCredential)
+                    .blobAccountName(azureStorageAccountName)
+                    .blobAccessKey(azureStorageAccountKey)
+                    .blobContainerName(azureBlobContainerName.get()))
+                    .routeId("eventhubs-consumer-custom-token-credential")
+                    .autoStartup(false)
+                    .log("Consumed event payload ${body} from partition ${header.CamelAzureEventHubsPartitionId} with TokenCredential")
+                    .choice()
+                    .when(simple("${header.CamelAzureEventHubsPartitionId} == 3"))
+                    .to("mock:partition-3-results")
+                    .otherwise()
+                    .log("Message received from unexpected partition id ${header.CamelAzureEventHubsPartitionId}");
+
+            // Consumes events from partition 4 using WS transport
+            from(azureEventhubs("")
+                    .connectionString(connectionString.get())
+                    .blobAccountName(azureStorageAccountName)
+                    .blobAccessKey(azureStorageAccountKey)
+                    .blobContainerName(azureBlobContainerName.get())
+                    .amqpTransportType(AmqpTransportType.AMQP_WEB_SOCKETS))
+                    .routeId("eventhubs-consumer-with-amqp-ws-transport")
+                    .autoStartup(false)
+                    .choice()
+                    .when(simple("${header.CamelAzureEventHubsPartitionId} == 4"))
+                    .to("mock:partition-4-ws-transport-results")
+                    .otherwise()
+                    .log("Message received from unexpected partition id ${header.CamelAzureEventHubsPartitionId}");
+
+            from("direct:sendEvent")
+                    .to(azureEventhubs("")
+                            .connectionString(connectionString.get()));
+
+            from("direct:sendEventUsingAmqpWebSockets")
+                    .to(azureEventhubs("")
+                            .connectionString(connectionString.get())
+                            .amqpTransportType(AmqpTransportType.AMQP_WEB_SOCKETS));
+
+            from("direct:sendEventUsingTokenCredential")
+                    .to(azureEventhubs(eventHubsPath)
+                            .credentialType(CredentialType.TOKEN_CREDENTIAL)
+                            .tokenCredential(tokenCredential));
+
+            // Consumes EventHub messages that are produced by the custom client in direct:sendEventUsingCustomClient
+            from(azureEventhubs("")
+                    .connectionString(connectionString.get())
+                    .blobAccountName(azureStorageAccountName)
+                    .blobAccessKey(azureStorageAccountKey)
+                    .blobContainerName(azureBlobContainerName.get()))
+                    .routeId("eventhubs-consumer-for-custom-client")
+                    .autoStartup(false)
+                    .log("Consumed event payload ${body} from partition ${header.CamelAzureEventHubsPartitionId}")
+                    .choice()
+                    .when(simple("${header.CamelAzureEventHubsPartitionId} == 0"))
+                    .to("mock:partition-0-custom-client-results")
+                    .otherwise()
+                    .log("Message received from unexpected partition id ${header.CamelAzureEventHubsPartitionId}");
+
+            from("direct:sendEventUsingCustomClient")
+                    .to(azureEventhubs("")
+                            .producerAsyncClient("#eventHubClient"));
+
+            // Consumes using an auto-generated connection string from the shared access configuration
+            from(azureEventhubs(eventHubsPath)
+                    .sharedAccessName(connectionProperties.get("SharedAccessKey"))
+                    .sharedAccessKey(connectionProperties.get("SharedAccessKeyValue"))
+                    .blobAccountName(azureStorageAccountName)
+                    .blobAccessKey(azureStorageAccountKey)
+                    .blobContainerName(azureBlobContainerName.get()))
+                    .routeId("eventhubs-consumer-generated-connection-string")
+                    .autoStartup(false)
+                    .log("Consumed event payload ${body} from partition ${header.CamelAzureEventHubsPartitionId}")
+                    .choice()
+                    .when(simple("${header.CamelAzureEventHubsPartitionId} == 0"))
+                    .to("mock:partition-0-generated-connection-string-results")
+                    .otherwise()
+                    .log("Message received from unexpected partition id ${header.CamelAzureEventHubsPartitionId}");
+
+            from("direct:sendEventWithGeneratedConnectionString")
+                    .to(azureEventhubs(eventHubsPath)
+                            .sharedAccessName(connectionProperties.get("SharedAccessKey"))
+                            .sharedAccessKey(connectionProperties.get("SharedAccessKeyValue")));
+
+            if (AzureCredentialsHelper.isAzureIdentityCredentialsAvailable()) {
+                // Consumes events from partition 4 using AZURE_IDENTITY credential type
+                from(azureEventhubs(eventHubsPath)
+                        .credentialType(CredentialType.AZURE_IDENTITY)
+                        // TODO: Remove shared access config
+                        // https://github.com/apache/camel-quarkus/issues/6368
+                        .sharedAccessName("fake-name")
+                        .sharedAccessKey("fake-key")
+                        .blobAccountName(azureStorageAccountName)
+                        .blobAccessKey(azureStorageAccountKey)
+                        .blobContainerName(azureBlobContainerName.get()))
+                        .routeId("eventhubs-consumer-azure-identity-credential")
+                        .autoStartup(false)
+                        .log("Consumed event payload ${body} from partition ${header.CamelAzureEventHubsPartitionId} with TokenCredential")
+                        .choice()
+                        .when(simple("${header.CamelAzureEventHubsPartitionId} == 4"))
+                        .to("mock:partition-4-results")
+                        .otherwise()
+                        .log("Message received from unexpected partition id ${header.CamelAzureEventHubsPartitionId}");
+
+                from("direct:sendEventUsingAzureIdentity")
+                        .to(azureEventhubs(eventHubsPath)
+                                // TODO: Remove shared access config
+                                // https://github.com/apache/camel-quarkus/issues/6368
+                                .sharedAccessName("fake-name")
+                                .sharedAccessKey("fake-key")
+                                .credentialType(CredentialType.AZURE_IDENTITY));
+            }
+        }
+    }
 }
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/InMemoryCheckpointStore.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/InMemoryCheckpointStore.java
new file mode 100644
index 0000000000..3033810539
--- /dev/null
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/InMemoryCheckpointStore.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package org.apache.camel.quarkus.component.azure.eventhubs.it;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.azure.core.util.CoreUtils;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.eventhubs.CheckpointStore;
+import com.azure.messaging.eventhubs.models.Checkpoint;
+import com.azure.messaging.eventhubs.models.PartitionOwnership;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import static 
com.azure.messaging.eventhubs.implementation.ClientConstants.OWNER_ID_KEY;
+import static 
com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;
+import static 
com.azure.messaging.eventhubs.implementation.ClientConstants.SEQUENCE_NUMBER_KEY;
+
+/**
+ * An in-memory checkpoint store. This is primarily to test custom event 
positioning. Inspired by
+ * 
https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/SampleCheckpointStore.java
+ */
+public class InMemoryCheckpointStore implements CheckpointStore {
+    private static final String OWNERSHIP = "ownership";
+    private static final String SEPARATOR = "/";
+    private static final String CHECKPOINT = "checkpoint";
+    private final Map<String, PartitionOwnership> partitionOwnershipMap = new 
ConcurrentHashMap<>();
+    private final Map<String, Checkpoint> checkpointsMap = new 
ConcurrentHashMap<>();
+    private static final ClientLogger LOGGER = new 
ClientLogger(InMemoryCheckpointStore.class);
+
+    @Override
+    public Flux<PartitionOwnership> listOwnership(String 
fullyQualifiedNamespace, String eventHubName,
+            String consumerGroup) {
+        LOGGER.info("Listing partition ownership");
+
+        String prefix = prefixBuilder(fullyQualifiedNamespace, eventHubName, 
consumerGroup, OWNERSHIP);
+        return Flux.fromIterable(partitionOwnershipMap.keySet())
+                .filter(key -> key.startsWith(prefix))
+                .map(key -> partitionOwnershipMap.get(key));
+    }
+
+    private String prefixBuilder(String fullyQualifiedNamespace, String 
eventHubName, String consumerGroup,
+            String type) {
+        return new StringBuilder()
+                .append(fullyQualifiedNamespace)
+                .append(SEPARATOR)
+                .append(eventHubName)
+                .append(SEPARATOR)
+                .append(consumerGroup)
+                .append(SEPARATOR)
+                .append(type)
+                .toString()
+                .toLowerCase(Locale.ROOT);
+    }
+
+    @Override
+    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> 
requestedPartitionOwnerships) {
+        if (CoreUtils.isNullOrEmpty(requestedPartitionOwnerships)) {
+            return Flux.empty();
+        }
+        PartitionOwnership firstEntry = requestedPartitionOwnerships.get(0);
+        String prefix = prefixBuilder(firstEntry.getFullyQualifiedNamespace(), 
firstEntry.getEventHubName(),
+                firstEntry.getConsumerGroup(), OWNERSHIP);
+
+        return Flux.fromIterable(requestedPartitionOwnerships)
+                .filter(ownershipRequest -> {
+                    final String key = prefix + SEPARATOR + 
ownershipRequest.getPartitionId();
+                    final PartitionOwnership existing = 
partitionOwnershipMap.get(key);
+
+                    if (existing == null) {
+                        return true;
+                    }
+
+                    return 
existing.getETag().equals(ownershipRequest.getETag());
+                })
+                .doOnNext(partitionOwnership -> LOGGER.atInfo()
+                        .addKeyValue(PARTITION_ID_KEY, 
partitionOwnership.getPartitionId())
+                        .addKeyValue(OWNER_ID_KEY, 
partitionOwnership.getOwnerId())
+                        .log("Ownership claimed"))
+                .map(partitionOwnership -> {
+                    partitionOwnership.setETag(UUID.randomUUID().toString())
+                            .setLastModifiedTime(System.currentTimeMillis());
+
+                    final String key = prefix + SEPARATOR + 
partitionOwnership.getPartitionId();
+                    partitionOwnershipMap.put(key, partitionOwnership);
+
+                    return partitionOwnership;
+                });
+    }
+
+    @Override
+    public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, 
String eventHubName, String consumerGroup) {
+        String prefix = prefixBuilder(fullyQualifiedNamespace, eventHubName, 
consumerGroup, CHECKPOINT);
+        return Flux.fromIterable(checkpointsMap.keySet())
+                .filter(key -> key.startsWith(prefix))
+                .map(checkpointsMap::get);
+    }
+
+    @Override
+    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
+        if (checkpoint == null) {
+            return Mono.error(LOGGER.logExceptionAsError(new 
NullPointerException("checkpoint cannot be null")));
+        }
+
+        String prefix = prefixBuilder(checkpoint.getFullyQualifiedNamespace(), 
checkpoint.getEventHubName(),
+                checkpoint.getConsumerGroup(), CHECKPOINT);
+        checkpointsMap.put(prefix + SEPARATOR + checkpoint.getPartitionId(), 
checkpoint);
+        LOGGER.atInfo()
+                .addKeyValue(PARTITION_ID_KEY, checkpoint.getPartitionId())
+                .addKeyValue(SEQUENCE_NUMBER_KEY, 
checkpoint.getSequenceNumber())
+                .log("Updated checkpoint.");
+        return Mono.empty();
+    }
+}
diff --git 
a/integration-test-groups/azure/azure-eventhubs/src/main/resources/application.properties
 
b/integration-test-groups/azure/azure-eventhubs/src/main/resources/application.properties
new file mode 100644
index 0000000000..52792c953e
--- /dev/null
+++ 
b/integration-test-groups/azure/azure-eventhubs/src/main/resources/application.properties
@@ -0,0 +1,22 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# Disable autowiring of EventHubProducerAsyncClient as we want control of 
which endpoints the custom client is used in
+camel.component.azure-eventhubs.autowired-enabled = false
+
+# Uncomment to reduce log noise from com.azure internals
+# quarkus.log.category."com.azure".level = OFF
diff --git 
a/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsIT.java
 
b/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsIT.java
index 8489a5b69e..ce91d90c03 100644
--- 
a/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsIT.java
+++ 
b/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsIT.java
@@ -21,7 +21,7 @@ import 
org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
 
 @EnabledIfEnvironmentVariable(named = "AZURE_STORAGE_ACCOUNT_NAME", matches = 
".+")
 @EnabledIfEnvironmentVariable(named = "AZURE_STORAGE_ACCOUNT_KEY", matches = 
".+")
-@EnabledIfEnvironmentVariable(named = "AZURE_BLOB_CONTAINER_NAME", matches = 
".+")
+@EnabledIfEnvironmentVariable(named = "AZURE_EVENT_HUBS_BLOB_CONTAINER_NAME", 
matches = ".+")
 @EnabledIfEnvironmentVariable(named = "AZURE_EVENT_HUBS_CONNECTION_STRING", 
matches = ".+")
 @QuarkusIntegrationTest
 class AzureEventhubsIT extends AzureEventhubsTest {
diff --git 
a/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsTest.java
 
b/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsTest.java
index 5b7c8627ca..88449d6187 100644
--- 
a/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsTest.java
+++ 
b/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsTest.java
@@ -16,45 +16,414 @@
  */
 package org.apache.camel.quarkus.component.azure.eventhubs.it;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
-import org.apache.commons.lang3.RandomStringUtils;
+import io.restassured.response.Response;
 import org.awaitility.Awaitility;
-import org.jboss.logging.Logger;
+import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 @EnabledIfEnvironmentVariable(named = "AZURE_STORAGE_ACCOUNT_NAME", matches = 
".+")
 @EnabledIfEnvironmentVariable(named = "AZURE_STORAGE_ACCOUNT_KEY", matches = 
".+")
 @EnabledIfEnvironmentVariable(named = "AZURE_EVENT_HUBS_BLOB_CONTAINER_NAME", 
matches = ".+")
 @EnabledIfEnvironmentVariable(named = "AZURE_EVENT_HUBS_CONNECTION_STRING", 
matches = ".+")
 @QuarkusTest
 class AzureEventhubsTest {
+    // NOTE: Consumer endpoints are started / stopped manually to prevent them 
from interfering with each other
+
+    @Test
+    void produceConsumeEvents() {
+        try {
+            RestAssured.given()
+                    .post("/azure-eventhubs/route/eventhubs-consumer/start")
+                    .then()
+                    .statusCode(204);
+
+            final String messageBody = UUID.randomUUID().toString();
+
+            RestAssured.given()
+                    .contentType(ContentType.TEXT)
+                    .body(messageBody)
+                    .post("/azure-eventhubs/send-event/0")
+                    .then()
+                    .statusCode(201);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1, 
TimeUnit.MINUTES).untilAsserted(() -> {
+                RestAssured.given()
+                        .queryParam("endpointUri", "mock:partition-0-results")
+                        .body(messageBody)
+                        .get("/azure-eventhubs/receive-event")
+                        .then()
+                        .statusCode(200)
+                        .body(
+                                "body", is(messageBody),
+                                "headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "headers.CamelAzureEventHubsPartitionId", 
is("0"),
+                                "headers.CamelAzureEventHubsSequenceNumber", 
greaterThanOrEqualTo(0));
+            });
+        } finally {
+            RestAssured.given()
+                    .post("/azure-eventhubs/route/eventhubs-consumer/stop")
+                    .then()
+                    .statusCode(204);
+        }
+    }
+
+    @Test
+    void produceMultipleMessages() {
+        try {
+            RestAssured.given()
+                    .post("/azure-eventhubs/route/eventhubs-consumer/start")
+                    .then()
+                    .statusCode(204);
+
+            List<String> messages = new ArrayList<>(3);
+            for (int i = 0; i < 3; i++) {
+                messages.add(UUID.randomUUID().toString());
+            }
+
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(messages)
+                    .post("/azure-eventhubs/send-events/1")
+                    .then()
+                    .statusCode(201);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1, 
TimeUnit.MINUTES).untilAsserted(() -> {
+                RestAssured.given()
+                        .contentType(ContentType.JSON)
+                        .queryParam("endpointUri", "mock:partition-1-results")
+                        .body(messages)
+                        .get("/azure-eventhubs/receive-events")
+                        .then()
+                        .statusCode(200)
+                        .body(
+                                "size()", is(3),
+                                "[0].body", is(messages.get(0)),
+                                "[0].headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "[0].headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "[0].headers.CamelAzureEventHubsPartitionId", 
is("1"),
+                                
"[0].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0),
+                                "[1].body", is(messages.get(1)),
+                                "[1].headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "[1].headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "[1].headers.CamelAzureEventHubsPartitionId", 
is("1"),
+                                
"[1].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0),
+                                "[2].body", is(messages.get(2)),
+                                "[2].headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "[2].headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "[2].headers.CamelAzureEventHubsPartitionId", 
is("1"),
+                                
"[2].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0));
+            });
+        } finally {
+            RestAssured.given()
+                    .post("/azure-eventhubs/route/eventhubs-consumer/stop")
+                    .then()
+                    .statusCode(204);
+        }
+    }
+
+    @Test
+    void produceConsumeEventsWithCustomClient() {
+        try {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-for-custom-client/start")
+                    .then()
+                    .statusCode(204);
+
+            final String messageBody = UUID.randomUUID().toString();
+
+            RestAssured.given()
+                    .contentType(ContentType.TEXT)
+                    .queryParam("endpointUri", 
"direct:sendEventUsingCustomClient")
+                    .body(messageBody)
+                    .post("/azure-eventhubs/send-event/0")
+                    .then()
+                    .statusCode(201);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1, 
TimeUnit.MINUTES).untilAsserted(() -> {
+                RestAssured.given()
+                        .queryParam("endpointUri", 
"mock:partition-0-custom-client-results")
+                        .body(messageBody)
+                        .get("/azure-eventhubs/receive-event")
+                        .then()
+                        .statusCode(200)
+                        .body(
+                                "body", is(messageBody),
+                                "headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "headers.CamelAzureEventHubsPartitionId", 
is("0"),
+                                "headers.CamelAzureEventHubsSequenceNumber", 
greaterThanOrEqualTo(0));
+            });
+        } finally {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-for-custom-client/stop")
+                    .then()
+                    .statusCode(204);
+        }
+    }
+
+    @Test
+    void customEventPosition() {
+        try {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-custom-checkpoint-store/start")
+                    .then()
+                    .statusCode(204);
+
+            // Send some messages to partition 2
+            List<String> messages = new ArrayList<>(3);
+            for (int i = 0; i < 3; i++) {
+                messages.add(UUID.randomUUID().toString());
+            }
+
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(messages)
+                    .post("/azure-eventhubs/send-events/2")
+                    .then()
+                    .statusCode(201);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1, 
TimeUnit.MINUTES).untilAsserted(() -> {
+                RestAssured.given()
+                        .queryParam("endpointUri", 
"mock:partition-2-initial-results")
+                        .contentType(ContentType.JSON)
+                        .body(messages)
+                        .get("/azure-eventhubs/receive-events")
+                        .then()
+                        .statusCode(200)
+                        .body(
+                                "size()", is(3),
+                                "[0].body", is(messages.get(0)),
+                                "[0].headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "[0].headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "[0].headers.CamelAzureEventHubsPartitionId", 
is("2"),
+                                
"[0].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0),
+                                "[1].body", is(messages.get(1)),
+                                "[1].headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "[1].headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "[1].headers.CamelAzureEventHubsPartitionId", 
is("2"),
+                                
"[1].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0),
+                                "[2].body", is(messages.get(2)),
+                                "[2].headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "[2].headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "[2].headers.CamelAzureEventHubsPartitionId", 
is("2"),
+                                
"[2].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0));
+            });
+
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-custom-checkpoint-store/stop")
+                    .then()
+                    .statusCode(204);
+
+            // Start another consumer configured to read partition 2 from the 
earliest offset
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-with-event-position/start")
+                    .then()
+                    .statusCode(204);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1, 
TimeUnit.MINUTES).untilAsserted(() -> {
+                Response response = RestAssured.given()
+                        .queryParam("endpointUri", 
"mock:partition-2-event-position-results")
+                        .contentType(ContentType.JSON)
+                        .body(messages)
+                        .get("/azure-eventhubs/receive-events")
+                        .then()
+                        .statusCode(200)
+                        .extract()
+                        .response();
+
+                // Depending on the data retention period configured on the 
EventHub, the partition may still hold older events, so we can't assume it contains only our messages.
+                // Therefore, we only assert that the last 3 events are the ones 
produced earlier in the test
+                List<Map<String, Object>> results = 
response.jsonPath().getList("$.");
+                int size = results.size();
+                assertTrue(size >= 3);
+                assertEquals(messages.get(0), results.get(size - 
3).get("body"));
+                assertEquals(messages.get(1), results.get(size - 
2).get("body"));
+                assertEquals(messages.get(2), results.get(size - 
1).get("body"));
+            });
+        } finally {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-with-event-position/stop")
+                    .then()
+                    .statusCode(204);
+        }
+    }
+
+    @Test
+    void tokenCredentials() {
+        try {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-custom-token-credential/start")
+                    .then()
+                    .statusCode(204);
+
+            final String messageBody = UUID.randomUUID().toString();
+
+            RestAssured.given()
+                    .contentType(ContentType.TEXT)
+                    .queryParam("endpointUri", 
"direct:sendEventUsingTokenCredential")
+                    .body(messageBody)
+                    .post("/azure-eventhubs/send-event/3")
+                    .then()
+                    .statusCode(201);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1, 
TimeUnit.MINUTES).untilAsserted(() -> {
+                RestAssured.given()
+                        .queryParam("endpointUri", "mock:partition-3-results")
+                        .body(messageBody)
+                        .get("/azure-eventhubs/receive-event")
+                        .then()
+                        .statusCode(200)
+                        .body(
+                                "body", is(messageBody),
+                                "headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "headers.CamelAzureEventHubsPartitionId", 
is("3"),
+                                "headers.CamelAzureEventHubsSequenceNumber", 
greaterThanOrEqualTo(0));
+            });
+        } finally {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-custom-token-credential/stop")
+                    .then()
+                    .statusCode(204);
+        }
+    }
+
+    @Test
+    void azureIdentityCredentials() {
+        
Assumptions.assumeTrue(AzureCredentialsHelper.isAzureIdentityCredentialsAvailable());
 
-    private static final Logger LOG = 
Logger.getLogger(AzureEventhubsTest.class);
+        try {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-azure-identity-credential/start")
+                    .then()
+                    .statusCode(204);
+
+            final String messageBody = UUID.randomUUID().toString();
+
+            RestAssured.given()
+                    .contentType(ContentType.TEXT)
+                    .queryParam("endpointUri", 
"direct:sendEventUsingAzureIdentity")
+                    .body(messageBody)
+                    .post("/azure-eventhubs/send-event/4")
+                    .then()
+                    .statusCode(201);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1, 
TimeUnit.MINUTES).untilAsserted(() -> {
+                RestAssured.given()
+                        .queryParam("endpointUri", "mock:partition-4-results")
+                        .body(messageBody)
+                        .get("/azure-eventhubs/receive-event")
+                        .then()
+                        .statusCode(200)
+                        .body(
+                                "body", is(messageBody),
+                                "headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "headers.CamelAzureEventHubsPartitionId", 
is("4"),
+                                "headers.CamelAzureEventHubsSequenceNumber", 
greaterThanOrEqualTo(0));
+            });
+        } finally {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-azure-identity-credential/stop")
+                    .then()
+                    .statusCode(204);
+        }
+    }
+
+    @Test
+    void amqpWebSocketsTransport() {
+        try {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-with-amqp-ws-transport/start")
+                    .then()
+                    .statusCode(204);
+
+            final String messageBody = UUID.randomUUID().toString();
+
+            RestAssured.given()
+                    .contentType(ContentType.TEXT)
+                    .queryParam("endpointUri", 
"direct:sendEventUsingAmqpWebSockets")
+                    .body(messageBody)
+                    .post("/azure-eventhubs/send-event/4")
+                    .then()
+                    .statusCode(201);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1, 
TimeUnit.MINUTES).untilAsserted(() -> {
+                RestAssured.given()
+                        .queryParam("endpointUri", 
"mock:partition-4-ws-transport-results")
+                        .body(messageBody)
+                        .get("/azure-eventhubs/receive-event")
+                        .then()
+                        .statusCode(200)
+                        .body(
+                                "body", is(messageBody),
+                                "headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "headers.CamelAzureEventHubsPartitionId", 
is("4"),
+                                "headers.CamelAzureEventHubsSequenceNumber", 
greaterThanOrEqualTo(0));
+            });
+        } finally {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-with-amqp-ws-transport/stop")
+                    .then()
+                    .statusCode(204);
+        }
+    }
 
     @Test
-    public void roundTrip() {
-        final String messageBody = RandomStringUtils.randomAlphabetic(30);
+    void generatedConnectionString() {
+        try {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-generated-connection-string/start")
+                    .then()
+                    .statusCode(204);
 
-        RestAssured.given()
-                .contentType(ContentType.TEXT)
-                .body(messageBody)
-                .post("/azure-eventhubs/send-events")
-                .then()
-                .statusCode(201);
+            final String messageBody = UUID.randomUUID().toString();
 
-        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, 
TimeUnit.SECONDS).until(() -> {
-            final String body = RestAssured.given()
-                    .get("/azure-eventhubs/receive-events")
+            RestAssured.given()
+                    .contentType(ContentType.TEXT)
+                    .queryParam("endpointUri", 
"direct:sendEventWithGeneratedConnectionString")
+                    .body(messageBody)
+                    .post("/azure-eventhubs/send-event/0")
                     .then()
-                    .extract().body().asString();
-            LOG.infof("Expected message body: '%s'; actual: '%s'", 
messageBody, body);
-            return body != null && body.contains(messageBody);
-        });
+                    .statusCode(201);
 
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1, 
TimeUnit.MINUTES).untilAsserted(() -> {
+                RestAssured.given()
+                        .queryParam("endpointUri", 
"mock:partition-0-generated-connection-string-results")
+                        .body(messageBody)
+                        .get("/azure-eventhubs/receive-event")
+                        .then()
+                        .statusCode(200)
+                        .body(
+                                "body", is(messageBody),
+                                "headers.CamelAzureEventHubsEnqueuedTime", 
notNullValue(),
+                                "headers.CamelAzureEventHubsOffset", 
greaterThanOrEqualTo(0),
+                                "headers.CamelAzureEventHubsPartitionId", 
is("0"),
+                                "headers.CamelAzureEventHubsSequenceNumber", 
greaterThanOrEqualTo(0));
+            });
+        } finally {
+            RestAssured.given()
+                    
.post("/azure-eventhubs/route/eventhubs-consumer-generated-connection-string/stop")
+                    .then()
+                    .statusCode(204);
+        }
     }
 }
diff --git a/integration-test-groups/azure/azure-resources.sh 
b/integration-test-groups/azure/azure-resources.sh
index 243d541662..f3a184be2e 100755
--- a/integration-test-groups/azure/azure-resources.sh
+++ b/integration-test-groups/azure/azure-resources.sh
@@ -74,7 +74,7 @@ function createResources() {
     az storage container create --account-name ${AZURE_STORAGE_ACCOUNT_NAME} 
--name ${AZURE_BLOB_CONTAINER_NAME} --auth-mode login
 
     az eventhubs namespace create --name ${EH_NAMESPACE} --resource-group 
${RESOURCE_GROUP} --location ${ZONE}
-    az eventhubs eventhub create --name ${EH_NAME} --resource-group 
${RESOURCE_GROUP} --namespace-name ${EH_NAMESPACE} --partition-count 1
+    az eventhubs eventhub create --name ${EH_NAME} --resource-group 
${RESOURCE_GROUP} --namespace-name ${EH_NAMESPACE} --partition-count 5
 
     AZURE_EVENT_HUBS_CONNECTION_STRING=$(az eventhubs namespace 
authorization-rule keys list --resource-group ${RESOURCE_GROUP} 
--namespace-name ${EH_NAMESPACE} --name RootManageSharedAccessKey  --query 
primaryConnectionString -o tsv)
 

Reply via email to