This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6bf494f2f54a152e8eadd213eefa78b9f0e8e0c7 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Mon Aug 26 11:37:44 2024 +0200 (chores) camel-kafka simplify type checks - to use pattern matching for instanceof - use simpler assertion checks --- .../camel/component/kafka/KafkaConsumer.java | 4 ++-- .../support/resume/KafkaResumeAdapter.java | 10 +++------ .../producer/support/KeyValueHolderIterator.java | 14 ++++++------- .../kafka/producer/support/ProducerUtil.java | 16 +++++++-------- .../kafka/serde/DefaultKafkaHeaderSerializer.java | 24 +++++++++++----------- .../kafka/SingleNodeKafkaResumeStrategy.java | 12 +++++------ .../camel/component/kafka/KafkaProducerTest.java | 6 +++--- 7 files changed, 39 insertions(+), 47 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 9ea0e1502ca..1d496d34a29 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -138,8 +138,8 @@ public class KafkaConsumer extends DefaultConsumer // is the offset repository already started? StateRepository<String, String> repo = endpoint.getConfiguration().getOffsetRepository(); - if (repo instanceof ServiceSupport) { - boolean started = ((ServiceSupport) repo).isStarted(); + if (repo instanceof ServiceSupport serviceSupport) { + boolean started = serviceSupport.isStarted(); // if not already started then we would do that and also stop it if (!started) { stopOffsetRepo = true; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java index 641e5b5af06..40250d72604 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java @@ -54,8 +54,7 @@ public class KafkaResumeAdapter implements ResumeAdapter, Deserializable, Cachea Object keyObj = deserializeKey(keyBuffer); Object valueObj = deserializeValue(valueBuffer); - if (keyObj instanceof String) { - String key = (String) keyObj; + if (keyObj instanceof String key) { final String[] keyParts = key.split("/"); if (keyParts == null || keyParts.length != 2) { @@ -63,8 +62,7 @@ public class KafkaResumeAdapter implements ResumeAdapter, Deserializable, Cachea String topic = keyParts[0]; int partition = Integer.parseInt(keyParts[1]); - if (valueObj instanceof Long) { - Long offset = (Long) valueObj; + if (valueObj instanceof Long offset) { resumeCache.add(new TopicPartition(topic, partition), offset); } else { @@ -87,9 +85,7 @@ public class KafkaResumeAdapter implements ResumeAdapter, Deserializable, Cachea Object keyObj = key.getValue(); Long valueObject = offset.getValue(Long.class); - if (keyObj instanceof TopicPartition) { - TopicPartition topicPartition = (TopicPartition) keyObj; - + if (keyObj instanceof TopicPartition topicPartition) { resumeCache.add(topicPartition, valueObject); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java index ac82d09711e..6231cf39e79 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/KeyValueHolderIterator.java @@ -81,17 +81,17 @@ public class KeyValueHolderIterator implements Iterator<KeyValueHolder<Object, P msgTopic, null, null, null, body, propagatedHeadersProvider.getDefaultHeaders())); } - private Message getInnerMessage(Object body) { - if (body instanceof Exchange) { - return ((Exchange) body).getIn(); + private Message getInnerMessage(Object object) { + if (object instanceof Exchange exchange) { + return exchange.getIn(); } - return (Message) body; + return (Message) object; } - private Exchange getInnerExchange(Object body) { - if (body instanceof Exchange) { - return (Exchange) body; + private Exchange getInnerExchange(Object object) { + if (object instanceof Exchange exchange) { + return exchange; } return null; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java index 992726eec3a..7563d08f2f4 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/ProducerUtil.java @@ -55,13 +55,13 @@ public final class ProducerUtil { return answer != null ? answer : object; } - static void setException(Object body, Exception e) { + static void setException(Object object, Exception e) { if (e != null) { - if (body instanceof Exchange) { - ((Exchange) body).setException(e); + if (object instanceof Exchange exchange) { + exchange.setException(e); } - if (body instanceof Message && ((Message) body).getExchange() != null) { - ((Message) body).getExchange().setException(e); + if (object instanceof Message message && message.getExchange() != null) { + message.getExchange().setException(e); } } } @@ -73,12 +73,10 @@ public final class ProducerUtil { } public static void setRecordMetadata(Object body, List<RecordMetadata> recordMetadataList) { - if (body instanceof Exchange) { - Exchange ex = (Exchange) body; + if (body instanceof Exchange ex) { ex.getMessage().setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadataList); } - if (body instanceof Message) { - Message msg = (Message) body; + if (body instanceof Message msg) { msg.setHeader(KafkaConstants.KAFKA_RECORD_META, recordMetadataList); } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java index 79846df5843..308e801bf09 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java @@ -30,24 +30,24 @@ public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer, Came @Override public byte[] serialize(final String key, final Object value) { - if (value instanceof String) { - return ((String) value).getBytes(); - } else if (value instanceof Long) { + if (value instanceof String string) { + return string.getBytes(); + } else if (value instanceof Long aLong) { ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); - buffer.putLong((Long) value); + buffer.putLong(aLong); return buffer.array(); - } else if (value instanceof Integer) { + } else if (value instanceof Integer integer) { ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); - buffer.putInt((Integer) value); + buffer.putInt(integer); return buffer.array(); - } else if (value instanceof Double) { + } else if (value instanceof Double aDouble) { ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES); - buffer.putDouble((Double) value); + buffer.putDouble(aDouble); return buffer.array(); - } else if (value instanceof Boolean) { - return value.toString().getBytes(); - } else if (value instanceof byte[]) { - return (byte[]) value; + } else if (value instanceof Boolean b) { + return b.toString().getBytes(); + } else if (value instanceof byte[] bytes) { + return bytes; } if (camelContext != null) { byte[] converted = camelContext.getTypeConverter().tryConvertTo(byte[].class, value); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index 1349065e75f..2290a8c1b82 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -123,9 +123,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, Camel } protected void doAdd(OffsetKey<?> key, Offset<?> offsetValue) { - if (adapter instanceof Cacheable) { - Cacheable cacheable = (Cacheable) adapter; - + if (adapter instanceof Cacheable cacheable) { cacheable.add(key, offsetValue); } } @@ -344,8 +342,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, Camel } private void subscribe(Consumer<byte[], byte[]> consumer) { - if (adapter instanceof Cacheable) { - ResumeCache<?> cache = ((Cacheable) adapter).getCache(); + if (adapter instanceof Cacheable cacheable) { + ResumeCache<?> cache = cacheable.getCache(); if (cache.capacity() >= 1) { checkAndSubscribe(consumer, resumeStrategyConfiguration.getTopic(), cache.capacity()); @@ -454,8 +452,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, Camel @Override public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) { - if (resumeStrategyConfiguration instanceof KafkaResumeStrategyConfiguration) { - this.resumeStrategyConfiguration = (KafkaResumeStrategyConfiguration) resumeStrategyConfiguration; + if (resumeStrategyConfiguration instanceof KafkaResumeStrategyConfiguration kafkaResumeStrategyConfiguration) { + this.resumeStrategyConfiguration = kafkaResumeStrategyConfiguration; } else { throw new RuntimeCamelException( "Invalid resume strategy configuration of type " + diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 5d6dafe3ca9..daed89e8685 100755 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -51,10 +51,10 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; @@ -82,13 +82,13 @@ public class KafkaProducerTest { endpoint = kafka.createEndpoint("kafka:sometopic", "sometopic", new HashMap()); endpoint.doBuild(); - assertTrue(endpoint.getKafkaClientFactory() instanceof DefaultKafkaClientFactory); + assertInstanceOf(DefaultKafkaClientFactory.class, endpoint.getKafkaClientFactory()); producer = new KafkaProducer(endpoint); fromEndpoint = kafka.createEndpoint("kafka:fromtopic", "fromtopic", new HashMap()); fromEndpoint.doBuild(); - assertTrue(fromEndpoint.getKafkaClientFactory() instanceof DefaultKafkaClientFactory); + assertInstanceOf(DefaultKafkaClientFactory.class, fromEndpoint.getKafkaClientFactory()); RecordMetadata rm = new RecordMetadata(null, 0, 0, 0, 0, 0); Future future = Mockito.mock(Future.class);
