Mark Payne created NIFI-14656:
---------------------------------

             Summary: ConsumeKafka does not properly reuse Kafka Consumer if configured not to commit offsets
                 Key: NIFI-14656
                 URL: https://issues.apache.org/jira/browse/NIFI-14656
             Project: Apache NiFi
          Issue Type: Bug
          Components: Core Framework
            Reporter: Mark Payne
            Assignee: Mark Payne


The {{commitOffsets}} method currently looks like this:

{code:java}
private void commitOffsets(final KafkaConsumerService consumerService, final OffsetTracker offsetTracker, final PollingContext pollingContext) {
    if (!commitOffsets) {
        return;
    }

    try {
        consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
        consumerServices.offer(consumerService);
        getLogger().debug("Committed offsets for Kafka Consumer Service");
    } catch (final Exception e) {
        close(consumerService);
        getLogger().error("Failed to commit offsets for Kafka Consumer Service", e);
    }
}
{code}
The first thing the method does is check whether {{commitOffsets}} is false and, 
if so, return. As a result, the ConsumerService is never returned to the 
{{consumerServices}} queue and can never be reused. We still want to skip 
{{consumerService.commit}} in this case, but we must call 
{{consumerServices.offer}} so that the consumer is reused.
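
One possible shape for the change is sketched below. It is a self-contained illustration and not the actual patch: {{StubConsumerService}}, {{ConsumerReuseSketch}}, and {{pooledCount}} are hypothetical stand-ins for the real processor types, and the real method takes more arguments. It only shows the consumer being offered back to the queue on the early-return path without {{commit}} being called.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the processor's consumer pooling.
class ConsumerReuseSketch {

    // Hypothetical stand-in for KafkaConsumerService; records calls so behavior is observable.
    static class StubConsumerService {
        int commits = 0;
        boolean closed = false;

        void commit() {
            commits++;
        }

        void close() {
            closed = true;
        }
    }

    private final Queue<StubConsumerService> consumerServices = new LinkedBlockingQueue<>();
    private final boolean commitOffsets;

    ConsumerReuseSketch(final boolean commitOffsets) {
        this.commitOffsets = commitOffsets;
    }

    void commitOffsets(final StubConsumerService consumerService) {
        if (!commitOffsets) {
            // Skip the commit, but still return the consumer to the pool for reuse.
            consumerServices.offer(consumerService);
            return;
        }

        try {
            consumerService.commit();
            consumerServices.offer(consumerService);
        } catch (final Exception e) {
            // On failure the consumer is closed rather than pooled, as in the original method.
            consumerService.close();
        }
    }

    // Number of consumers currently available for reuse.
    int pooledCount() {
        return consumerServices.size();
    }
}
```

With {{commitOffsets}} set to false, a consumer passed to this method ends up in the queue with zero commits recorded, which is the behavior the description above asks for.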



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
