This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5527e2f Add contentLogLevel task property
new 569cc3e Merge pull request #193 from fvaleri/log-payload
5527e2f is described below
commit 5527e2fd9d46c1293cf531e233ca9cf43385704b
Author: Federico Valeri <fvaleri@localhost>
AuthorDate: Wed May 6 09:59:37 2020 +0200
Add contentLogLevel task property
---
core/pom.xml | 5 +
.../kafkaconnector/CamelSinkConnectorConfig.java | 8 +-
.../apache/camel/kafkaconnector/CamelSinkTask.java | 4 +-
.../kafkaconnector/CamelSourceConnectorConfig.java | 10 +-
.../camel/kafkaconnector/CamelSourceTask.java | 38 ++--
.../camel/kafkaconnector/utils/TaskHelper.java | 42 +++++
.../camel/kafkaconnector/CamelSinkTaskTest.java | 3 +
.../camel/kafkaconnector/CamelSourceTaskTest.java | 3 +
.../camel/kafkaconnector/utils/TaskHelperTest.java | 200 +++++++++++++++++----
parent/pom.xml | 7 +
10 files changed, 267 insertions(+), 53 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 91f8ec0..f082a9c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -89,6 +89,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-ext</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>test</scope>
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index c353c58..666661b 100644
---
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector;
import java.util.Map;
+import org.apache.camel.LoggingLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -42,10 +43,15 @@ public class CamelSinkConnectorConfig extends
AbstractConfig {
public static final String TOPIC_CONF = "topics";
public static final String TOPIC_DOC = "A list of topics to use as input
for this connector";
+ public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT =
LoggingLevel.OFF.toString();
+ public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF =
"camel.sink.contentLogLevel";
+ public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level
for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT +
"). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.";
+
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT,
Importance.HIGH, CAMEL_SINK_URL_DOC)
.define(CAMEL_SINK_MARSHAL_CONF, Type.STRING,
CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
- .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING,
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+ .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING,
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
+ .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING,
CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH,
CAMEL_SINK_CONTENT_LOG_LEVEL_DOC);
public CamelSinkConnectorConfig(ConfigDef config, Map<String, String>
parsedConfig) {
super(config, parsedConfig);
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index bd0870b..15ccd2b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -110,6 +110,7 @@ public class CamelSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
+ TaskHelper.logRecordContent(LOG, record, config);
Map<String, Object> headers = new HashMap<String, Object>();
Exchange exchange = new
DefaultExchange(producer.getCamelContext());
headers.put(KAFKA_RECORD_KEY_HEADER, record.key());
@@ -123,7 +124,8 @@ public class CamelSinkTask extends SinkTask {
}
exchange.getMessage().setHeaders(headers);
exchange.getMessage().setBody(record.value());
- LOG.debug("Sending {} to {}", exchange, LOCAL_URL);
+
+ LOG.debug("Sending exchange {} to {}", exchange.getExchangeId(),
LOCAL_URL);
producer.send(LOCAL_URL, exchange);
}
}
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 34e2495..01d9fd3 100644
---
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector;
import java.util.Map;
+import org.apache.camel.LoggingLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -60,13 +61,17 @@ public class CamelSourceConnectorConfig extends
AbstractConfig {
public static final Boolean
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT = true;
public static final String
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF =
"camel.source.pollingConsumerBlockWhenFull";
- public static final String
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC = " Whether to block any
producer if the internal queue is full.";
+ public static final String
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC = "Whether to block any
producer if the internal queue is full.";
public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT = null;
public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF =
"camel.source.camelMessageHeaderKey";
public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The name
of a camel message header containing a unique key that can be used as a Kafka
message key."
+ " If this is not specified, then the Kafka message will not have
a key.";
+ public static final String CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT =
LoggingLevel.OFF.toString();
+ public static final String CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF =
"camel.source.contentLogLevel";
+ public static final String CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC = "Log level
for the record's content (default: " + CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT +
"). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.";
+
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT,
Importance.HIGH, CAMEL_SOURCE_URL_DOC)
.define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING,
CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
@@ -77,7 +82,8 @@ public class CamelSourceConnectorConfig extends
AbstractConfig {
.define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG,
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
.define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF,
Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT,
Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
.define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING,
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
- .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING,
CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+ .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING,
CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC)
+ .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING,
CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH,
CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC);
public CamelSourceConnectorConfig(ConfigDef config, Map<String, String>
parsedConfig) {
super(config, parsedConfig);
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index c9123ed..c8333ed 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -87,10 +87,8 @@ public class CamelSourceTask extends SourceTask {
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
remoteUrl =
TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
- actualProps,
-
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
-
CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
-
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
+ actualProps,
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
+ CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
}
cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null,
unmarshaller, camelContext);
@@ -113,35 +111,41 @@ public class CamelSourceTask extends SourceTask {
List<SourceRecord> records = new ArrayList<>();
- while (collectedRecords < maxBatchPollSize &&
(Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
+ while (collectedRecords < maxBatchPollSize
+ && (Instant.now().toEpochMilli() - startPollEpochMilli) <
maxPollDuration) {
Exchange exchange = consumer.receiveNoWait();
if (exchange != null) {
- LOG.debug("Received exchange with");
- LOG.debug("\t from endpoint: {}", exchange.getFromEndpoint());
- LOG.debug("\t exchange id: {}", exchange.getExchangeId());
- LOG.debug("\t message id: {}",
exchange.getMessage().getMessageId());
- LOG.debug("\t message body: {}",
exchange.getMessage().getBody());
- LOG.debug("\t message headers: {}",
exchange.getMessage().getHeaders());
- LOG.debug("\t message properties: {}",
exchange.getProperties());
+ LOG.debug("Received Exchange {} with Message {} from Endpoint
{}", exchange.getExchangeId(),
+ exchange.getMessage().getMessageId(),
exchange.getFromEndpoint());
                // TODO: see if there is a better way to use sourcePartition
and sourceOffset
- Map<String, String> sourcePartition =
Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
+ Map<String, String> sourcePartition =
Collections.singletonMap("filename",
+ exchange.getFromEndpoint().toString());
Map<String, String> sourceOffset =
Collections.singletonMap("position", exchange.getExchangeId());
- final Object messageHeaderKey = camelMessageHeaderKey != null
? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
+ final Object messageHeaderKey = camelMessageHeaderKey != null
+ ?
exchange.getMessage().getHeader(camelMessageHeaderKey)
+ : null;
final Object messageBodyValue =
exchange.getMessage().getBody();
- final Schema messageKeySchema = messageHeaderKey != null ?
SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
- final Schema messageBodySchema = messageBodyValue != null ?
SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+ final Schema messageKeySchema = messageHeaderKey != null
+ ?
SchemaHelper.buildSchemaBuilderForType(messageHeaderKey)
+ : null;
+ final Schema messageBodySchema = messageBodyValue != null
+ ?
SchemaHelper.buildSchemaBuilderForType(messageBodyValue)
+ : null;
- SourceRecord record = new SourceRecord(sourcePartition,
sourceOffset, topic, messageKeySchema, messageHeaderKey, messageBodySchema,
messageBodyValue);
+ SourceRecord record = new SourceRecord(sourcePartition,
sourceOffset, topic, messageKeySchema,
+ messageHeaderKey, messageBodySchema, messageBodyValue);
if (exchange.getMessage().hasHeaders()) {
setAdditionalHeaders(record,
exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
}
if (exchange.hasProperties()) {
setAdditionalHeaders(record, exchange.getProperties(),
PROPERTY_CAMEL_PREFIX);
}
+
+ TaskHelper.logRecordContent(LOG, record, config);
records.add(record);
collectedRecords++;
} else {
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index c623662..672f13b 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -22,7 +22,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.camel.LoggingLevel;
import org.apache.camel.catalog.RuntimeCamelCatalog;
+import org.apache.camel.kafkaconnector.CamelSinkConnectorConfig;
+import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
public final class TaskHelper {
@@ -93,4 +100,39 @@ public final class TaskHelper {
}
return false;
}
+
+ public static <CFG extends AbstractConfig> void logRecordContent(Logger
logger, ConnectRecord<?> record, CFG config) {
+ if (logger != null && record != null && config != null) {
+ // do not log record's content by default, as it may contain
sensitive information
+ LoggingLevel level = LoggingLevel.OFF;
+ try {
+ final String key = (record instanceof SourceRecord)
+ ?
CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF
+ :
CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF;
+ level =
LoggingLevel.valueOf(config.getString(key).toUpperCase());
+ } catch (Exception e) {
+ logger.warn("Invalid value for contentLogLevel property");
+ }
+ switch (level) {
+ case TRACE:
+ logger.trace(record.toString());
+ break;
+ case DEBUG:
+ logger.debug(record.toString());
+ break;
+ case INFO:
+ logger.info(record.toString());
+ break;
+ case WARN:
+ logger.warn(record.toString());
+ break;
+ case ERROR:
+ logger.error(record.toString());
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
}
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 24cd03c..4941741 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -56,6 +57,8 @@ public class CamelSinkTaskTest {
Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertEquals(LoggingLevel.OFF.toString(),
sinkTask.getCamelSinkConnectorConfig(props)
+
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
sinkTask.stop();
}
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 33807ee..3032cba 100644
---
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
@@ -56,6 +57,8 @@ public class CamelSourceTaskTest {
assertEquals(size, poll.size());
assertEquals(TOPIC_NAME, poll.get(0).topic());
+ assertEquals(LoggingLevel.OFF.toString(),
sourceTask.getCamelSourceConnectorConfig(props)
+
.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF));
sourceTask.stop();
}
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 3cfe5ed..bc0f537 100644
---
a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++
b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -20,16 +20,24 @@ import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.LoggingLevel;
import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
+import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.ext.LoggerWrapper;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-
public class TaskHelperTest {
@Test
@@ -41,13 +49,17 @@ public class TaskHelperTest {
@Test
public void testMergePropertiesDefaultAreAdded() {
- Map<String, String> defaults = new HashMap<String, String>() {{
+ Map<String, String> defaults = new HashMap<String, String>() {
+ {
put("property", "defaultValue");
- }};
+ }
+ };
- Map<String, String> loaded = new HashMap<String, String>() {{
+ Map<String, String> loaded = new HashMap<String, String>() {
+ {
put("anotherProperty", "loadedValue");
- }};
+ }
+ };
Map result = TaskHelper.mergeProperties(defaults, loaded);
@@ -59,13 +71,17 @@ public class TaskHelperTest {
@Test
public void testMergePropertiesLoadedHavePrecedence() {
- Map<String, String> defaults = new HashMap<String, String>() {{
+ Map<String, String> defaults = new HashMap<String, String>() {
+ {
put("property", "defaultValue");
- }};
+ }
+ };
- Map<String, String> loaded = new HashMap<String, String>() {{
+ Map<String, String> loaded = new HashMap<String, String>() {
+ {
put("property", "loadedValue");
- }};
+ }
+ };
Map result = TaskHelper.mergeProperties(defaults, loaded);
@@ -75,17 +91,21 @@ public class TaskHelperTest {
@Test
public void testMergePropertiesLoadedHavePrecedenceWithPrefixFiltering() {
- Map<String, String> defaults = new HashMap<String, String>() {{
+ Map<String, String> defaults = new HashMap<String, String>() {
+ {
put("property", "defaultValue");
put("camel.component.x.objectProperty",
"#class:my.package.MyClass");
put("camel.component.x.objectProperty.field", "defaultValue");
- }};
+ }
+ };
- Map<String, String> loaded = new HashMap<String, String>() {{
+ Map<String, String> loaded = new HashMap<String, String>() {
+ {
put("camel.component.x.objectProperty",
"#class:my.package.MyOtherClass");
put("camel.component.x.objectProperty.anotherField",
"loadedValue");
put("camel.component.x.normalProperty", "loadedValue");
- }};
+ }
+ };
Map result = TaskHelper.mergeProperties(defaults, loaded);
@@ -98,10 +118,12 @@ public class TaskHelperTest {
@Test
public void testCreateEndpointOptionsFromProperties() {
- Map<String, String> props = new HashMap<String, String>() {{
+ Map<String, String> props = new HashMap<String, String>() {
+ {
put("prefix.key1", "value1");
put("notprefix.key2", "value2");
- }};
+ }
+ };
String result = TaskHelper.createEndpointOptionsFromProperties(props,
"prefix.");
@@ -110,10 +132,12 @@ public class TaskHelperTest {
@Test
public void testCreateEndpointOptionsFromPropertiesConcatenation() {
- Map<String, String> props = new HashMap<String, String>() {{
+ Map<String, String> props = new HashMap<String, String>() {
+ {
put("prefix.key1", "value1");
put("prefix.key2", "value2");
- }};
+ }
+ };
String result = TaskHelper.createEndpointOptionsFromProperties(props,
"prefix.");
@@ -122,10 +146,12 @@ public class TaskHelperTest {
@Test
public void testCreateEndpointOptionsFromPropertiesEmpty() {
- Map<String, String> props = new HashMap<String, String>() {{
+ Map<String, String> props = new HashMap<String, String>() {
+ {
put("prefix.key1", "value1");
put("notprefix.key2", "value2");
- }};
+ }
+ };
String result = TaskHelper.createEndpointOptionsFromProperties(props,
"anotherprefix");
@@ -134,10 +160,12 @@ public class TaskHelperTest {
@Test
public void testCreateUrlPathFromProperties() {
- Map<String, String> props = new HashMap<String, String>() {{
+ Map<String, String> props = new HashMap<String, String>() {
+ {
put("prefix.key1", "value1");
put("notprefix.key2", "value2");
- }};
+ }
+ };
String result = TaskHelper.createUrlPathFromProperties(props,
"prefix.");
@@ -146,10 +174,12 @@ public class TaskHelperTest {
@Test
public void testCreateUrlPathFromPropertiesConcatenation() {
- Map<String, String> props = new HashMap<String, String>() {{
+ Map<String, String> props = new HashMap<String, String>() {
+ {
put("prefix.key1", "value1");
put("prefix.key2", "value2");
- }};
+ }
+ };
String result = TaskHelper.createUrlPathFromProperties(props,
"prefix.");
@@ -158,10 +188,12 @@ public class TaskHelperTest {
@Test
public void testCreateUrlPathFromPropertiesEmpty() {
- Map<String, String> props = new HashMap<String, String>() {{
+ Map<String, String> props = new HashMap<String, String>() {
+ {
put("prefix.key1", "value1");
put("notprefix.key2", "value2");
- }};
+ }
+ };
String result = TaskHelper.createUrlPathFromProperties(props,
"anotherprefix");
@@ -170,10 +202,12 @@ public class TaskHelperTest {
@Test
public void testBuildUrl() {
- Map<String, String> props = new HashMap<String, String>() {{
+ Map<String, String> props = new HashMap<String, String>() {
+ {
put("prefix.key1", "value1");
put("anotherPrefix.key2", "value2");
- }};
+ }
+ };
String result = TaskHelper.buildUrl(props, "test", "prefix.",
"anotherPrefix.");
@@ -184,23 +218,125 @@ public class TaskHelperTest {
public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException {
DefaultCamelContext dcc = new DefaultCamelContext();
RuntimeCamelCatalog rcc =
dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
- Map<String, String> props = new HashMap<String, String>() {{
+ Map<String, String> props = new HashMap<String, String>() {
+ {
put("camel.source.path.name", "test");
put("camel.source.endpoint.synchronous", "true");
- }};
+ }
+ };
String result = TaskHelper.buildUrl(rcc, props, "direct",
"camel.source.endpoint.", "camel.source.path.");
assertEquals("direct:test?synchronous=true", result);
- props = new HashMap<String, String>() {{
+ props = new HashMap<String, String>() {
+ {
put("camel.source.path.port", "8080");
put("camel.source.path.keyspace", "test");
put("camel.source.path.hosts", "localhost");
- }};
+ }
+ };
result = TaskHelper.buildUrl(rcc, props, "cql",
"camel.source.endpoint.", "camel.source.path.");
assertEquals("cql:localhost:8080/test", result);
}
-}
\ No newline at end of file
+
+ private CamelSourceConnectorConfig getSourceConnectorConfig(String
logLevel) {
+ return new
CamelSourceConnectorConfig(CamelSourceConnectorConfig.conf(),
+
Collections.singletonMap(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF,
logLevel));
+ }
+
+ @Test
+ public void testlogRecordContent() {
+ String partName = "abc123";
+ Logger logger = new
MyLogger(LoggerFactory.getLogger(TaskHelperTest.class), null);
+ SourceRecord record = new
SourceRecord(Collections.singletonMap("partition", partName),
+ Collections.singletonMap("offset", "0"), null, null, null, null);
+ Queue<String> logEvents = ((MyLogger)logger).getEvents();
+
+ String offLevel = LoggingLevel.OFF.toString();
+ TaskHelper.logRecordContent(logger, record,
getSourceConnectorConfig(offLevel));
+ assertNull(logEvents.poll());
+
+ String traceLevel = LoggingLevel.TRACE.toString();
+ TaskHelper.logRecordContent(logger, record,
getSourceConnectorConfig(traceLevel));
+ assertTrue(logEvents.peek().contains(traceLevel) &&
logEvents.poll().contains(partName));
+
+ String debugLevel = LoggingLevel.DEBUG.toString();
+ TaskHelper.logRecordContent(logger, record,
getSourceConnectorConfig(debugLevel));
+ assertTrue(logEvents.peek().contains(debugLevel) &&
logEvents.poll().contains(partName));
+
+ String infoLevel = LoggingLevel.INFO.toString();
+ TaskHelper.logRecordContent(logger, record,
getSourceConnectorConfig(infoLevel));
+ assertTrue(logEvents.peek().contains(infoLevel) &&
logEvents.poll().contains(partName));
+
+ String warnLevel = LoggingLevel.WARN.toString();
+ TaskHelper.logRecordContent(logger, record,
getSourceConnectorConfig(warnLevel));
+ assertTrue(logEvents.peek().contains(warnLevel) &&
logEvents.poll().contains(partName));
+
+ String errorLevel = LoggingLevel.ERROR.toString();
+ TaskHelper.logRecordContent(logger, record,
getSourceConnectorConfig(errorLevel));
+ assertTrue(logEvents.peek().contains(errorLevel) &&
logEvents.poll().contains(partName));
+
+ TaskHelper.logRecordContent(null, record,
getSourceConnectorConfig(debugLevel));
+ assertNull(logEvents.poll());
+
+ TaskHelper.logRecordContent(logger, null,
getSourceConnectorConfig(debugLevel));
+ assertNull(logEvents.poll());
+
+ TaskHelper.logRecordContent(logger, record, null);
+ assertNull(logEvents.poll());
+
+ String invalidLevel = "NOLOG";
+ TaskHelper.logRecordContent(logger, record,
getSourceConnectorConfig(invalidLevel));
+ assertTrue(logEvents.poll().contains(warnLevel));
+
+ TaskHelper.logRecordContent(logger, record,
getSourceConnectorConfig(null));
+ assertTrue(logEvents.poll().contains(warnLevel));
+ }
+
+ class MyLogger extends LoggerWrapper {
+ private Queue<String> events = new ConcurrentLinkedQueue<String>();
+
+ public MyLogger(Logger logger, String fqcn) {
+ super(logger, fqcn);
+ }
+
+ public Queue<String> getEvents() {
+ return events;
+ }
+
+ private void log(LoggingLevel level, String msg) {
+ StringBuilder sb = new StringBuilder()
+ .append(level).append(" ").append(msg);
+ events.add(sb.toString());
+ }
+
+ @Override
+ public void trace(String msg) {
+ log(LoggingLevel.TRACE, msg);
+ }
+
+ @Override
+ public void debug(String msg) {
+ log(LoggingLevel.DEBUG, msg);
+ }
+
+ @Override
+ public void info(String msg) {
+ log(LoggingLevel.INFO, msg);
+ }
+
+ @Override
+ public void warn(String msg) {
+ log(LoggingLevel.WARN, msg);
+ }
+
+ @Override
+ public void error(String msg) {
+ log(LoggingLevel.ERROR, msg);
+ }
+ }
+
+}
diff --git a/parent/pom.xml b/parent/pom.xml
index 3c76418..9212b67 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -32,6 +32,7 @@
<junit.version>5.6.2</junit.version>
<camel.version>3.3.0</camel.version>
<jackson.version>2.10.3</jackson.version>
+ <slf4j.version>1.7.30</slf4j.version>
<log4j2.version>2.8.2</log4j2.version>
<version.commons-io>2.6</version.commons-io>
<version.java>1.8</version.java>
@@ -192,6 +193,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-ext</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j2.version}</version>