This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 99f11dfe4005544b12d541ba553dee9f134e9ad6 Author: Andrea Tarocchi <[email protected]> AuthorDate: Sat Mar 6 02:44:30 2021 +0100 Fixed itest for netty-http. --- .../nettyhttp/sink/CamelSinkNettyhttpITCase.java | 112 +++++++++++++++++++++ .../source/CamelSourceNettyHTTPITCase.java | 2 +- tests/pom.xml | 1 - 3 files changed, 113 insertions(+), 2 deletions(-) diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java new file mode 100644 index 0000000..96bd27a --- /dev/null +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.nettyhttp.sink; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class); + + private MockWebServer mockServer; + + private String topicName; + + private final int expect = 1; + private volatile RecordedRequest received; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-netty-http-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + topicName = getTopicForTest(this); + mockServer = new MockWebServer(); + received = null; + } + + @AfterEach + public void tearDown() throws Exception { + if (mockServer != null) { + mockServer.shutdown(); + } + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + received = mockServer.takeRequest(); + } catch (InterruptedException e) { + LOG.error("Unable to receive messages: {}", e.getMessage(), e); + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + String expected = "Sink test message 0"; + if (latch.await(30, TimeUnit.SECONDS)) { + assertEquals("/test", received.getPath(), "Received path differed"); + assertEquals(expected, received.getBody().readUtf8(), "Received message content differed"); + } else { + fail("Failed to receive the messages within the specified time"); + } + } + + @Test + @Timeout(30) + public void testBasicSendReceive() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic() + .withTopics(topicName) + .withProtocol("http") + .withHost(mockServer.getHostName()) + .withPort(mockServer.getPort()) + .withPath("test"); + mockServer.enqueue(new MockResponse().setResponseCode(200)); + runTest(connectorPropertyFactory, topicName, expect); + } + + @Test + @Timeout(30) + public void testBasicSendReceiveUsingUrl() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic() + .withTopics(topicName) + .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test") + .buildUrl(); + mockServer.enqueue(new MockResponse().setResponseCode(200)); + runTest(connectorPropertyFactory, topicName, expect); + } +} diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java index 41cb6e1..48bcb59 100644 --- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java @@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport { private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class); - private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost"); + private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 30000, 40000); private static final String TEST_MESSAGE = "testMessage"; private String topicName; diff --git a/tests/pom.xml b/tests/pom.xml index abae69b..4009424 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -53,7 +53,6 @@ <module>itests-salesforce</module> <module>itests-hdfs</module> <module>itests-mongodb</module> - <module>itests-netty-http</module> <module>itests-jdbc</module> <module>itests-azure-storage-blob</module> <module>itests-azure-storage-queue</module>
