This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/main by this push:
new 1ab8aac Adjusted test code to avoid recycling test topics
1ab8aac is described below
commit 1ab8aac781518d5ac426bf5ff472c2af31c953c9
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Jul 12 15:42:29 2021 +0200
Adjusted test code to avoid recycling test topics
---
.../aws/v2/cw/sink/CamelSinkAWSCWITCase.java | 3 +-
.../aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java | 3 +-
.../aws/v2/iam/sink/CamelSinkAWSIAMITCase.java | 3 +-
.../v2/kinesis/sink/CamelSinkAWSKinesisITCase.java | 3 +-
.../aws/v2/kms/sink/CamelSinkAWSKMSITCase.java | 3 +-
.../aws/v2/lambda/sink/CamelSinkLambdaITCase.java | 3 +-
.../aws/v2/s3/sink/CamelSinkAWSS3ITCase.java | 3 +-
.../source/CamelSourceAWSS3LargeFilesITCase.java | 3 +-
.../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 7 ++---
.../kafkaconnector/common/AbstractKafkaTest.java | 12 ++++++--
.../common/utils/CamelKafkaConnectorTestUtils.java | 35 ----------------------
.../jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java | 12 ++++++--
.../salesforce/sink/CamelSinkSalesforceITCase.java | 7 +++--
.../source/CamelSourceSalesforceITCase.java | 13 ++++----
.../sjms2/sink/CamelSinkIdempotentJMSITCase.java | 3 +-
.../sjms2/sink/CamelSinkJMSStartupITCase.java | 12 ++++++--
.../sjms2/sink/CamelSinkWithDLQJMSITCase.java | 7 +++--
.../slack/sink/CamelSinkSlackITCase.java | 14 ++++++---
.../slack/source/CamelSourceSlackITCase.java | 3 +-
.../ssh/sink/CamelSinkSshITCase.java | 3 +-
.../source/RabbitMQSourcePerformanceITCase.java | 3 +-
21 files changed, 68 insertions(+), 87 deletions(-)
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
index c0fce3a..e862464 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -27,7 +27,6 @@ import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSLocalContainerService;
@@ -160,7 +159,7 @@ public class CamelSinkAWSCWITCase extends
CamelSinkTestSupport {
@Timeout(value = 120)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
.basic()
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
index a8f7a1b..db02c0e 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
@@ -29,7 +29,6 @@ import
org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguratio
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
@@ -138,7 +137,7 @@ public class CamelSinkAWSEC2ITCase extends
CamelSinkTestSupport {
@Timeout(90)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSEC2PropertyFactory
.basic()
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
index d524168..2764969 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
@@ -29,7 +29,6 @@ import
org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguratio
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
@@ -131,7 +130,7 @@ public class CamelSinkAWSIAMITCase extends
CamelSinkTestSupport {
@Timeout(90)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSIAMPropertyFactory
.basic()
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
index 61f3daa..1f3ecda 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
@@ -30,7 +30,6 @@ import
org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfigur
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -141,7 +140,7 @@ public class CamelSinkAWSKinesisITCase extends
CamelSinkTestSupport {
@Timeout(120)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory connectorPropertyFactory =
CamelAWSKinesisPropertyFactory
.basic()
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
index c88e8ef..3aef0c3 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
@@ -28,7 +28,6 @@ import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
@@ -144,7 +143,7 @@ public class CamelSinkAWSKMSITCase extends
CamelSinkTestSupport {
@Timeout(120)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory connectorPropertyFactory =
CamelAWSKMSPropertyFactory
.basic()
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
index 1af40aa..309fbca 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
@@ -36,7 +36,6 @@ import
org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import
org.apache.camel.kafkaconnector.common.clients.kafka.ProducerPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.AbstractTestMessageProducer;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
@@ -174,7 +173,7 @@ public class CamelSinkLambdaITCase extends
CamelSinkTestSupport {
@Timeout(90)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSLambdaPropertyFactory
.basic()
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
index e489c63..565aceb 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
@@ -30,7 +30,6 @@ import
org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -151,7 +150,7 @@ public class CamelSinkAWSS3ITCase extends
CamelSinkTestSupport {
@Timeout(180)
public void testBasicSendReceive() throws Exception {
Properties amazonProperties = service.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties = CamelAWSS3PropertyFactory
.basic()
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
index e44d61b..f8ae34e 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
@@ -27,7 +27,6 @@ import
org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -94,7 +93,7 @@ public class CamelSourceAWSS3LargeFilesITCase extends
CamelSourceTestSupport {
@BeforeEach
public void setUp() {
- topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ topicName = getTopicForTest(this);
awsS3Client = AWSSDKClientUtils.newS3Client();
bucketName = AWSCommon.DEFAULT_S3_BUCKET +
TestUtils.randomWithRange(0, 100);
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index fddb565..f688611 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -129,7 +128,7 @@ public class CamelSinkAWSSQSITCase extends
CamelSinkTestSupport {
public void testBasicSendReceive() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties =
CamelAWSSQSPropertyFactory
.basic()
@@ -153,7 +152,7 @@ public class CamelSinkAWSSQSITCase extends
CamelSinkTestSupport {
public void testBasicSendReceiveUsingKafkaStyle() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties =
CamelAWSSQSPropertyFactory
.basic()
@@ -178,7 +177,7 @@ public class CamelSinkAWSSQSITCase extends
CamelSinkTestSupport {
public void testBasicSendReceiveUsingUrl() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
- String topicName =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String topicName = getTopicForTest(this);
ConnectorPropertyFactory testProperties =
CamelAWSSQSPropertyFactory
.basic()
diff --git
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
index 5f6525f..8b0b155 100644
---
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
+++
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
@@ -20,7 +20,6 @@ package org.apache.camel.kafkaconnector.common;
import
org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService;
import
org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory;
import
org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
import org.apache.camel.test.infra.common.TestUtils;
import org.apache.camel.test.infra.kafka.services.ContainerLocalKafkaService;
@@ -68,7 +67,16 @@ public abstract class AbstractKafkaTest {
return kafkaConnectService;
}
+ /**
+ * Gets a topic name for the test class
+ * @param clazz
+ * @return
+ */
+ protected String getDefaultTestTopic(Class<?> clazz) {
+ return clazz.getName();
+ }
+
protected String getTopicForTest(Object testObject) {
- return
CamelKafkaConnectorTestUtils.getDefaultTestTopic(testObject.getClass()) + "." +
TestUtils.randomWithRange(0, 1000);
+ return getDefaultTestTopic(testObject.getClass()) + "." +
TestUtils.randomWithRange(0, 1000);
}
}
diff --git
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/CamelKafkaConnectorTestUtils.java
b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/CamelKafkaConnectorTestUtils.java
deleted file mode 100644
index b3a3269..0000000
---
a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/utils/CamelKafkaConnectorTestUtils.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.kafkaconnector.common.utils;
-
-/**
- * Test utilities
- */
-public final class CamelKafkaConnectorTestUtils {
- private CamelKafkaConnectorTestUtils() {
- }
-
-
- /**
- * Gets a topic name for the test class
- * @param clazz
- * @return
- */
- public static String getDefaultTestTopic(Class<?> clazz) {
- return clazz.getName();
- }
-}
diff --git
a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java
b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java
index ae6f11a..3100c5d 100644
---
a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java
+++
b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCNoDataSourceITCase.java
@@ -31,11 +31,11 @@ import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
import org.apache.camel.test.infra.common.TestUtils;
import org.apache.camel.test.infra.jdbc.services.JDBCService;
import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -57,6 +57,7 @@ public class CamelSinkJDBCNoDataSourceITCase extends
AbstractKafkaTest {
private final int expect = 10;
private int received;
+ private String topicName;
static {
final String postgresImage = "postgres:9.6.2";
@@ -74,6 +75,11 @@ public class CamelSinkJDBCNoDataSourceITCase extends
AbstractKafkaTest {
.build();
}
+ @BeforeEach
+ void setUp() {
+ topicName = getTopicForTest(this);
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-jdbc-kafka-connector"};
@@ -92,7 +98,7 @@ public class CamelSinkJDBCNoDataSourceITCase extends
AbstractKafkaTest {
jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX +
"TestData", "test data " + i);
try {
-
kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()),
body, jdbcParameters);
+ kafkaClient.produce(topicName, body, jdbcParameters);
} catch (ExecutionException e) {
LOG.error("Unable to produce messages: {}",
e.getMessage(), e);
} catch (InterruptedException e) {
@@ -165,7 +171,7 @@ public class CamelSinkJDBCNoDataSourceITCase extends
AbstractKafkaTest {
.end()
.withDataSourceName("anotherName")
.withUseHeaderAsParameters(true)
-
.withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()));
+ .withTopics(topicName);
runTest(factory);
diff --git
a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
index 9919573..652877e 100644
---
a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
+++
b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import
org.apache.camel.kafkaconnector.salesforce.clients.SalesforceCliContainer;
import org.apache.camel.kafkaconnector.salesforce.clients.SfdxCommand;
import org.apache.camel.test.infra.common.TestUtils;
@@ -96,6 +95,7 @@ public class CamelSinkSalesforceITCase extends
AbstractKafkaTest {
private String accountName;
private boolean recordCreated;
+ private String topicName;
@Override
protected String[] getConnectorsInTest() {
@@ -105,6 +105,7 @@ public class CamelSinkSalesforceITCase extends
AbstractKafkaTest {
@BeforeEach
public void setUp() {
accountName = "TestSinkAccount" + TestUtils.randomWithRange(1, 100);
+ topicName = getTopicForTest(this);
}
@AfterEach
@@ -166,14 +167,14 @@ public class CamelSinkSalesforceITCase extends
AbstractKafkaTest {
LOG.info("Sending new account {}", data);
-
kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()),
data);
+ kafkaClient.produce(topicName, data);
}
@Test
@Timeout(180)
public void testBasicProduce() throws ExecutionException,
InterruptedException {
ConnectorPropertyFactory factory =
CamelSalesforcePropertyFactory.basic()
-
.withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
diff --git
a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
index bc2bd0f..00738be 100644
---
a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
+++
b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import
org.apache.camel.kafkaconnector.salesforce.clients.SalesforceCliContainer;
import org.apache.camel.kafkaconnector.salesforce.clients.SfdxCommand;
import org.apache.camel.test.infra.common.TestUtils;
@@ -98,6 +97,7 @@ public class CamelSourceSalesforceITCase extends
AbstractKafkaTest {
private volatile boolean received;
private String account;
+ private String topicName;
@Override
protected String[] getConnectorsInTest() {
@@ -109,6 +109,7 @@ public class CamelSourceSalesforceITCase extends
AbstractKafkaTest {
received = false;
account = "TestAccount" + TestUtils.randomWithRange(1, 100);
+ topicName = getTopicForTest(this);
SfdxCommand sfdxCommand = SfdxCommand.forceDataRecordCreate()
.withArgument("-u", userName)
@@ -150,7 +151,7 @@ public class CamelSourceSalesforceITCase extends
AbstractKafkaTest {
LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new
KafkaClient<>(getKafkaService().getBootstrapServers());
-
kafkaClient.consume(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()),
this::checkRecord);
+ kafkaClient.consume(topicName, this::checkRecord);
LOG.debug("Created the consumer ...");
assertTrue(received, "Didn't receive any messages");
@@ -205,7 +206,7 @@ public class CamelSourceSalesforceITCase extends
AbstractKafkaTest {
@Timeout(180)
public void testBasicConsume() throws ExecutionException,
InterruptedException {
ConnectorPropertyFactory factory =
CamelSalesforcePropertyFactory.basic()
-
.withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
@@ -228,7 +229,7 @@ public class CamelSourceSalesforceITCase extends
AbstractKafkaTest {
@Timeout(180)
public void testBasicConsumeUsingUrl() throws ExecutionException,
InterruptedException {
ConnectorPropertyFactory factory =
CamelSalesforcePropertyFactory.basic()
-
.withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
@@ -271,7 +272,7 @@ public class CamelSourceSalesforceITCase extends
AbstractKafkaTest {
* HTTP error 500 without much details.
*/
ConnectorPropertyFactory factory =
CamelSalesforcePropertyFactory.basic()
-
.withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
@@ -289,7 +290,7 @@ public class CamelSourceSalesforceITCase extends
AbstractKafkaTest {
@Timeout(180)
public void testBasicCDCUsingUrl() throws ExecutionException,
InterruptedException {
ConnectorPropertyFactory factory =
CamelSalesforcePropertyFactory.basic()
-
.withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUserName(userName)
.withPassword(password)
.withClientId(clientId)
diff --git
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index c363693..a1ce08b 100644
---
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -32,7 +32,6 @@ import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.camel.test.infra.common.TestUtils;
@@ -86,7 +85,7 @@ public class CamelSinkIdempotentJMSITCase extends
CamelSinkTestSupport {
LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
received = 0;
- topic =
CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()) +
TestUtils.randomWithRange(0, 100);
+ topic = getTopicForTest(this);
destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" +
TestUtils.randomWithRange(0, 100);
}
diff --git
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
index 25fe759..967f8d9 100644
---
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
+++
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
@@ -24,9 +24,9 @@ import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import
org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
@@ -45,6 +45,7 @@ public class CamelSinkJMSStartupITCase extends
AbstractKafkaTest {
private boolean running;
private String trace;
+ private String topicName;
private Properties connectionProperties() {
@@ -56,6 +57,11 @@ public class CamelSinkJMSStartupITCase extends
AbstractKafkaTest {
return properties;
}
+ @BeforeEach
+ void setUp() {
+ topicName = getTopicForTest(this);
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-sjms2-kafka-connector"};
@@ -82,7 +88,7 @@ public class CamelSinkJMSStartupITCase extends
AbstractKafkaTest {
KafkaClient<String, String> kafkaClient = new
KafkaClient<>(getKafkaService().getBootstrapServers());
-
kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()),
"Sink test message ");
+ kafkaClient.produce(topicName, "Sink test message ");
}
private void checkThatFailed() throws InterruptedException {
@@ -110,7 +116,7 @@ public class CamelSinkJMSStartupITCase extends
AbstractKafkaTest {
ConnectorPropertyFactory connectorPropertyFactory =
CamelJMSPropertyFactory
.basic()
-
.withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withConnectionProperties(brokenProp)
.withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
.withDeadLetterQueueTopicName("dlq-sink-topic");
diff --git
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
index 583957b..5878e1d 100644
---
a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
+++
b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
@@ -47,6 +46,7 @@ public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest {
private final int expect = 10;
private int errors;
private final int expectedErrors = 1;
+ private String topicName;
private Properties connectionProperties() {
Properties properties = new Properties();
@@ -65,6 +65,7 @@ public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
errors = 0;
+ topicName = getTopicForTest(this);
}
private <T> boolean checkDqlRecord(ConsumerRecord<String, T> record) {
@@ -88,7 +89,7 @@ public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest {
KafkaClient<String, String> kafkaClient = new
KafkaClient<>(getKafkaService().getBootstrapServers());
for (int i = 0; i < expect; i++) {
- kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
+ kafkaClient.produce(topicName, "Sink test message " + i);
}
LOG.debug("Created the consumer ... About to receive messages");
@@ -105,7 +106,7 @@ public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest {
ConnectorPropertyFactory connectorPropertyFactory =
CamelJMSPropertyFactory
.basic()
- .withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withConnectionProperties(brokenProp)
.withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
.withDeadLetterQueueTopicName("dlq-sink-topic");
diff --git a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java
index 013fec4..b9ec30d 100644
--- a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java
+++ b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java
@@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
@@ -46,6 +46,12 @@ public class CamelSinkSlackITCase extends AbstractKafkaTest {
private static final Logger LOG =
LoggerFactory.getLogger(CamelSinkSlackITCase.class);
private String slackChannel = System.getProperty("it.test.slack.channel");
private String webhookUrl = System.getProperty("it.test.slack.webhookUrl");
+ private String topicName;
+
+ @BeforeEach
+ void setUp() {
+ topicName = getTopicForTest(this);
+ }
@Override
protected String[] getConnectorsInTest() {
@@ -58,7 +64,7 @@ public class CamelSinkSlackITCase extends AbstractKafkaTest {
KafkaClient<String, String> kafkaClient = new
KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.produce(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()), message);
+ kafkaClient.produce(topicName, message);
LOG.debug("Created the consumer ... About to receive messages");
@@ -70,7 +76,7 @@ public class CamelSinkSlackITCase extends AbstractKafkaTest {
try {
ConnectorPropertyFactory connectorPropertyFactory =
CamelSlackPropertyFactory
.basic()
- .withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withChannel(slackChannel)
.withWebhookUrl(webhookUrl);
@@ -88,7 +94,7 @@ public class CamelSinkSlackITCase extends AbstractKafkaTest {
try {
ConnectorPropertyFactory connectorPropertyFactory =
CamelSlackPropertyFactory
.basic()
- .withTopics(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withUrl(slackChannel)
.append("webhookUrl", webhookUrl)
.buildUrl();
diff --git a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/source/CamelSourceSlackITCase.java b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/source/CamelSourceSlackITCase.java
index 579bc45..0611216 100644
--- a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/source/CamelSourceSlackITCase.java
+++ b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/source/CamelSourceSlackITCase.java
@@ -22,7 +22,6 @@ import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -77,7 +76,7 @@ public class CamelSourceSlackITCase extends AbstractKafkaTest {
@Test
@Timeout(90)
public void testBasicSendReceive() throws ExecutionException,
InterruptedException {
- String kafkaTopic = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ String kafkaTopic = getTopicForTest(this);
ConnectorPropertyFactory factory = CamelSlackPropertyFactory
.basic()
.withKafkaTopic(kafkaTopic)
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index 50c5483..03ee0f3 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.kafkaconnector.ssh.services.SshService;
import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
import org.junit.jupiter.api.BeforeEach;
@@ -67,7 +66,7 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
@BeforeEach
public void setUp() {
- topic = CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass());
+ topic = getTopicForTest(this);
}
diff --git a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
index d5dfb15..4c232d7 100644
--- a/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
+++ b/tests/perf-tests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourcePerformanceITCase.java
@@ -22,7 +22,6 @@ import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.utils.CamelKafkaConnectorTestUtils;
import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService;
import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory;
import org.junit.jupiter.api.BeforeAll;
@@ -51,7 +50,7 @@ public class RabbitMQSourcePerformanceITCase extends AbstractKafkaTest {
public void testMemory() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
.basic()
- .withKafkaTopic(CamelKafkaConnectorTestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(getTopicForTest(this))
.withUrl(service.connectionProperties().hostname(),
service.connectionProperties().port(),
"X.test")
.append("username", service.connectionProperties().username())