This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 73913a9  CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from 
kafka.KEY (#5263)
73913a9 is described below

commit 73913a9a1d65394743c30ffcb263cfa8dee980a7
Author: jenskordowski <[email protected]>
AuthorDate: Sun Mar 28 15:25:49 2021 +0200

    CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY 
(#5263)
    
    * CAMEL-16060 camel-kafka - decouple kafka.PARTITION_KEY from kafka.KEY
    
    * CAMEL-16060 test fixes and documentation update
    
    Co-authored-by: Jens Kordowski <[email protected]>
---
 .../camel-kafka/src/main/docs/kafka-component.adoc |  2 +-
 .../camel/component/kafka/KafkaProducer.java       | 43 ++++++----------------
 .../component/kafka/KafkaProducerFullTest.java     | 16 ++++----
 .../camel/component/kafka/KafkaProducerTest.java   | 21 +++++++++++
 4 files changed, 41 insertions(+), 41 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 81ab68c..5dddf19 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -310,7 +310,7 @@ Before sending a message to Kafka you can configure the 
following headers.
 | Header constant              | Header value          | Type    | Description
 | KafkaConstants.KEY           | "kafka.KEY"           | Object  | *Required* 
The key of the message in order to ensure that all related message goes in the 
same partition
 | KafkaConstants.OVERRIDE_TOPIC | "kafka.OVERRIDE_TOPIC" | String  | The topic 
to which send the message (override and takes precedence), and the header is 
not preserved.
-| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly 
specify the partition (only used if the `KafkaConstants.KEY` header is defined)
+| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly 
specify the partition
 |===
 
 If you want to send a message to a dynamic topic then use 
`KafkaConstants.OVERRIDE_TOPIC` as its used as a one-time header
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index a8ef383..32aa440 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -189,8 +189,6 @@ public class KafkaProducer extends DefaultAsyncProducer {
                     String innerTopic = msgTopic;
                     Object innerKey = null;
                     Integer innerPartitionKey = null;
-                    boolean hasPartitionKey = false;
-                    boolean hasMessageKey = false;
 
                     Object value = next;
                     Exchange ex = null;
@@ -214,18 +212,15 @@ public class KafkaProducer extends DefaultAsyncProducer {
                             innerPartitionKey = 
endpoint.getConfiguration().getPartitionKey() != null
                                     ? 
endpoint.getConfiguration().getPartitionKey()
                                     : 
innerMmessage.getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
-                            hasPartitionKey = innerPartitionKey != null;
                         }
 
                         if (innerMmessage.getHeader(KafkaConstants.KEY) != 
null) {
                             innerKey = endpoint.getConfiguration().getKey() != 
null
                                     ? endpoint.getConfiguration().getKey() : 
innerMmessage.getHeader(KafkaConstants.KEY);
-
-                            final Object messageKey = innerKey != null
-                                    ? 
tryConvertToSerializedType(innerExchange, innerKey,
-                                            
endpoint.getConfiguration().getKeySerializer())
-                                    : null;
-                            hasMessageKey = messageKey != null;
+                            if (innerKey != null) {
+                                innerKey = 
tryConvertToSerializedType(innerExchange, innerKey,
+                                        
endpoint.getConfiguration().getKeySerializer());
+                            }
                         }
 
                         ex = innerExchange == null ? exchange : innerExchange;
@@ -234,17 +229,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
                     }
 
-                    if (hasPartitionKey && hasMessageKey) {
-                        return new KeyValueHolder(
-                                body,
-                                new ProducerRecord(innerTopic, 
innerPartitionKey, null, innerKey, value, propagatedHeaders));
-                    } else if (hasMessageKey) {
-                        return new KeyValueHolder(
-                                body, new ProducerRecord(innerTopic, null, 
null, innerKey, value, propagatedHeaders));
-                    } else {
-                        return new KeyValueHolder(
-                                body, new ProducerRecord(innerTopic, null, 
null, null, value, propagatedHeaders));
-                    }
+                    return new KeyValueHolder(
+                            body,
+                            new ProducerRecord(innerTopic, innerPartitionKey, 
null, innerKey, value, propagatedHeaders));
                 }
 
                 @Override
@@ -258,27 +245,19 @@ public class KafkaProducer extends DefaultAsyncProducer {
         final Integer partitionKey = 
endpoint.getConfiguration().getPartitionKey() != null
                 ? endpoint.getConfiguration().getPartitionKey()
                 : exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, 
Integer.class);
-        final boolean hasPartitionKey = partitionKey != null;
 
         // endpoint take precedence over header configuration
         Object key = endpoint.getConfiguration().getKey() != null
                 ? endpoint.getConfiguration().getKey() : 
exchange.getIn().getHeader(KafkaConstants.KEY);
-        final Object messageKey = key != null
-                ? tryConvertToSerializedType(exchange, key, 
endpoint.getConfiguration().getKeySerializer()) : null;
-        final boolean hasMessageKey = messageKey != null;
+        if (key != null) {
+            key = tryConvertToSerializedType(exchange, key, 
endpoint.getConfiguration().getKeySerializer());
+        }
 
         // must convert each entry of the iterator into the value according to
         // the serializer
         Object value = tryConvertToSerializedType(exchange, msg, 
endpoint.getConfiguration().getValueSerializer());
 
-        ProducerRecord record;
-        if (hasPartitionKey && hasMessageKey) {
-            record = new ProducerRecord(topic, partitionKey, null, key, value, 
propagatedHeaders);
-        } else if (hasMessageKey) {
-            record = new ProducerRecord(topic, null, null, key, value, 
propagatedHeaders);
-        } else {
-            record = new ProducerRecord(topic, null, null, null, value, 
propagatedHeaders);
-        }
+        ProducerRecord record = new ProducerRecord(topic, partitionKey, null, 
key, value, propagatedHeaders);
         return Collections.singletonList(new KeyValueHolder<Object, 
ProducerRecord>((Object) exchange, record)).iterator();
     }
 
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index cb7015b..f3b6559 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -73,7 +73,7 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
     @EndpointInject("kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1")
     private Endpoint toStrings;
 
-    @EndpointInject("kafka:" + TOPIC_STRINGS + 
"?requestRequiredAcks=-1&partitionKey=1")
+    @EndpointInject("kafka:" + TOPIC_STRINGS + 
"?requestRequiredAcks=-1&partitionKey=0")
     private Endpoint toStrings2;
 
     @EndpointInject("kafka:" + TOPIC_INTERCEPTED + "?requestRequiredAcks=-1"
@@ -163,9 +163,9 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + 
messageInOtherTopic);
 
-        sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test 
message", KafkaConstants.PARTITION_KEY, "1");
+        sendMessagesInRoute(messageInTopic, stringsTemplate, "IT test 
message", KafkaConstants.PARTITION_KEY, "0");
         sendMessagesInRoute(messageInOtherTopic, stringsTemplate, "IT test 
message in other topic",
-                KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
                 TOPIC_STRINGS_IN_HEADER);
 
         createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, 
TOPIC_STRINGS_IN_HEADER, messagesLatch);
@@ -196,7 +196,7 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
 
         sendMessagesInRoute(messageInTopic, stringsTemplate2, "IT test 
message", (String[]) null);
         sendMessagesInRoute(messageInOtherTopic, stringsTemplate2, "IT test 
message in other topic",
-                KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
                 TOPIC_STRINGS_IN_HEADER);
 
         createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, 
TOPIC_STRINGS_IN_HEADER, messagesLatch);
@@ -225,9 +225,9 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
 
         CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + 
messageInOtherTopic);
 
-        sendMessagesInRoute(messageInTopic, interceptedTemplate, "IT test 
message", KafkaConstants.PARTITION_KEY, "1");
+        sendMessagesInRoute(messageInTopic, interceptedTemplate, "IT test 
message", KafkaConstants.PARTITION_KEY, "0");
         sendMessagesInRoute(messageInOtherTopic, interceptedTemplate, "IT test 
message in other topic",
-                KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+                KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
                 TOPIC_STRINGS_IN_HEADER);
         createKafkaMessageConsumer(stringsConsumerConn, TOPIC_INTERCEPTED, 
TOPIC_STRINGS_IN_HEADER, messagesLatch);
 
@@ -251,12 +251,12 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
             msgs.add("Message " + x);
         }
 
-        sendMessagesInRoute(1, stringsTemplate, msgs, 
KafkaConstants.PARTITION_KEY, "1");
+        sendMessagesInRoute(1, stringsTemplate, msgs, 
KafkaConstants.PARTITION_KEY, "0");
         msgs = new ArrayList<>();
         for (int x = 0; x < messageInOtherTopic; x++) {
             msgs.add("Other Message " + x);
         }
-        sendMessagesInRoute(1, stringsTemplate, msgs, 
KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC,
+        sendMessagesInRoute(1, stringsTemplate, msgs, 
KafkaConstants.PARTITION_KEY, "0", KafkaConstants.TOPIC,
                 TOPIC_STRINGS_IN_HEADER);
 
         createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, 
TOPIC_STRINGS_IN_HEADER, messagesLatch);
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index a12aec6..2b812e5 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -277,6 +277,19 @@ public class KafkaProducerTest {
     }
 
     @Test
+    public void processSendsMessageWithPartitionKeyHeaderOnly() throws 
Exception {
+        endpoint.getConfiguration().setTopic("someTopic");
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getMessage()).thenReturn(out);
+        in.setHeader(KafkaConstants.PARTITION_KEY, 4);
+
+        producer.process(exchange);
+
+        verifySendMessage(4, "someTopic");
+        assertRecordMetadataExists();
+    }
+
+    @Test
     public void processSendsMessageWithMessageKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
@@ -380,6 +393,14 @@ public class KafkaProducerTest {
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected void verifySendMessage(Integer partitionKey, String topic) {
+        ArgumentCaptor<ProducerRecord> captor = 
ArgumentCaptor.forClass(ProducerRecord.class);
+        Mockito.verify(producer.getKafkaProducer()).send(captor.capture());
+        assertEquals(partitionKey, captor.getValue().partition());
+        assertEquals(topic, captor.getValue().topic());
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     protected void verifySendMessage(String topic, String messageKey) {
         ArgumentCaptor<ProducerRecord> captor = 
ArgumentCaptor.forClass(ProducerRecord.class);
         Mockito.verify(producer.getKafkaProducer()).send(captor.capture());

Reply via email to