This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3b315a3 core: simplify kafka headers to camel exchange mapping
3b315a3 is described below
commit 3b315a3e4c4e2270460376a422fb094c9f1fde40
Author: Luca Burgazzoli <[email protected]>
AuthorDate: Thu Oct 8 19:27:03 2020 +0200
core: simplify kafka headers to camel exchange mapping
---
.../apache/camel/kafkaconnector/CamelSinkTask.java | 86 ++++------------------
1 file changed, 13 insertions(+), 73 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 63e139c..25d94f1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -18,9 +18,6 @@ package org.apache.camel.kafkaconnector;
import java.util.Collection;
import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -36,8 +33,6 @@ import org.apache.camel.support.DefaultExchange;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -130,18 +125,18 @@ public class CamelSinkTask extends SinkTask {
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
TaskHelper.logRecordContent(LOG, record, config);
- Map<String, Object> headers = new HashMap<>();
+
Exchange exchange = new DefaultExchange(producer.getCamelContext());
- headers.put(KAFKA_RECORD_KEY_HEADER, record.key());
+ exchange.getMessage().setBody(record.value());
+ exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, record.key());
+
for (Header header : record.headers()) {
if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
- addHeader(headers, header);
+ mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
} else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) {
- addProperty(exchange, header);
+ mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
}
}
- exchange.getMessage().setHeaders(headers);
- exchange.getMessage().setBody(record.value());
LOG.debug("Sending exchange {} to {}", exchange.getExchangeId(), LOCAL_URL);
producer.send(localEndpoint, exchange);
@@ -172,69 +167,14 @@ public class CamelSinkTask extends SinkTask {
}
}
- private void addHeader(Map<String, Object> map, Header singleHeader) {
- String camelHeaderKey = StringUtils.removeStart(singleHeader.key(),
HEADER_CAMEL_PREFIX);
- Schema schema = singleHeader.schema();
-
- if (schema.type().equals(Timestamp.SCHEMA.type()) &&
Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
- map.put(camelHeaderKey, (Date)singleHeader.value());
- } else if
(schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
- map.put(camelHeaderKey, (String)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName()))
{
- map.put(camelHeaderKey, (Boolean)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName()))
{
- map.put(camelHeaderKey, singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName()))
{
- if (Decimal.class.getCanonicalName().equals(schema.name())) {
- map.put(camelHeaderKey, Decimal.toLogical(schema,
(byte[])singleHeader.value()));
- } else {
- map.put(camelHeaderKey, (byte[])singleHeader.value());
- }
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName()))
{
- map.put(camelHeaderKey, (float)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName()))
{
- map.put(camelHeaderKey, (double)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName()))
{
- map.put(camelHeaderKey, (short)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName()))
{
- map.put(camelHeaderKey, (long)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName()))
{
- map.put(camelHeaderKey, (byte)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA,
Schema.STRING_SCHEMA).type().getName())) {
- map.put(camelHeaderKey, (Map<?, ?>)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName()))
{
- map.put(camelHeaderKey, (List<?>)singleHeader.value());
- }
- }
+ private static void mapHeader(Header header, String prefix, Map<String, Object> destination) {
+ final String key = StringUtils.removeStart(header.key(), prefix);
+ final Schema schema = header.schema();
- private void addProperty(Exchange exchange, Header singleHeader) {
- String camelPropertyKey = StringUtils.removeStart(singleHeader.key(),
PROPERTY_CAMEL_PREFIX);
- Schema schema = singleHeader.schema();
-
- if (schema.type().equals(Timestamp.SCHEMA.type()) &&
Objects.equals(schema.name(), Timestamp.SCHEMA.name())) {
- exchange.getProperties().put(camelPropertyKey,
(Date)singleHeader.value());
- } else if
(schema.type().getName().equals(Schema.STRING_SCHEMA.type().getName())) {
- exchange.getProperties().put(camelPropertyKey,
(String)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.BOOLEAN_SCHEMA.type().getName()))
{
- exchange.getProperties().put(camelPropertyKey,
(Boolean)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.INT32_SCHEMA.type().getName()))
{
- exchange.getProperties().put(camelPropertyKey,
singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.BYTES_SCHEMA.type().getName()))
{
- exchange.getProperties().put(camelPropertyKey,
(byte[])singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.FLOAT32_SCHEMA.type().getName()))
{
- exchange.getProperties().put(camelPropertyKey,
(float)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.FLOAT64_SCHEMA.type().getName()))
{
- exchange.getProperties().put(camelPropertyKey,
(double)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.INT16_SCHEMA.type().getName()))
{
- exchange.getProperties().put(camelPropertyKey,
(short)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.INT64_SCHEMA.type().getName()))
{
- exchange.getProperties().put(camelPropertyKey,
(long)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(Schema.INT8_SCHEMA.type().getName()))
{
- exchange.getProperties().put(camelPropertyKey,
(byte)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.map(Schema.STRING_SCHEMA,
Schema.STRING_SCHEMA).type().getName())) {
- exchange.getProperties().put(camelPropertyKey, (Map<?,
?>)singleHeader.value());
- } else if
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName()))
{
- exchange.getProperties().put(camelPropertyKey,
(List<?>)singleHeader.value());
+ if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) {
+ destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
+ } else {
+ destination.put(key, header.value());
}
}