This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new b7ff890 Camel-Kafka: Fixed CS
b7ff890 is described below
commit b7ff890a4fb07b1c1ffd9fd299caa16048c9f88f
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon Mar 25 08:16:57 2019 +0100
Camel-Kafka: Fixed CS
---
.../camel/component/kafka/KafkaConsumer.java | 97 +++++++++++++---------
.../KafkaConsumerRebalancePartitionRevokeTest.java | 8 +-
.../kafka/KafkaConsumerRebalanceTest.java | 6 +-
3 files changed, 66 insertions(+), 45 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 9417dc7..e644a10 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -106,14 +106,13 @@ public class KafkaConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
- log.info("Starting Kafka consumer on topic: {} with breakOnFirstError:
{}",
- endpoint.getConfiguration().getTopic(),
endpoint.getConfiguration().isBreakOnFirstError());
+ log.info("Starting Kafka consumer on topic: {} with breakOnFirstError:
{}", endpoint.getConfiguration().getTopic(),
endpoint.getConfiguration().isBreakOnFirstError());
super.doStart();
// is the offset repository already started?
StateRepository repo =
endpoint.getConfiguration().getOffsetRepository();
if (repo instanceof ServiceSupport) {
- boolean started = ((ServiceSupport) repo).isStarted();
+ boolean started = ((ServiceSupport)repo).isStarted();
// if not already started then we would do that and also stop it
if (!started) {
stopOffsetRepo = true;
@@ -132,7 +131,8 @@ public class KafkaConsumer extends DefaultConsumer {
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount();
i++) {
KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i +
"", getProps());
- // pre-initialize task during startup so if there is any error we
have it thrown asap
+ // pre-initialize task during startup so if there is any error we
+ // have it thrown asap
task.preInit();
executor.submit(task);
tasks.add(task);
@@ -190,7 +190,8 @@ public class KafkaConsumer extends DefaultConsumer {
while (reConnect) {
try {
if (!first) {
- // re-initialize on re-connect so we have a fresh
consumer
+ // re-initialize on re-connect so we have a fresh
+ // consumer
doInit();
}
} catch (Throwable e) {
@@ -211,7 +212,8 @@ public class KafkaConsumer extends DefaultConsumer {
first = false;
- // doRun keeps running until we either shutdown or is told to
re-connect
+ // doRun keeps running until we either shutdown or is told to
+ // re-connect
reConnect = doRun();
}
}
@@ -224,9 +226,11 @@ public class KafkaConsumer extends DefaultConsumer {
// create consumer
ClassLoader threadClassLoader =
Thread.currentThread().getContextClassLoader();
try {
- // Kafka uses reflection for loading authentication settings,
use its classloader
+ // Kafka uses reflection for loading authentication settings,
+ // use its classloader
Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
- // this may throw an exception if something is wrong with
kafka consumer
+ // this may throw an exception if something is wrong with kafka
+ // consumer
this.consumer = new
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
} finally {
Thread.currentThread().setContextClassLoader(threadClassLoader);
@@ -235,7 +239,8 @@ public class KafkaConsumer extends DefaultConsumer {
@SuppressWarnings("unchecked")
protected boolean doRun() {
- // allow to re-connect thread in case we use that to retry failed
messages
+ // allow to re-connect thread in case we use that to retry failed
+ // messages
boolean reConnect = false;
boolean unsubscribing = false;
@@ -250,19 +255,23 @@ public class KafkaConsumer extends DefaultConsumer {
StateRepository<String, String> offsetRepository =
endpoint.getConfiguration().getOffsetRepository();
if (offsetRepository != null) {
- // This poll to ensures we have an assigned partition
otherwise seek won't work
+ // This poll to ensures we have an assigned partition
+ // otherwise seek won't work
ConsumerRecords poll = consumer.poll(100);
- for (TopicPartition topicPartition : (Set<TopicPartition>)
consumer.assignment()) {
+ for (TopicPartition topicPartition :
(Set<TopicPartition>)consumer.assignment()) {
String offsetState =
offsetRepository.getState(serializeOffsetKey(topicPartition));
if (offsetState != null && !offsetState.isEmpty()) {
- // The state contains the last read offset so you
need to seek from the next one
+ // The state contains the last read offset so you
+ // need to seek from the next one
long offset = deserializeOffsetValue(offsetState)
+ 1;
log.debug("Resuming partition {} from offset {}
from state", topicPartition.partition(), offset);
consumer.seek(topicPartition, offset);
} else {
- // If the init poll has returned some data of a
currently unknown topic/partition in the state
- // then resume from their offset in order to avoid
losing data
+ // If the init poll has returned some data of a
+ // currently unknown topic/partition in the state
+ // then resume from their offset in order to avoid
+ // losing data
List<ConsumerRecord<Object, Object>>
partitionRecords = poll.records(topicPartition);
if (!partitionRecords.isEmpty()) {
long offset = partitionRecords.get(0).offset();
@@ -274,12 +283,14 @@ public class KafkaConsumer extends DefaultConsumer {
} else if (endpoint.getConfiguration().getSeekTo() != null) {
if
(endpoint.getConfiguration().getSeekTo().equals("beginning")) {
log.debug("{} is seeking to the beginning on topic
{}", threadId, topicName);
- // This poll to ensures we have an assigned partition
otherwise seek won't work
+ // This poll to ensures we have an assigned partition
+ // otherwise seek won't work
consumer.poll(100);
consumer.seekToBeginning(consumer.assignment());
} else if
(endpoint.getConfiguration().getSeekTo().equals("end")) {
log.debug("{} is seeking to the end on topic {}",
threadId, topicName);
- // This poll to ensures we have an assigned partition
otherwise seek won't work
+ // This poll to ensures we have an assigned partition
+ // otherwise seek won't work
consumer.poll(100);
consumer.seekToEnd(consumer.assignment());
}
@@ -304,21 +315,23 @@ public class KafkaConsumer extends DefaultConsumer {
while (!breakOnErrorHit &&
recordIterator.hasNext()) {
record = recordIterator.next();
if (log.isTraceEnabled()) {
- log.trace("Partition = {}, offset = {},
key = {}, value = {}", record.partition(), record.offset(), record.key(),
- record.value());
+ log.trace("Partition = {}, offset = {},
key = {}, value = {}", record.partition(), record.offset(), record.key(),
record.value());
}
Exchange exchange =
endpoint.createKafkaExchange(record);
propagateHeaders(record, exchange,
endpoint.getConfiguration());
- // if not auto commit then we have additional
information on the exchange
+ // if not auto commit then we have additional
+ // information on the exchange
if (!isAutoCommitEnabled()) {
exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
!recordIterator.hasNext());
}
if
(endpoint.getConfiguration().isAllowManualCommit()) {
- // allow Camel users to access the Kafka
consumer API to be able to do for example manual commits
+ // allow Camel users to access the Kafka
+ // consumer API to be able to do for
example
+ // manual commits
KafkaManualCommit manual =
endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange,
consumer, topicName, threadId,
- offsetRepository, partition,
record.offset());
+
offsetRepository, partition,
record.offset());
exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
}
@@ -329,29 +342,36 @@ public class KafkaConsumer extends DefaultConsumer {
}
if (exchange.getException() != null) {
- // processing failed due to an unhandled
exception, what should we do
+ // processing failed due to an unhandled
+ // exception, what should we do
if
(endpoint.getConfiguration().isBreakOnFirstError()) {
- // we are failing and we should break
out
- log.warn("Error during processing {}
from topic: {}. Will seek consumer to offset: {} and re-connect and start
polling again.",
- exchange, topicName,
partitionLastOffset);
- // force commit so we resume on next
poll where we failed
+ // we are failing and we should break
+ // out
+ log.warn("Error during processing {}
from topic: {}. Will seek consumer to offset: {} and re-connect and start
polling again.", exchange,
+ topicName,
partitionLastOffset);
+ // force commit so we resume on next
+ // poll where we failed
commitOffset(offsetRepository,
partition, partitionLastOffset, true);
// continue to next partition
breakOnErrorHit = true;
} else {
- // will handle/log the exception and
then continue to next
+ // will handle/log the exception and
+ // then continue to next
getExceptionHandler().handleException("Error during processing", exchange,
exchange.getException());
}
} else {
// record was success so remember its
offset
partitionLastOffset = record.offset();
- //lastOffsetProcessed would be used by
Consumer re-balance listener to preserve offset state upon partition revoke
+ // lastOffsetProcessed would be used by
+ // Consumer re-balance listener to preserve
+ // offset state upon partition revoke
lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset);
}
}
if (!breakOnErrorHit) {
- // all records processed from partition so
commit them
+ // all records processed from partition so
+ // commit them
commitOffset(offsetRepository, partition,
partitionLastOffset, false);
}
}
@@ -385,7 +405,8 @@ public class KafkaConsumer extends DefaultConsumer {
consumer.unsubscribe();
Thread.currentThread().interrupt();
} catch (KafkaException e) {
- // some kind of error in kafka, it may happen during
unsubscribing or during normal processing
+ // some kind of error in kafka, it may happen during
+ // unsubscribing or during normal processing
if (unsubscribing) {
getExceptionHandler().handleException("Error unsubscribing
" + threadId + " from kafka topic " + topicName, e);
} else {
@@ -415,7 +436,8 @@ public class KafkaConsumer extends DefaultConsumer {
}
private void shutdown() {
- // As advised in the KAFKA-1894 ticket, calling this wakeup method
breaks the infinite loop
+ // As advised in the KAFKA-1894 ticket, calling this wakeup method
+ // breaks the infinite loop
consumer.wakeup();
}
@@ -424,11 +446,11 @@ public class KafkaConsumer extends DefaultConsumer {
log.debug("onPartitionsRevoked: {} from topic {}", threadId,
topicName);
StateRepository<String, String> offsetRepository =
endpoint.getConfiguration().getOffsetRepository();
- for (TopicPartition partition : partitions) {
+ for (TopicPartition partition : partitions) {
String offsetKey = serializeOffsetKey(partition);
Long offset = lastProcessedOffset.get(offsetKey);
if (offset == null) {
- offset = -1l;
+ offset = -1L;
}
log.debug("Saving offset repository state {} from offsetKey {}
with offset: {}", threadId, offsetKey, offset);
commitOffset(offsetRepository, partition, offset, true);
@@ -445,7 +467,8 @@ public class KafkaConsumer extends DefaultConsumer {
for (TopicPartition partition : partitions) {
String offsetState =
offsetRepository.getState(serializeOffsetKey(partition));
if (offsetState != null && !offsetState.isEmpty()) {
- // The state contains the last read offset so you need
to seek from the next one
+ // The state contains the last read offset so you need
+ // to seek from the next one
long offset = deserializeOffsetValue(offsetState) + 1;
log.debug("Resuming partition {} from offset {} from
state", partition.partition(), offset);
consumer.seek(partition, offset);
@@ -458,9 +481,8 @@ public class KafkaConsumer extends DefaultConsumer {
private void propagateHeaders(ConsumerRecord<Object, Object> record,
Exchange exchange, KafkaConfiguration kafkaConfiguration) {
HeaderFilterStrategy headerFilterStrategy =
kafkaConfiguration.getHeaderFilterStrategy();
KafkaHeaderDeserializer headerDeserializer =
kafkaConfiguration.getKafkaHeaderDeserializer();
- StreamSupport.stream(record.headers().spliterator(), false)
- .filter(header -> shouldBeFiltered(header, exchange,
headerFilterStrategy))
- .forEach(header -> exchange.getIn().setHeader(header.key(),
headerDeserializer.deserialize(header.key(), header.value())));
+ StreamSupport.stream(record.headers().spliterator(),
false).filter(header -> shouldBeFiltered(header, exchange,
headerFilterStrategy))
+ .forEach(header -> exchange.getIn().setHeader(header.key(),
headerDeserializer.deserialize(header.key(), header.value())));
}
private boolean shouldBeFiltered(Header header, Exchange exchange,
HeaderFilterStrategy headerFilterStrategy) {
@@ -483,4 +505,3 @@ public class KafkaConsumer extends DefaultConsumer {
return Long.parseLong(offset);
}
}
-
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
index 385e658..4556f13 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalancePartitionRevokeTest.java
@@ -63,8 +63,8 @@ public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafka
@Test
public void ensurePartitionRevokeCallsWithLastProcessedOffset() throws
Exception {
boolean partitionRevokeCalled = messagesLatch.await(30000,
TimeUnit.MILLISECONDS);
- assertTrue("StateRepository.setState should have been called with
offset >= 0 for topic" + TOPIC +
- ". Remaining count : " + messagesLatch.getCount(),
partitionRevokeCalled);
+ assertTrue("StateRepository.setState should have been called with
offset >= 0 for topic" + TOPIC
+ + ". Remaining count : " + messagesLatch.getCount(),
partitionRevokeCalled);
}
@Override
@@ -92,7 +92,7 @@ public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafka
}
public class OffsetStateRepository extends MemoryStateRepository {
- CountDownLatch messagesLatch = null;
+ CountDownLatch messagesLatch;
public OffsetStateRepository(CountDownLatch messagesLatch) {
this.messagesLatch = messagesLatch;
@@ -114,7 +114,7 @@ public class KafkaConsumerRebalancePartitionRevokeTest extends BaseEmbeddedKafka
@Override
public void setState(String key, String value) {
if (key.contains(TOPIC) && messagesLatch.getCount() > 0
- && Long.parseLong(value) >= 0) {
+ && Long.parseLong(value) >= 0) {
messagesLatch.countDown();
}
super.setState(key, value);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java
index 7deef47..077d689 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerRebalanceTest.java
@@ -44,8 +44,8 @@ public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest {
@Test
public void offsetGetStateMustHaveBeenCalledTwice() throws Exception {
boolean offsetGetStateCalled = messagesLatch.await(30000,
TimeUnit.MILLISECONDS);
- assertTrue("StateRepository.getState should have been called twice for
topic " + TOPIC +
- ". Remaining count : " + messagesLatch.getCount(),
offsetGetStateCalled);
+ assertTrue("StateRepository.getState should have been called twice for
topic " + TOPIC
+ + ". Remaining count : " + messagesLatch.getCount(),
offsetGetStateCalled);
}
@Override
@@ -73,7 +73,7 @@ public class KafkaConsumerRebalanceTest extends BaseEmbeddedKafkaTest {
}
public class OffsetStateRepository implements StateRepository<String,
String> {
- CountDownLatch messagesLatch = null;
+ CountDownLatch messagesLatch;
public OffsetStateRepository(CountDownLatch messagesLatch) {
this.messagesLatch = messagesLatch;