C0urante commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r497049737

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;

Review comment:

The [Javadocs](https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html) for `ConsumerRebalanceListener` state that the callback "will only execute in the user thread as part of the `poll(long)` call"; I think we have a guarantee here that `onPartitionsRevoked` will be called on the same thread that sets `taskStopped` to `false`.

A fun way to verify this is to view the exceptions that get thrown by this bug; the stack traces include these lines:

```
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:695)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:744)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:976)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:888)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2368)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2335)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2285)
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:933)
at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:174)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
```

The trace shows `onPartitionsRevoked` being reached synchronously from `WorkerSinkTask.close`, which itself runs inside `WorkerTask.doRun` on the task's own thread.

The only edge case I can think of might be with asynchronous offset commits, but from what I can tell those don't trigger asynchronous rebalance listener callbacks (if they trigger rebalances or rebalance listener callbacks at all).
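
To illustrate the thread-confinement argument above, here is a minimal sketch. It does not use the real Kafka classes: `RevokeListener`, `FakeConsumer`, and `FakeTask` are hypothetical stand-ins, and the sketch assumes (as the stack trace suggests) that `close()` invokes the listener inline on the calling thread. Under that assumption, a plain non-volatile flag written just before `close()` is visible inside the callback, because both happen on the same thread.

```java
import java.util.concurrent.atomic.AtomicReference;

final class CallbackThreadSketch {

    /** Hypothetical stand-in for ConsumerRebalanceListener; not the Kafka interface. */
    interface RevokeListener {
        void onPartitionsRevoked();
    }

    /** Hypothetical consumer that, by assumption, calls the listener inline from close(). */
    static final class FakeConsumer {
        private final RevokeListener listener;

        FakeConsumer(RevokeListener listener) {
            this.listener = listener;
        }

        void close() {
            // Runs on whichever thread called close(); no hand-off to another thread.
            listener.onPartitionsRevoked();
        }
    }

    /** Hypothetical task holding a plain (deliberately non-volatile) stop flag. */
    static final class FakeTask {
        private boolean taskStopped = false;
        private boolean flagSeenInCallback = false;
        private final AtomicReference<Thread> callbackThread = new AtomicReference<>();
        private final FakeConsumer consumer;

        FakeTask() {
            this.consumer = new FakeConsumer(() -> {
                // Record which thread ran the callback and what it saw in the flag.
                callbackThread.set(Thread.currentThread());
                flagSeenInCallback = taskStopped;
            });
        }

        void close() {
            taskStopped = true; // written on the closing thread
            consumer.close();   // callback fires on that same thread
        }
    }

    /** Returns true if the callback ran on the closing thread and observed the flag. */
    static boolean callbackRanOnClosingThread() {
        FakeTask task = new FakeTask();
        AtomicReference<Thread> closingThread = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            closingThread.set(Thread.currentThread());
            task.close();
        }, "task-thread");
        worker.start();
        try {
            worker.join(); // join() makes the worker's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return task.callbackThread.get() == closingThread.get() && task.flagSeenInCallback;
    }

    public static void main(String[] args) {
        System.out.println(callbackRanOnClosingThread());
    }
}
```

If the callback were instead dispatched to a different thread, the thread comparison would fail and the flag would need `volatile` or other synchronization to be read safely.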