Chris Egerton created KAFKA-10188:
-------------------------------------
Summary: Sink task preCommit method gets called after task is stopped
Key: KAFKA-10188
URL: https://issues.apache.org/jira/browse/KAFKA-10188
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton
When the [final cleanup for a sink task|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L191] is initiated, the framework [first calls stop() on the task|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L167] and then [closes the consumer for the task|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L171]. Closing the consumer has the side effect of triggering [the onPartitionsRevoked method|https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked-java.util.Collection-] of its {{ConsumerRebalanceListener}}. That callback [causes the framework to call WorkerSinkTask::closePartitions|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L694], which [calls WorkerSinkTask::commitOffsets|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L618], which finally [calls SinkTask::preCommit|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L386]. Calling {{SinkTask::preCommit}} after {{SinkTask::stop}} is likely to cause errors, since tasks are expected to release their resources during {{stop}}, and the [current documentation on the SinkTask lifecycle|https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/sink/SinkTask.html] makes no mention of anything happening after a task is stopped.
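To make the ordering concrete, here is a minimal, framework-free simulation of the call chain described above. It is a sketch under stated assumptions: {{RecordingSinkTask}}, {{FakeConsumer}}, {{RevocationListener}} and {{FakeWorkerSinkTask}} are hypothetical stand-ins, not Kafka Connect or consumer classes, and the real {{preCommit}} takes and returns offset maps, which are omitted here. It only records the order in which lifecycle methods run.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical simulation of the shutdown ordering reported in KAFKA-10188.
// None of these classes are real Kafka Connect or consumer APIs.
public class ShutdownOrderDemo {

    // Stand-in for a connector's SinkTask: records which lifecycle methods ran.
    static class RecordingSinkTask {
        final List<String> calls = new ArrayList<>();
        void put() { calls.add("put"); }
        void preCommit() { calls.add("preCommit"); }
        void stop() { calls.add("stop"); }
    }

    // Stand-in for ConsumerRebalanceListener.onPartitionsRevoked.
    interface RevocationListener {
        void onPartitionsRevoked(Collection<String> partitions);
    }

    // Stand-in for the consumer: closing it revokes every assigned partition.
    static class FakeConsumer {
        private final List<String> assignment;
        private final RevocationListener listener;

        FakeConsumer(List<String> assignment, RevocationListener listener) {
            this.assignment = assignment;
            this.listener = listener;
        }

        void close() {
            listener.onPartitionsRevoked(assignment);
        }
    }

    // Stand-in for WorkerSinkTask, reproducing the chain described in the issue.
    static class FakeWorkerSinkTask {
        final RecordingSinkTask task = new RecordingSinkTask();
        final FakeConsumer consumer = new FakeConsumer(List.of("topic-0"), this::closePartitions);

        void closePartitions(Collection<String> partitions) {
            commitOffsets();
        }

        void commitOffsets() {
            task.preCommit(); // ask the task which offsets are safe to commit
        }

        // Final cleanup: stop the task first, then close the consumer.
        void close() {
            task.stop();
            // Triggers onPartitionsRevoked -> closePartitions -> commitOffsets -> preCommit.
            consumer.close();
        }
    }

    static List<String> simulateShutdown() {
        FakeWorkerSinkTask worker = new FakeWorkerSinkTask();
        worker.task.put();
        worker.close();
        return worker.task.calls;
    }

    public static void main(String[] args) {
        System.out.println(simulateShutdown()); // [put, stop, preCommit]
    }
}
```

The recorded sequence ends with {{preCommit}} after {{stop}}, which is exactly the ordering the lifecycle documentation does not account for.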
The framework already [ensures that offsets are committed|https://github.com/apache/kafka/blob/3c43adff1d4562c6b33732f399691c9e2f887903/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196] for tasks after the last call to {{SinkTask::put}} has been made, so the offset commit that happens after {{SinkTask::stop}} has been invoked can and should be removed without compromising any of the delivery guarantees the framework currently provides.
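One way the proposed change could look is sketched below, again as a self-contained simulation rather than the actual patch. The {{taskStopped}} flag and every class name are hypothetical. The idea is that the revocation triggered by closing the consumer skips the commit once the task has been stopped, while the commit that follows the last {{put}} (modelled by the hypothetical {{finishExecution}}) still runs, so no offsets are left uncommitted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of the proposed fix; not the real WorkerSinkTask code.
public class ShutdownFixDemo {

    // Stand-in for a connector's SinkTask: records which lifecycle methods ran.
    static class RecordingSinkTask {
        final List<String> calls = new ArrayList<>();
        void put() { calls.add("put"); }
        void preCommit() { calls.add("preCommit"); }
        void stop() { calls.add("stop"); }
    }

    // Stand-in for ConsumerRebalanceListener.onPartitionsRevoked.
    interface RevocationListener {
        void onPartitionsRevoked(Collection<String> partitions);
    }

    // Stand-in for the consumer: closing it revokes every assigned partition.
    static class FakeConsumer {
        private final List<String> assignment;
        private final RevocationListener listener;

        FakeConsumer(List<String> assignment, RevocationListener listener) {
            this.assignment = assignment;
            this.listener = listener;
        }

        void close() { listener.onPartitionsRevoked(assignment); }
    }

    static class FakeWorkerSinkTask {
        final RecordingSinkTask task = new RecordingSinkTask();
        final FakeConsumer consumer = new FakeConsumer(List.of("topic-0"), this::onPartitionsRevoked);
        private boolean taskStopped = false; // hypothetical flag, set once stop() has run

        void onPartitionsRevoked(Collection<String> partitions) {
            if (taskStopped) {
                return; // offsets were already committed after the last put; skip preCommit
            }
            closePartitions();
        }

        void closePartitions() { commitOffsets(); }

        void commitOffsets() { task.preCommit(); }

        // Models the commit the framework already performs after the final put.
        void finishExecution() { closePartitions(); }

        void close() {
            task.stop();
            taskStopped = true;
            consumer.close(); // revocation now returns early instead of committing
        }
    }

    static List<String> simulateShutdown() {
        FakeWorkerSinkTask worker = new FakeWorkerSinkTask();
        worker.task.put();
        worker.finishExecution();
        worker.close();
        return worker.task.calls;
    }

    public static void main(String[] args) {
        System.out.println(simulateShutdown()); // [put, preCommit, stop]
    }
}
```

A boolean flag is simply the smallest way to express "this task is gone" in the sketch; the real framework may track task state differently.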