This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 99f11dfe4005544b12d541ba553dee9f134e9ad6
Author: Andrea Tarocchi <[email protected]>
AuthorDate: Sat Mar 6 02:44:30 2021 +0100

    Fixed itest for netty-http.
---
 .../nettyhttp/sink/CamelSinkNettyhttpITCase.java   | 112 +++++++++++++++++++++
 .../source/CamelSourceNettyHTTPITCase.java         |   2 +-
 tests/pom.xml                                      |   1 -
 3 files changed, 113 insertions(+), 2 deletions(-)

diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
new file mode 100644
index 0000000..96bd27a
--- /dev/null
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.nettyhttp.sink;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Integration test for the netty-http sink connector.
+ *
+ * <p>Each test configures the connector to target a local {@link MockWebServer},
+ * enqueues a single HTTP 200 response and then checks the path and body of the
+ * request recorded by that server.
+ */
+public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class);
+
+    private MockWebServer mockServer;
+
+    private String topicName;
+
+    // Passed to runTest as the count argument; presumably the number of messages to send.
+    private final int expect = 1;
+
+    // Written by consumeMessages and read by verifyMessages, hence volatile.
+    private volatile RecordedRequest received;
+
+    /**
+     * @return the names of the connectors exercised by this test class
+     */
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-netty-http-kafka-connector"};
+    }
+
+    /**
+     * Creates a fresh topic name and mock server and clears any request
+     * captured by a previous test.
+     */
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
+        mockServer = new MockWebServer();
+        received = null;
+    }
+
+    /**
+     * Shuts the mock server down, if one was created.
+     *
+     * @throws Exception if the server fails to shut down
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (mockServer != null) {
+            mockServer.shutdown();
+        }
+    }
+
+    /**
+     * Waits for the mock server to record one request and stores it in
+     * {@link #received}. The latch is always counted down, even on
+     * interruption, so that {@link #verifyMessages(CountDownLatch)} is released.
+     *
+     * @param latch latch to count down once consumption has finished
+     */
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            received = mockServer.takeRequest();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the owning thread can still observe it.
+            Thread.currentThread().interrupt();
+            LOG.error("Unable to receive messages: {}", e.getMessage(), e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Waits up to 30 seconds for the latch and then asserts on the recorded
+     * request's path and body.
+     *
+     * @param latch latch counted down by {@link #consumeMessages(CountDownLatch)}
+     * @throws InterruptedException if interrupted while waiting on the latch
+     */
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        String expected = "Sink test message 0";
+        if (!latch.await(30, TimeUnit.SECONDS)) {
+            fail("Failed to receive the messages within the specified time");
+        }
+
+        // The latch is also released when consumeMessages was interrupted, in
+        // which case nothing was recorded: fail clearly instead of with an NPE.
+        RecordedRequest request = received;
+        if (request == null) {
+            fail("No request was recorded by the mock server");
+        }
+
+        assertEquals("/test", request.getPath(), "Received path differed");
+        assertEquals(expected, request.getBody().readUtf8(), "Received message content differed");
+    }
+
+    /**
+     * Runs the sink with protocol, host, port and path configured as separate
+     * connector properties.
+     */
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+                .withTopics(topicName)
+                .withProtocol("http")
+                .withHost(mockServer.getHostName())
+                .withPort(mockServer.getPort())
+                .withPath("test");
+        mockServer.enqueue(new MockResponse().setResponseCode(200));
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    /**
+     * Runs the sink with the same target built through the factory's
+     * {@code withUrl(...).buildUrl()} chain.
+     */
+    @Test
+    @Timeout(30)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test")
+                .buildUrl();
+        mockServer.enqueue(new MockResponse().setResponseCode(200));
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
index 41cb6e1..48bcb59 100644
--- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
+++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport {
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class);
-    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
+    private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 
30000, 40000);
     private static final String TEST_MESSAGE = "testMessage";
 
     private String topicName;
diff --git a/tests/pom.xml b/tests/pom.xml
index abae69b..4009424 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -53,7 +53,6 @@
         <module>itests-salesforce</module>
         <module>itests-hdfs</module>
         <module>itests-mongodb</module>
-        <module>itests-netty-http</module>
         <module>itests-jdbc</module>
         <module>itests-azure-storage-blob</module>
         <module>itests-azure-storage-queue</module>

Reply via email to