This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch camel-kafka-connector-0.7.x in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 9825019ca3b70670c0c6ccd46baab12b71af32c8 Author: Andrea Tarocchi <[email protected]> AuthorDate: Fri Mar 5 22:17:03 2021 +0100 fix #969 : Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string. --- tests/itests-netty-http/pom.xml | 9 ++ .../source/CamelNettyHTTPPropertyFactory.java | 60 ++++++++++ .../source/CamelSourceNettyHTTPITCase.java | 123 +++++++++++++++++++++ tests/pom.xml | 3 +- 4 files changed, 194 insertions(+), 1 deletion(-) diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml index 16238cf..353c75e 100644 --- a/tests/itests-netty-http/pom.xml +++ b/tests/itests-netty-http/pom.xml @@ -37,6 +37,15 @@ <scope>test</scope> </dependency> + <!-- test infra --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-common</artifactId> + <version>${camel.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-netty-http</artifactId> diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java new file mode 100644 index 0000000..e4df820 --- /dev/null +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyHTTPPropertyFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.nettyhttp.source; + +import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; + +final class CamelNettyHTTPPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyHTTPPropertyFactory> { + private CamelNettyHTTPPropertyFactory() { + + } + + public CamelNettyHTTPPropertyFactory withHost(String host) { + return setProperty("camel.source.path.host", host); + } + + public CamelNettyHTTPPropertyFactory withProtocol(String protocol) { + return setProperty("camel.source.path.protocol", protocol); + } + + public CamelNettyHTTPPropertyFactory withPort(int port) { + return setProperty("camel.source.path.port", String.valueOf(port)); + } + + public CamelNettyHTTPPropertyFactory withSync(boolean sync) { + return setProperty("camel.source.endpoint.sync", String.valueOf(sync)); + } + + public CamelNettyHTTPPropertyFactory withReceiveBufferSize(int size) { + return setProperty("camel.source.endpoint.receiveBufferSize", String.valueOf(size)); + } + + public CamelNettyHTTPPropertyFactory withCamelTypeConverterTransformTo(String targetClass) { + setProperty("transforms", "cameltypeconverter"); + setProperty("transforms.cameltypeconverter.type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value"); + return setProperty("transforms.cameltypeconverter.target.type", targetClass); + } + + public static CamelNettyHTTPPropertyFactory basic() { + return new CamelNettyHTTPPropertyFactory() + .withTasksMax(1) + .withName("CamelNettyHttpSourceConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java new file mode 100644 index 0000000..41cb6e1 --- /dev/null +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.nettyhttp.source; + +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; +import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class); + private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost"); + private static final String TEST_MESSAGE = "testMessage"; + + private String topicName; + + private final int expect = 1; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-netty-http-kafka-connector"}; + } + + @BeforeEach + public void setUp() throws IOException { + topicName = getTopicForTest(this); + } + + @AfterEach + public void tearDown() {} + + @Test + @Timeout(90) + public void testBasicSendReceive() throws Exception { + + ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic() + .withKafkaTopic(topicName) + .withReceiveBufferSize(10) + .withHost("0.0.0.0") + .withPort(HTTP_PORT) + .withProtocol("http") + .withCamelTypeConverterTransformTo("java.lang.String"); + + runTestBlocking(connectorPropertyFactory, topicName, expect); + } + + @Override + protected void produceTestData() { + int retriesLeft = 10; + boolean success = false; + while(retriesLeft > 0 && !success) { + try (final CloseableHttpClient httpclient = HttpClients.createDefault()) { + + byte[] ipAddr = new byte[]{127, 0, 0, 1}; + InetAddress localhost = InetAddress.getByAddress(ipAddr); + final HttpPost httpPost = new HttpPost("http://" + localhost.getHostAddress() + ":" + HTTP_PORT); + + LOG.info("Executing request {} {}", httpPost.getMethod(), httpPost.getURI()); + + httpPost.setEntity(new StringEntity(TEST_MESSAGE)); + + CloseableHttpResponse response = httpclient.execute(httpPost); + assertEquals(200, response.getStatusLine().getStatusCode()); + response.close(); + httpPost.releaseConnection(); + success = true; + LOG.info("Request success at {} attempt.", retriesLeft); + } catch (IOException e) { + if(retriesLeft == 1) { + e.printStackTrace(); + fail("There should be no exceptions in sending the http test message."); + } else { + retriesLeft--; + try { + Thread.sleep(100); + } catch (InterruptedException interruptedException) { + interruptedException.printStackTrace(); + } + } + } + } + } + + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); + assertEquals(expect, received, "Didn't process the expected amount of messages"); + assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString()); + } +} diff --git a/tests/pom.xml b/tests/pom.xml index 37f2cf0..c735d3b 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -55,6 +55,7 @@ <module>itests-salesforce</module> <module>itests-hdfs</module> <module>itests-mongodb</module> + <module>itests-netty-http</module> <module>itests-jdbc</module> <module>itests-azure-common</module> <module>itests-azure-storage-blob</module> @@ -63,7 +64,7 @@ <module>itests-rabbitmq</module> <module>itests-couchbase</module> <module>itests-ssh</module> - <module>itests-sql</module> +g <module>itests-sql</module> <module>itests-netty-http</module> </modules>
