Ewen Cheslack-Postava created KAFKA-2886:
--------------------------------------------

             Summary: WorkerSinkTask doesn't catch exceptions from rebalance callbacks
                 Key: KAFKA-2886
                 URL: https://issues.apache.org/jira/browse/KAFKA-2886
             Project: Kafka
          Issue Type: Bug
          Components: copycat
            Reporter: Ewen Cheslack-Postava
            Assignee: Ewen Cheslack-Postava

WorkerSinkTask exposes rebalance callbacks to tasks by invoking onPartitionsRevoked and onPartitionsAssigned on the task. However, these invocations aren't guarded by try/catch blocks, so exceptions thrown by the task propagate up to the consumer:

{quote}
[2015-11-24 15:52:24,071] ERROR User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on partition assignment: (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.clear(Collections.java:1094)
    at io.confluent.connect.hdfs.DataWriter.onPartitionsAssigned(DataWriter.java:207)
    at io.confluent.connect.hdfs.HdfsSinkTask.onPartitionsAssigned(HdfsSinkTask.java:103)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:369)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:189)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:306)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:861)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2015-11-24 15:52:24,477] INFO Cannot acquire lease on WAL hdfs://worker4:9000/logs/test/0/log (io.confluent.connect.hdfs.wal.FSWAL)
{quote}

This currently happens to work for onPartitionsAssigned because the task callback is the last thing invoked. For onPartitionsRevoked, an exception causes offsets not to be committed and the message batch currently being processed not to be cleared. Additionally, we may need to do more to clean up, e.g. the task may need to stop processing data entirely, since it may now be in a bad state.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
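
As a rough illustration of the guarding described in this report, the sketch below wraps the two task callbacks in try/catch so that a failing task cannot skip the worker's own cleanup. It uses only the JDK and does not touch the real Kafka classes: Task, GuardedListener, FailingTask, commitOffsets, messageBatch, and taskFailure are hypothetical stand-ins invented for this example. Whether offsets should still be committed after a task callback has failed is a policy question the report leaves open; the sketch commits anyway purely to show the control flow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT the Kafka Connect classes.
class GuardedRebalanceSketch {

    /** Stand-in for the task-side rebalance hooks named in the report. */
    interface Task {
        void onPartitionsAssigned(Collection<String> partitions);
        void onPartitionsRevoked(Collection<String> partitions);
    }

    /** Task that fails the same way as the stack trace: clear() on an unmodifiable view. */
    static class FailingTask implements Task {
        public void onPartitionsAssigned(Collection<String> partitions) {
            Collections.unmodifiableCollection(partitions).clear(); // throws UnsupportedOperationException
        }
        public void onPartitionsRevoked(Collection<String> partitions) {
            Collections.unmodifiableCollection(partitions).clear(); // throws UnsupportedOperationException
        }
    }

    /** Stand-in for the worker's rebalance listener, with guarded task callbacks. */
    static class GuardedListener {
        private final Task task;
        final List<String> messageBatch = new ArrayList<>();
        int commits = 0;
        // First exception thrown by a task callback, if any. Assumed policy: remember it
        // so the surrounding worker loop can decide to stop the task.
        RuntimeException taskFailure;

        GuardedListener(Task task) {
            this.task = task;
        }

        void onPartitionsAssigned(Collection<String> partitions) {
            try {
                task.onPartitionsAssigned(partitions);
            } catch (RuntimeException e) {
                recordFailure(e);
            }
        }

        void onPartitionsRevoked(Collection<String> partitions) {
            try {
                task.onPartitionsRevoked(partitions);
            } catch (RuntimeException e) {
                recordFailure(e);
            } finally {
                // Worker-side cleanup runs even if the task callback threw.
                commitOffsets();
                messageBatch.clear();
            }
        }

        private void recordFailure(RuntimeException e) {
            if (taskFailure == null) {
                taskFailure = e;
            }
        }

        private void commitOffsets() {
            commits++; // placeholder for a real offset commit
        }
    }

    public static void main(String[] args) {
        GuardedListener listener = new GuardedListener(new FailingTask());
        listener.messageBatch.add("record-1");
        listener.onPartitionsRevoked(Arrays.asList("test-0"));
        System.out.println("commits=" + listener.commits
                + " batchEmpty=" + listener.messageBatch.isEmpty()
                + " failure=" + listener.taskFailure);
    }
}
```

In this sketch the exception never reaches the caller of the listener; the worker loop would check taskFailure after polling and stop the task if it is set, which is one possible reading of the "stop processing data entirely" remark above.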