[ https://issues.apache.org/jira/browse/NIFI-14656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17974017#comment-17974017 ]
ASF subversion and git services commented on NIFI-14656:
--------------------------------------------------------
Commit 66074bc881ce7b46989cce89d4ee13b96d543ebe in nifi's branch
refs/heads/main from Mark Payne
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=66074bc881 ]
NIFI-14656 Fixed Kafka Consumer handling when not committing offsets and added
new counters (#10015)
Ensure that we return the Kafka Consumer to the pool even if we are not
committing offsets. Updated counters to show how many events are acknowledged
vs rolled back to help clarify.
Signed-off-by: David Handermann <[email protected]>
> ConsumeKafka does not properly reuse Kafka Consumer if configured not to
> commit offsets
> ---------------------------------------------------------------------------------------
>
> Key: NIFI-14656
> URL: https://issues.apache.org/jira/browse/NIFI-14656
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> The {{commitOffsets}} method currently looks like this:
> {code:java}
> private void commitOffsets(final KafkaConsumerService consumerService, final OffsetTracker offsetTracker, final PollingContext pollingContext) {
>     if (!commitOffsets) {
>         return;
>     }
>     try {
>         consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
>         consumerServices.offer(consumerService);
>         getLogger().debug("Committed offsets for Kafka Consumer Service");
>     } catch (final Exception e) {
>         close(consumerService);
>         getLogger().error("Failed to commit offsets for Kafka Consumer Service", e);
>     }
> }
> {code}
> The first thing it does is check whether {{commitOffsets}} is false and, if
> so, return. This means the ConsumerService is never returned to the
> {{consumerServices}} queue, so it can never be reused. While we want to avoid
> calling {{consumerService.commit}} in this case, we must still call
> {{consumerServices.offer}} so that the consumer can be reused.
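
For illustration only, here is a minimal, self-contained sketch of the behavior the description asks for: skip the commit when offsets are not being committed, but still offer the consumer back to the pool. It is not the code from the actual commit. ConsumerPoolSketch, ConsumerService, RecordingConsumer and pooledCount are hypothetical stand-ins introduced for this example, and the NiFi types (KafkaConsumerService, OffsetTracker, PollingContext) and logging are deliberately left out.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; none of these types are the real NiFi classes.
final class ConsumerPoolSketch {

    /** Simplified stand-in for a pooled Kafka consumer wrapper (assumption). */
    interface ConsumerService {
        void commit() throws Exception;
        void close();
    }

    /** Test double that records how often commit() and close() were called. */
    static final class RecordingConsumer implements ConsumerService {
        private final boolean failOnCommit;
        int commits;
        int closes;

        RecordingConsumer(final boolean failOnCommit) {
            this.failOnCommit = failOnCommit;
        }

        @Override
        public void commit() throws Exception {
            if (failOnCommit) {
                throw new Exception("simulated commit failure");
            }
            commits++;
        }

        @Override
        public void close() {
            closes++;
        }
    }

    private final Queue<ConsumerService> consumerServices = new LinkedBlockingQueue<>();
    private final boolean commitOffsets;

    ConsumerPoolSketch(final boolean commitOffsets) {
        this.commitOffsets = commitOffsets;
    }

    void commitOffsets(final ConsumerService consumerService) {
        try {
            // Only the commit itself is conditional on the flag.
            if (commitOffsets) {
                consumerService.commit();
            }
            // The consumer goes back to the pool in both cases so it can be reused.
            consumerServices.offer(consumerService);
        } catch (final Exception e) {
            // A failed commit leaves the consumer in an unknown state:
            // close it instead of pooling it.
            consumerService.close();
        }
    }

    int pooledCount() {
        return consumerServices.size();
    }

    public static void main(final String[] args) {
        final ConsumerPoolSketch pool = new ConsumerPoolSketch(false);
        final RecordingConsumer consumer = new RecordingConsumer(false);
        pool.commitOffsets(consumer);
        System.out.println("pooled=" + pool.pooledCount() + " commits=" + consumer.commits);
    }
}
```

The point of the sketch is that the early return is replaced by a guard around the commit call only, so the offer to the queue is reached whether or not offsets are committed. A consumer whose commit throws is still closed rather than pooled, matching the catch block in the original method.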
--
This message was sent by Atlassian Jira
(v8.20.10#820010)