This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b7510cbded3a82378b7f689e3c5506fc8bfabc25 Author: Andrea Tarocchi <[email protected]> AuthorDate: Fri Oct 2 22:17:27 2020 +0200 Added RabbitMQ itests. --- tests/itests-rabbitmq/pom.xml | 72 ++++++++ .../rabbitmq/clients/RabbitMQClient.java | 184 +++++++++++++++++++++ .../rabbitmq/services/ConnectionProperties.java | 25 +++ .../services/RabbitMQLocalContainerService.java | 73 ++++++++ .../rabbitmq/services/RabbitMQRemoteService.java | 44 +++++ .../rabbitmq/services/RabbitMQService.java | 60 +++++++ .../rabbitmq/services/RabbitMQServiceFactory.java | 45 +++++ .../sink/CamelRabbitMQPropertyFactory.java | 76 +++++++++ .../rabbitmq/sink/RabbitMQSinkITCase.java | 145 ++++++++++++++++ .../source/CamelRabbitMQPropertyFactory.java | 76 +++++++++ .../rabbitmq/source/RabbitMQSourceITCase.java | 109 ++++++++++++ tests/pom.xml | 1 + 12 files changed, 910 insertions(+) diff --git a/tests/itests-rabbitmq/pom.xml b/tests/itests-rabbitmq/pom.xml new file mode 100644 index 0000000..4365c68 --- /dev/null +++ b/tests/itests-rabbitmq/pom.xml @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.6.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-rabbitmq</artifactId> + <name>Camel-Kafka-Connector :: Tests :: RabbitMQ</name> + + <properties> +<!-- <jmx.port>9010</jmx.port>--> +<!-- <rmi.server>localhost</rmi.server>--> +<!-- <jvm.user.settings />--> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-rabbitmq</artifactId> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>rabbitmq</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> +<!-- <plugin>--> +<!-- <groupId>org.apache.maven.plugins</groupId>--> +<!-- <artifactId>maven-failsafe-plugin</artifactId>--> +<!-- <configuration>--> +<!-- <argLine>${common.failsafe.args} ${jvm.user.settings} -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=${rmi.server} -Dcom.sun.management.jmxremote.rmi.port=${jmx.port}</argLine>--> +<!-- <skipTests>${skipIntegrationTests}</skipTests>--> +<!-- </configuration>--> +<!-- </plugin>--> + </plugins> + </build> + + +</project> \ No newline at end of file diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java new file mode 100644 index 0000000..19da657 --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.kafkaconnector.rabbitmq.clients; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.MessageProperties; +import org.junit.jupiter.api.Assertions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A basic RabbitMQ client + */ +public class RabbitMQClient { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQClient.class); + private static final String DEFAULT_EXCHANGE_TYPE = "direct"; + + private Connection connection; + private Channel channel; + + private ConnectionFactory factory; + + public RabbitMQClient(String uri) { + factory = new ConnectionFactory(); + try { + factory.setUri(uri); + } catch (Exception e) { + LOG.error("Unable to create the RabbitMQ client {}", e.getMessage(), e); + Assertions.fail(e); + } + } + + private static void capturingClose(Closeable closeable, String closableDescription) { + LOG.debug("Closing the " + closableDescription); + + if (closeable != null) { + try { + closeable.close(); + } catch (Throwable t) { + LOG.warn("Error closing the {}: {}", closableDescription, t.getMessage(), t); + } + } + } + + private static void capturingClose(AutoCloseable closeable, String closableDescription) { + LOG.debug("Closing the " + closableDescription); + + if (closeable != null) { + try { + closeable.close(); + } catch (Throwable t) { + LOG.warn("Error closing the {}: {}", closableDescription, t.getMessage(), t); + } + } + } + + public void start() throws Exception { + LOG.debug("Starting the RabbitMQ client"); + + try { + LOG.debug("Creating the connection"); + connection = factory.newConnection(); + LOG.debug("Connection created successfully"); + + LOG.debug("Creating the Channel"); + channel = 
connection.createChannel(); + LOG.debug("Channel created successfully"); + } catch (Throwable t) { + LOG.trace("Something wrong happened while initializing the RabbitMQ client: {}", t.getMessage(), t); + + capturingClose(connection, "connection"); + throw t; + } + } + + public void stop() { + try { + LOG.debug("Stopping the channel"); + capturingClose(channel, "channel"); + + LOG.debug("Stopping the RabbitMQ connection"); + capturingClose(connection, "connection"); + } finally { + channel = null; + connection = null; + } + } + + public AMQP.Queue.DeclareOk createQueue(final String queueName) { + try { + start(); + return channel.queueDeclare(queueName, true, false, false, null); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + + // unreachable + return null; + } finally { + stop(); + } + } + + public AMQP.Exchange.DeclareOk createExchange(final String exchangeName) { + return createExchange(exchangeName, DEFAULT_EXCHANGE_TYPE); + } + + public AMQP.Exchange.DeclareOk createExchange(final String exchangeName, final String exchangeType) { + try { + start(); + return channel.exchangeDeclare(exchangeName, exchangeType); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + + // unreachable + return null; + } finally { + stop(); + } + } + + public AMQP.Queue.BindOk bindExchangeToQueue(final String exchangeName, final String queueName) { + try { + start(); + return channel.queueBind(exchangeName, exchangeName, ""); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + + // unreachable + return null; + } finally { + stop(); + } + } + + /** + * Sends data to a RabbitMQ queue + * + * @param queue the queue to send data to + * @param data the (string) data to send + * @throws IOException + */ + public void send(final String queue, final String data) { + try { + start(); + channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, data.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + 
+ } finally { + stop(); + } + } + + /** + * Receives data from a RabbitMQ queue + * + * @param queue the queue to receive data from + * @param deliverCallback the callback used to test each received message + */ + public void receive(final String queue, DeliverCallback deliverCallback) throws Exception { + channel.basicConsume(queue, true, deliverCallback, consumerTag -> { }); + } +} diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java new file mode 100644 index 0000000..15b7f8d --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.kafkaconnector.rabbitmq.services; + +public interface ConnectionProperties { + String username(); + String password(); + String hostname(); + int port(); +} diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java new file mode 100644 index 0000000..ee7a827 --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.kafkaconnector.rabbitmq.services; + +import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.RabbitMQContainer; + +public class RabbitMQLocalContainerService implements RabbitMQService { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQLocalContainerService.class); + + private final RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.8-management"); + + public RabbitMQLocalContainerService() { + container.start(); + } + + @Override + public ConnectionProperties connectionProperties() { + return new ConnectionProperties() { + @Override + public String username() { + return container.getAdminUsername(); + } + + @Override + public String password() { + return container.getAdminPassword(); + } + + @Override + public String hostname() { + return container.getHost(); + } + + @Override + public int port() { + return container.getAmqpPort(); + } + }; + } + + @Override + public RabbitMQClient getClient() { + return new RabbitMQClient(container.getAmqpUrl()); + } + + @Override + public void initialize() { + LOG.info("RabbitMQ container running on {}", container.getAmqpUrl()); + } + + @Override + public void shutdown() { + container.stop(); + } +} diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java new file mode 100644 index 0000000..74d2a48 --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.rabbitmq.services; + +import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient; + +public class RabbitMQRemoteService implements RabbitMQService { + + + @Override + public ConnectionProperties connectionProperties() { + return null; + } + + @Override + public RabbitMQClient getClient() { + return null; + } + + @Override + public void initialize() { + + } + + @Override + public void shutdown() { + + } +} diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java new file mode 100644 index 0000000..d026ba2 --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.rabbitmq.services; + +import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public interface RabbitMQService extends BeforeAllCallback, AfterAllCallback { + + + /** + * The connection properties for the service + * @return + */ + ConnectionProperties connectionProperties(); + + /** + * Get the appropriate client for the service + * @return + */ + RabbitMQClient getClient(); + + /** + * Perform any initialization necessary + */ + void initialize(); + + /** + * Shuts down the service after the test has completed + */ + void shutdown(); + + + @Override + default void afterAll(ExtensionContext extensionContext) throws Exception { + shutdown(); + } + + @Override + default void beforeAll(ExtensionContext extensionContext) throws Exception { + initialize(); + } +} diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java new file mode 100644 index 0000000..50013dd --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.rabbitmq.services; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class RabbitMQServiceFactory { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQServiceFactory.class); + + private RabbitMQServiceFactory() { + + } + + public static RabbitMQService createService() { + String instanceType = System.getProperty("rabbitmq.instance.type"); + + if (instanceType == null || instanceType.equals("local-rabbitmq-container")) { + return new RabbitMQLocalContainerService(); + } + + if (instanceType.equals("remote")) { + return new RabbitMQRemoteService(); + } + + LOG.error("rabbit-mq instance must be one of 'local-rabbitmq-container' or 'remote"); + throw new UnsupportedOperationException(String.format("Invalid rabbitmq instance type: %s", instanceType)); + + } +} diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/CamelRabbitMQPropertyFactory.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/CamelRabbitMQPropertyFactory.java new file mode 100644 index 0000000..0c8e467 --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/CamelRabbitMQPropertyFactory.java @@ -0,0 +1,76 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.rabbitmq.sink; + +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; + +public class CamelRabbitMQPropertyFactory extends SinkConnectorPropertyFactory<CamelRabbitMQPropertyFactory> { + public CamelRabbitMQPropertyFactory withHostname(String value) { + return setProperty("camel.component.rabbitmq.hostname", value); + } + + public CamelRabbitMQPropertyFactory withPortNumber(int value) { + return setProperty("camel.component.rabbitmq.portNumber", value); + } + + public CamelRabbitMQPropertyFactory withUsername(String value) { + return setProperty("camel.component.rabbitmq.username", value); + } + + public CamelRabbitMQPropertyFactory withPassword(String value) { + return setProperty("camel.component.rabbitmq.password", value); + } + + public CamelRabbitMQPropertyFactory withExchangeName(String value) { + return setProperty("camel.source.path.exchangeName", value); + } + + public CamelRabbitMQPropertyFactory withExchangeType(String value) { + return setProperty("camel.source.endpoint.exchangeType", value); + } + + public 
CamelRabbitMQPropertyFactory withAutoDelete(boolean value) { + return setProperty("camel.source.endpoint.autoDelete", value); + } + + public CamelRabbitMQPropertyFactory withQueue(String value) { + return setProperty("camel.source.endpoint.queue", value); + } + + public CamelRabbitMQPropertyFactory withRoutingKey(String value) { + return setProperty("camel.source.endpoint.routingKey", value); + } + + public EndpointUrlBuilder<CamelRabbitMQPropertyFactory> withUrl(String exchangeName) { + String sourceUrl = String.format("rabbitmq://%s", exchangeName); + + return new EndpointUrlBuilder<>(this::withSinkUrl, sourceUrl); + } + + public static CamelRabbitMQPropertyFactory basic() { + return new CamelRabbitMQPropertyFactory() + .withTasksMax(1) + .withName("CamelRabbitmqSinkConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.converters.ByteArrayConverter"); + + } + +} diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java new file mode 100644 index 0000000..80b1606 --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.rabbitmq.sink; + +import java.io.UnsupportedEncodingException; +import java.util.concurrent.CountDownLatch; + +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.Delivery; +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient; +import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQService; +import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQServiceFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +@Testcontainers +public class RabbitMQSinkITCase extends AbstractKafkaTest { + @RegisterExtension + public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService(); + + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkITCase.class); + private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import"; + + private RabbitMQClient rabbitMQClient; + private int 
received; + private final int expect = 10; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-rabbitmq-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + received = 0; + rabbitMQClient = rabbitmqService.getClient(); + } + + private boolean checkRecord(Delivery rabbitMQDelivery) { + try { + String message = new String(rabbitMQDelivery.getBody(), "UTF-8"); + LOG.debug("Received: {}", message); + + received++; + + if (received == expect) { + return false; + } + + return true; + } catch (UnsupportedEncodingException e) { + LOG.error("Failed to read message: {}", e.getMessage(), e); + fail("Failed to read message: " + e.getMessage()); + return false; + } + } + + private void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception { + connectorPropertyFactory.log(); + getKafkaConnectService().initializeConnector(connectorPropertyFactory); + + CountDownLatch latch = new CountDownLatch(1); + + LOG.debug("Creating the consumer ..."); + rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE); + try { + rabbitMQClient.start(); + consumeRabbitMQMessages(latch); + + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + for (int i = 0; i < expect; i++) { + kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i); + } + + LOG.debug("Created the consumer ... 
About to receive messages"); + + latch.await(); + assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect); + } finally { + rabbitMQClient.stop(); + } + } + + @Test + @Timeout(90) + public void testSource() throws Exception { + ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory + .basic() + .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) + .withUrl("") + .append("username", rabbitmqService.connectionProperties().username()) + .append("password", rabbitmqService.connectionProperties().password()) + .append("autoDelete", "false") + .append("queue", DEFAULT_RABBITMQ_QUEUE) + .append("RoutingKey", DEFAULT_RABBITMQ_QUEUE) + .append("skipExchangeDeclare", "true") + .append("skipQueueBind", "true") + .append("hostname", rabbitmqService.connectionProperties().hostname()) + .append("portNumber", rabbitmqService.connectionProperties().port()) + .buildUrl(); + + runBasicStringTest(factory); + } + + private void consumeRabbitMQMessages(CountDownLatch latch) { + DeliverCallback deliveryCallback = (consumerTag, delivery) -> { + if (!this.checkRecord(delivery)) { + latch.countDown(); + } + }; + try { + rabbitMQClient.receive(DEFAULT_RABBITMQ_QUEUE, deliveryCallback); + } catch (Exception e) { + LOG.error("RabbitMQ test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } +} diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java new file mode 100644 index 0000000..1548b2e --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.rabbitmq.source; + +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; + +public class CamelRabbitMQPropertyFactory extends SourceConnectorPropertyFactory<CamelRabbitMQPropertyFactory> { + public CamelRabbitMQPropertyFactory withHostname(String value) { + return setProperty("camel.component.rabbitmq.hostname", value); + } + + public CamelRabbitMQPropertyFactory withPortNumber(int value) { + return setProperty("camel.component.rabbitmq.portNumber", value); + } + + public CamelRabbitMQPropertyFactory withUsername(String value) { + return setProperty("camel.component.rabbitmq.username", value); + } + + public CamelRabbitMQPropertyFactory withPassword(String value) { + return setProperty("camel.component.rabbitmq.password", value); + } + + public CamelRabbitMQPropertyFactory withExchangeName(String value) { + return setProperty("camel.source.path.exchangeName", value); + } + + public CamelRabbitMQPropertyFactory withExchangeType(String value) { + return setProperty("camel.source.endpoint.exchangeType", value); + } + + public CamelRabbitMQPropertyFactory withAutoDelete(boolean value) { + return setProperty("camel.source.endpoint.autoDelete", 
value); + } + + public CamelRabbitMQPropertyFactory withQueue(String value) { + return setProperty("camel.source.endpoint.queue", value); + } + + public CamelRabbitMQPropertyFactory withRoutingKey(String value) { + return setProperty("camel.source.endpoint.routingKey", value); + } + + public EndpointUrlBuilder<CamelRabbitMQPropertyFactory> withUrl(String exchangeName) { + String sourceUrl = String.format("rabbitmq://%s", exchangeName); + + return new EndpointUrlBuilder<>(this::withSourceUrl, sourceUrl); + } + + public static CamelRabbitMQPropertyFactory basic() { + return new CamelRabbitMQPropertyFactory() + .withTasksMax(1) + .withName("CamelRabbitmqSourceConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.converters.ByteArrayConverter"); + + } + +} diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java new file mode 100644 index 0000000..73a75e3 --- /dev/null +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.rabbitmq.source; + +import java.util.concurrent.ExecutionException; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient; +import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQService; +import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQServiceFactory; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Testcontainers +public class RabbitMQSourceITCase extends AbstractKafkaTest { + @RegisterExtension + public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService(); + + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceITCase.class); + private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import"; + + private RabbitMQClient rabbitMQClient; + private int received; + private final int expect = 10; + + @Override + protected String[] getConnectorsInTest() { + return new 
String[] {"camel-rabbitmq-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + received = 0; + rabbitMQClient = rabbitmqService.getClient(); + } + + private <T> boolean checkRecord(ConsumerRecord<String, T> record) { + LOG.debug("Received: {}", record.value()); + received++; + + if (received == expect) { + return false; + } + + return true; + } + + public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE); + + for (int i = 0; i < expect; i++) { + rabbitMQClient.send(DEFAULT_RABBITMQ_QUEUE, "Test string message"); + } + + LOG.debug("Creating the kafka consumer ..."); + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); + LOG.debug("Created the kafka consumer ..."); + + assertEquals(received, expect, "Didn't process the expected amount of messages"); + } + + @Test + @Timeout(90) + public void testSource() throws ExecutionException, InterruptedException { + ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory + .basic() + .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withUrl("") + .append("username", rabbitmqService.connectionProperties().username()) + .append("password", rabbitmqService.connectionProperties().password()) + .append("autoDelete", "false") + .append("queue", DEFAULT_RABBITMQ_QUEUE) + .append("skipExchangeDeclare", "true") + .append("skipQueueBind", "true") + .append("hostname", rabbitmqService.connectionProperties().hostname()) + .append("portNumber", rabbitmqService.connectionProperties().port()) + .buildUrl(); + + runBasicStringTest(factory); + } +} diff --git a/tests/pom.xml b/tests/pom.xml index d52caed..c95de51 100644 --- 
a/tests/pom.xml +++ b/tests/pom.xml @@ -59,6 +59,7 @@ <module>itests-azure-common</module> <module>itests-azure-storage-queue</module> <module>perf-tests-rabbitmq</module> + <module>itests-rabbitmq</module> </modules>
