This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit aef4ddd7491ffb78642f9943dea90e9c378e2d99 Author: codesmell <[email protected]> AuthorDate: Wed Nov 15 13:16:10 2023 -0500 CAMEL-20044: add extra logging for BreakOnFirstError (#11920) --- .../camel/component/kafka/KafkaFetchRecords.java | 32 +++++++++++++++ .../kafka/consumer/AbstractCommitManager.java | 2 + .../consumer/support/KafkaRecordProcessor.java | 45 ++++++++++++++++------ .../support/KafkaRecordProcessorFacade.java | 3 ++ .../kafka/consumer/support/ProcessingResult.java | 13 ++++++- 5 files changed, 81 insertions(+), 14 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 987a4a36c21..9fb130f6697 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -319,7 +319,9 @@ public class KafkaFetchRecords implements Runnable { kafkaConsumer, threadId, commitManager, consumerListener); Duration pollDuration = Duration.ofMillis(pollTimeoutMs); + ProcessingResult lastResult = null; + while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) { ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); if (consumerListener != null) { @@ -328,7 +330,32 @@ public class KafkaFetchRecords implements Runnable { } } + if (lastResult != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("This polling iteration is using lastresult on partition {} and offset {}", + lastResult.getPartition(), lastResult.getPartitionLastOffset()); + } + + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("This polling iteration is using lastresult of null"); + } + } + ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult); + + if (result != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("This polling iteration had a result returned for partition {} and offset {}", + result.getPartition(), result.getPartitionLastOffset()); + } + + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("This polling iteration had a result returned as null"); + } + } + updateTaskState(); if (result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) { LOG.debug("We hit an error ... setting flags to force reconnect"); @@ -337,6 +364,11 @@ public class KafkaFetchRecords implements Runnable { setConnected(false); } else { lastResult = result; + + if (LOG.isTraceEnabled()) { + LOG.trace("setting lastresult to partition {} and offset {}", + lastResult.getPartition(), lastResult.getPartitionLastOffset()); + } } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java index fe5abc3e403..a1da9823be4 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java @@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory; public abstract class AbstractCommitManager implements CommitManager { public static final long START_OFFSET = -1; + public static final long NON_PARTITION = -1; + private static final Logger LOG = LoggerFactory.getLogger(AbstractCommitManager.class); protected final KafkaConsumer kafkaConsumer; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java index 1afe53cbe2b..97875b097f1 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java @@ -63,6 +63,9 @@ public class KafkaRecordProcessor { message.setHeader(KafkaConstants.KEY, record.key()); } + LOG.debug("setting up the exchange for message from partition {} and offset {}", + record.partition(), record.offset()); + message.setBody(record.value()); } @@ -82,7 +85,7 @@ public class KafkaRecordProcessor { } public ProcessingResult processExchange( - Exchange exchange, TopicPartition partition, boolean partitionHasNext, + Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext, boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessingResult lastResult, ExceptionHandler exceptionHandler) { @@ -100,7 +103,7 @@ public class KafkaRecordProcessor { if (configuration.isAllowManualCommit()) { // allow Camel users to access the Kafka consumer API to be able to do for example manual commits - KafkaManualCommit manual = commitManager.getManualCommit(exchange, partition, record); + KafkaManualCommit manual = commitManager.getManualCommit(exchange, topicPartition, record); message.setHeader(KafkaConstants.MANUAL_COMMIT, manual); message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext); @@ -112,30 +115,48 @@ public class KafkaRecordProcessor { exchange.setException(e); } if (exchange.getException() != null) { - boolean breakOnErrorExit = processException(exchange, partition, lastResult.getPartitionLastOffset(), + + LOG.debug("An exception was thrown for record at partition {} and offset {}", + record.partition(), record.offset()); + + boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult, exceptionHandler); - return new ProcessingResult(breakOnErrorExit, lastResult.getPartitionLastOffset(), true); + + return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true); } else { - return new ProcessingResult(false, record.offset(), exchange.getException() != null); + return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null); } } private boolean processException( - Exchange exchange, TopicPartition partition, long partitionLastOffset, + Exchange exchange, TopicPartition topicPartition, + ConsumerRecord<Object, Object> record, ProcessingResult lastResult, ExceptionHandler exceptionHandler) { // processing failed due to an unhandled exception, what should we do if (configuration.isBreakOnFirstError()) { + + if (lastResult.getPartition() != -1 && + lastResult.getPartition() != record.partition()) { + LOG.error("About to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " + + " The last result was on partition {} with offset {} but was expecting partition {} with offset {}", + topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(), + record.partition(), record.offset()); + } + // we are failing and we should break out if (LOG.isWarnEnabled()) { - LOG.warn("Error during processing {} from topic: {}", exchange, partition.topic(), exchange.getException()); - LOG.warn("Will seek consumer to offset {} and start polling again.", partitionLastOffset); + Exception exc = exchange.getException(); + LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(), + exc.getMessage()); + LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.", + lastResult.getPartitionLastOffset(), lastResult.getPartition()); } - // force commit, so we resume on next poll where we failed except when the failure happened - // at the first message in a poll - if (partitionLastOffset != AbstractCommitManager.START_OFFSET) { - commitManager.forceCommit(partition, partitionLastOffset); + // force commit, so we resume on next poll where we failed + // except when the failure happened at the first message in a poll + if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) { + commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset()); } // continue to next partition diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java index fbf6f3d09a8..134246891fb 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java @@ -78,6 +78,9 @@ public class KafkaRecordProcessorFacade { lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, kafkaRecordProcessor, record); + LOG.debug("Processed record on partition {} and offset {} and got result for partition {} and offset {}", + record.partition(), record.offset(), lastResult.getPartition(), lastResult.getPartitionLastOffset()); + if (consumerListener != null) { if (!consumerListener.afterProcess(lastResult)) { commitManager.commit(partition); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java index 36f0c69c8b2..fe3afd6ee8d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java @@ -21,14 +21,19 @@ import org.apache.camel.component.kafka.consumer.AbstractCommitManager; public final class ProcessingResult { private static final ProcessingResult UNPROCESSED_RESULT - = new ProcessingResult(false, AbstractCommitManager.START_OFFSET, false); + = new ProcessingResult( + false, + AbstractCommitManager.NON_PARTITION, + AbstractCommitManager.START_OFFSET, false); private final boolean breakOnErrorHit; + private final long lastPartition; private final long partitionLastOffset; private final boolean failed; - ProcessingResult(boolean breakOnErrorHit, long partitionLastOffset, boolean failed) { + ProcessingResult(boolean breakOnErrorHit, long lastPartition, long partitionLastOffset, boolean failed) { this.breakOnErrorHit = breakOnErrorHit; + this.lastPartition = lastPartition; this.partitionLastOffset = partitionLastOffset; this.failed = failed; } @@ -41,6 +46,10 @@ public final class ProcessingResult { return partitionLastOffset; } + public long getPartition() { + return lastPartition; + } + public boolean isFailed() { return failed; }
