Chris Egerton created KAFKA-10188:
-------------------------------------

             Summary: Sink task preCommit method gets called after task is stopped
                 Key: KAFKA-10188
                 URL: https://issues.apache.org/jira/browse/KAFKA-10188
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
            Reporter: Chris Egerton
            Assignee: Chris Egerton


When the [final cleanup for a sink task|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L191] is initiated, the framework [first calls stop() on the task|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L167] and then [closes the consumer for the task|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L171].
Closing the consumer has the side effect of triggering [the onPartitionsRevoked method|https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked-java.util.Collection-] of its {{ConsumerRebalanceListener}}, which [causes the framework to call WorkerSinkTask::closePartitions|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L694], which [calls WorkerSinkTask::commitOffsets|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L618], which finally [calls SinkTask::preCommit|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L386].

Calling {{SinkTask::preCommit}} after {{SinkTask::stop}} is likely to cause errors in tasks, since they are expected to release their resources during {{stop}}, and the [current documentation on the SinkTask lifecycle|https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/sink/SinkTask.html] makes no mention of anything happening after a task is stopped.
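
As an illustration of the kind of failure this can cause, the following hypothetical task (it does not extend the real {{org.apache.kafka.connect.sink.SinkTask}}, and its {{preCommit}} takes no offsets map) buffers records in a {{java.io.BufferedWriter}}, flushes it in {{preCommit}}, and closes it in {{stop}}. The class name {{BufferingSinkTask}} and the helper {{preCommitAfterStop}} are assumptions made for this sketch. Calling {{preCommit}} after {{stop}}, as in the reported cleanup order, fails because the writer has already been closed.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;

// Hypothetical sink task that follows the usual lifecycle: resources are
// opened in start() and released in stop(). Not a real Connect API class.
class BufferingSinkTask {
    private BufferedWriter writer;

    void start() { writer = new BufferedWriter(new StringWriter()); }

    void put(String record) {
        try {
            writer.write(record);
            writer.newLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Flushes buffered records before offsets are committed.
    void preCommit() {
        try {
            writer.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Releases the writer; the task expects no further calls after this.
    void stop() {
        try {
            writer.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs the lifecycle in the order described in this issue and returns
    // the message of the error raised by the late preCommit(), or null.
    static String preCommitAfterStop() {
        BufferingSinkTask task = new BufferingSinkTask();
        task.start();
        task.put("record-1");
        task.stop();
        try {
            task.preCommit();
            return null;
        } catch (UncheckedIOException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("preCommit after stop failed with: " + preCommitAfterStop());
    }
}
```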


The framework already [ensures that offsets are committed|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196] for tasks after the last call to {{SinkTask::put}} has been made, so the offset commit that happens after {{SinkTask::stop}} has been invoked can, and should, be removed without compromising any of the delivery guarantees the framework currently provides.
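
One possible shape of that change is sketched below as a hypothetical, simplified model; it is not the actual Kafka Connect code or the patch for this issue. The class {{FixedCleanupDemo}} and its {{taskStopped}} flag are invented for illustration: offsets are committed once at the end of {{execute}}, mirroring the existing commit after the last {{put}}, and the revocation triggered by closing the consumer skips the commit once the task has been stopped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed ordering; not actual Kafka Connect code.
class FixedCleanupDemo {
    private final List<String> events = new ArrayList<>();
    private boolean taskStopped = false;

    private void preCommit() { events.add("SinkTask::preCommit"); }

    private void commitOffsets() {
        events.add("commitOffsets");
        preCommit();
    }

    // Mirrors the existing final commit: once the poll/put loop exits,
    // offsets for everything passed to put() are committed.
    private void execute() {
        try {
            events.add("SinkTask::put (last call)");
        } finally {
            commitOffsets();
        }
    }

    // Invoked when closing the consumer revokes its partitions.
    private void onPartitionsRevoked() {
        if (taskStopped) {
            // Offsets were already committed at the end of execute() and the
            // task has released its resources, so skip the commit entirely.
            events.add("skip commit: task already stopped");
            return;
        }
        commitOffsets();
    }

    // Final cleanup: stop the task, then close the consumer.
    private void close() {
        events.add("SinkTask::stop");
        taskStopped = true;
        events.add("Consumer::close");
        onPartitionsRevoked();
    }

    static List<String> run() {
        FixedCleanupDemo demo = new FixedCleanupDemo();
        demo.execute();
        demo.close();
        return demo.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The flag is used here only to keep the sketch short; the actual change could be structured differently, as long as {{SinkTask::preCommit}} is no longer reached after {{SinkTask::stop}}.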



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
