This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new c0def69  (chores) camel-kafka: code cleanups (#6074)
c0def69 is described below

commit c0def695648fe93623d393950f8c06f8c526630f
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Sep 10 17:11:01 2021 +0200

    (chores) camel-kafka: code cleanups (#6074)
    
    - replaced reference to deprecated configuration constant
    - cleanup generating Kafka properties from endpoint configuration
    - replaced reference to deprecated configuration constant
    - removed usage of deprecated hasOut/getOut methods
    - do not assert a primitive for null
---
 .../camel/component/kafka/KafkaConfiguration.java  |  7 +++--
 .../camel/component/kafka/KafkaConsumer.java       | 35 ++++++++++------------
 .../camel/component/kafka/KafkaFetchRecords.java   |  7 +++--
 .../camel/component/kafka/KafkaProducer.java       | 18 ++---------
 .../camel/component/kafka/KafkaProducerTest.java   | 33 ++++++++++----------
 5 files changed, 47 insertions(+), 53 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 74430fc..c70f7cb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -50,6 +50,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 
 @UriParams
 public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware {
@@ -404,7 +405,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
-        addListPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, getKerberosPrincipalToLocalRules());
+        addListPropertyIfNotNull(props, BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
+                getKerberosPrincipalToLocalRules());
         addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, getSaslMechanism());
         addPropertyIfNotNull(props, SaslConfigs.SASL_JAAS_CONFIG, getSaslJaasConfig());
 
@@ -471,7 +473,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
         addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
-        addListPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, getKerberosPrincipalToLocalRules());
+        addListPropertyIfNotNull(props, BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
+                getKerberosPrincipalToLocalRules());
         addPropertyIfNotNull(props, SaslConfigs.SASL_MECHANISM, getSaslMechanism());
         addPropertyIfNotNull(props, SaslConfigs.SASL_JAAS_CONFIG, getSaslJaasConfig());
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index ddfebe2..f80ac69 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,29 +65,25 @@ public class KafkaConsumer extends DefaultConsumer {
         return (KafkaEndpoint) super.getEndpoint();
     }
 
+    private String randomUUID() {
+        return UUID.randomUUID().toString();
+    }
+
     Properties getProps() {
-        Properties props = endpoint.getConfiguration().createConsumerProperties();
+        KafkaConfiguration configuration = endpoint.getConfiguration();
+
+        Properties props = configuration.createConsumerProperties();
         endpoint.updateClassProperties(props);
 
-        String brokers = endpoint.getKafkaClientFactory().getBrokers(endpoint.getConfiguration());
-        if (brokers != null) {
-            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
-        }
+        ObjectHelper.ifNotEmpty(endpoint.getKafkaClientFactory().getBrokers(configuration),
+                v -> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, v));
+
+        String groupId = ObjectHelper.supplyIfEmpty(configuration.getGroupId(), this::randomUUID);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        ObjectHelper.ifNotEmpty(configuration.getGroupInstanceId(),
+                v -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, v));
 
-        if (endpoint.getConfiguration().getGroupId() != null) {
-            String groupId = endpoint.getConfiguration().getGroupId();
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-            LOG.debug("Kafka consumer groupId is {}", groupId);
-        } else {
-            String randomGroupId = UUID.randomUUID().toString();
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, randomGroupId);
-            LOG.debug("Kafka consumer groupId is {} (generated)", randomGroupId);
-        }
-        if (endpoint.getConfiguration().getGroupInstanceId() != null) {
-            String gid = endpoint.getConfiguration().getGroupInstanceId();
-            LOG.debug("Kafka consumer groupInstanceId is {}", gid);
-            props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, gid);
-        }
         return props;
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index dedd0b8..ee6921c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -108,8 +108,7 @@ class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
 
             first = false;
 
-            if (!kafkaConsumer.isRunAllowed() || kafkaConsumer.isStoppingOrStopped()
-                    || kafkaConsumer.isSuspendingOrSuspended()) {
+            if (isCloseable()) {
                 LOG.debug("Closing consumer {}", threadId);
                 IOHelper.close(consumer);
                 return;
@@ -122,6 +121,10 @@ class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
         LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
     }
 
+    private boolean isCloseable() {
+        return !kafkaConsumer.isRunAllowed() || kafkaConsumer.isStoppingOrStopped() || kafkaConsumer.isSuspendingOrSuspended();
+    }
+
     void preInit() {
         doInit();
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 43d6541..8d908c6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -307,11 +307,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         List<RecordMetadata> recordMetadatas = new ArrayList<>();
 
         if (endpoint.getConfiguration().isRecordMetadata()) {
-            if (exchange.hasOut()) {
-                exchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
-            } else {
-                exchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
-            }
+            exchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
         }
 
         while (c.hasNext()) {
@@ -331,11 +327,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 innerExchange = (Exchange) f.getKey();
                 if (innerExchange != null) {
                     if (endpoint.getConfiguration().isRecordMetadata()) {
-                        if (innerExchange.hasOut()) {
-                            innerExchange.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
-                        } else {
-                            innerExchange.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
-                        }
+                        innerExchange.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, metadata);
                     }
                 }
             }
@@ -432,11 +424,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
             if (endpoint.getConfiguration().isRecordMetadata()) {
                 if (body instanceof Exchange) {
                     Exchange ex = (Exchange) body;
-                    if (ex.hasOut()) {
-                        ex.getOut().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
-                    } else {
-                        ex.getIn().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
-                    }
+                    ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, recordMetadatas);
                 }
                 if (body instanceof Message) {
                     Message msg = (Message) body;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 59fcb84..aac2425 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -69,7 +69,6 @@ public class KafkaProducerTest {
     private Exchange exchange = Mockito.mock(Exchange.class);
     private ExtendedCamelContext camelContext = Mockito.mock(ExtendedCamelContext.class);
     private Message in = new DefaultMessage(camelContext);
-    private Message out = new DefaultMessage(camelContext);
     private AsyncCallback callback = Mockito.mock(AsyncCallback.class);
 
     @SuppressWarnings({ "unchecked" })
@@ -118,7 +117,7 @@ public class KafkaProducerTest {
     public void processSendsMessage() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
 
@@ -145,7 +144,7 @@ public class KafkaProducerTest {
     public void processAsyncSendsMessage() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
 
@@ -162,7 +161,7 @@ public class KafkaProducerTest {
     public void processAsyncSendsMessageWithException() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         // setup the exception here
         org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer();
@@ -186,7 +185,7 @@ public class KafkaProducerTest {
         endpoint.getConfiguration().setTopic(null);
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         producer.process(exchange);
 
@@ -198,7 +197,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithTopicHeaderAndEndPoint() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
@@ -217,7 +216,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithOverrideTopicHeaderAndEndPoint() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
         in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
@@ -238,6 +237,7 @@ public class KafkaProducerTest {
     public void processRequiresTopicInEndpointOrInHeader() throws Exception {
         endpoint.getConfiguration().setTopic(null);
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
         in.setHeader(KafkaConstants.KEY, "someKey");
 
@@ -251,6 +251,7 @@ public class KafkaProducerTest {
     public void processRequiresTopicInConfiguration() throws Exception {
         endpoint.getConfiguration().setTopic("configTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
         in.setHeader(KafkaConstants.KEY, "someKey");
 
@@ -264,7 +265,7 @@ public class KafkaProducerTest {
     public void processDoesNotRequirePartitionHeader() throws Exception {
         endpoint.getConfiguration().setTopic("sometopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         producer.process(exchange);
 
@@ -275,7 +276,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithPartitionKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
         in.setHeader(KafkaConstants.KEY, "someKey");
 
@@ -289,7 +290,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithPartitionKeyHeaderOnly() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
 
         producer.process(exchange);
@@ -302,7 +303,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithMessageKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
         in.setHeader(KafkaConstants.KEY, "someKey");
 
         producer.process(exchange);
@@ -315,7 +316,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithMessageTimestampHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
         in.setHeader(KafkaConstants.KEY, "someKey");
         in.setHeader(KafkaConstants.OVERRIDE_TIMESTAMP,
                 LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
@@ -330,7 +331,7 @@ public class KafkaProducerTest {
     public void processSendMessageWithTopicHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
         in.setHeader(KafkaConstants.PARTITION_KEY, 4);
@@ -345,7 +346,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithMessageTopicName() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
-        Mockito.when(exchange.getMessage()).thenReturn(out);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         producer.process(exchange);
 
@@ -357,6 +358,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithListOfExchangesWithOverrideTopicHeaderOnEveryExchange() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         // we set the initial topic
         in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
@@ -384,6 +386,7 @@ public class KafkaProducerTest {
     public void processSendsMessageWithListOfMessagesWithOverrideTopicHeaderOnEveryExchange() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getMessage()).thenReturn(in);
 
         // we set the initial topic
         in.setHeader(KafkaConstants.OVERRIDE_TOPIC, "anotherTopic");
@@ -453,7 +456,7 @@ public class KafkaProducerTest {
         List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>) in.getHeader(KafkaConstants.KAFKA_RECORDMETA);
         assertNotNull(recordMetaData1);
         assertEquals(1, recordMetaData1.size(), "Expected one recordMetaData");
-        assertNotNull(recordMetaData1.get(0).timestamp());
+        assertNotNull(recordMetaData1.get(0));
     }
 
     private void assertRecordMetadataExists() {

Reply via email to