Ewen Cheslack-Postava created KAFKA-2886:
--------------------------------------------

             Summary: WorkerSinkTask doesn't catch exceptions from rebalance callbacks
                 Key: KAFKA-2886
                 URL: https://issues.apache.org/jira/browse/KAFKA-2886
             Project: Kafka
          Issue Type: Bug
          Components: copycat
            Reporter: Ewen Cheslack-Postava
            Assignee: Ewen Cheslack-Postava


WorkerSinkTask exposes rebalance callbacks to tasks by invoking
onPartitionsRevoked and onPartitionsAssigned on the task. However, these calls
aren't guarded by try/catch blocks, so any exception thrown by the task
propagates up to the consumer:

{quote}
[2015-11-24 15:52:24,071] ERROR User provided listener org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance failed on partition assignment:  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
java.lang.UnsupportedOperationException
        at java.util.Collections$UnmodifiableCollection.clear(Collections.java:1094)
        at io.confluent.connect.hdfs.DataWriter.onPartitionsAssigned(DataWriter.java:207)
        at io.confluent.connect.hdfs.HdfsSinkTask.onPartitionsAssigned(HdfsSinkTask.java:103)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:369)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:189)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:227)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:306)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:861)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2015-11-24 15:52:24,477] INFO Cannot acquire lease on WAL hdfs://worker4:9000/logs/test/0/log (io.confluent.connect.hdfs.wal.FSWAL)
{quote}
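A minimal sketch of the missing guard, under stated assumptions: `TaskRebalanceHooks` and `GuardedRebalanceListener` are hypothetical names (not the WorkerSinkTask code), and partitions are simplified to strings instead of `TopicPartition`. The only point it illustrates is that each call into task code sits inside a try/catch, so a failure like the `UnsupportedOperationException` above is recorded by the worker instead of propagating up to the consumer.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a sink task's rebalance hooks. The real task
// methods take Collection<TopicPartition>; simplified to String here.
interface TaskRebalanceHooks {
    void onPartitionsAssigned(Collection<String> partitions);
    void onPartitionsRevoked(Collection<String> partitions);
}

// Hypothetical guarded listener: every call into task code is wrapped in
// try/catch so an exception is recorded instead of escaping to the caller.
final class GuardedRebalanceListener {
    private final TaskRebalanceHooks task;
    private final List<RuntimeException> failures = new ArrayList<>();

    GuardedRebalanceListener(TaskRebalanceHooks task) {
        this.task = task;
    }

    void onPartitionsAssigned(Collection<String> partitions) {
        try {
            task.onPartitionsAssigned(partitions);
        } catch (RuntimeException e) {
            // Record rather than rethrow; the worker decides how to react.
            failures.add(e);
        }
    }

    void onPartitionsRevoked(Collection<String> partitions) {
        try {
            task.onPartitionsRevoked(partitions);
        } catch (RuntimeException e) {
            failures.add(e);
        }
    }

    // Failures seen so far, for the worker loop to inspect.
    List<RuntimeException> failures() {
        return failures;
    }
}
```

Catching `RuntimeException` rather than `Throwable` is a deliberate choice in this sketch: it covers task bugs such as the `clear()` on an unmodifiable collection in the trace, while still letting JVM `Error`s propagate.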

This currently happens to work for onPartitionsAssigned because the task's
callback is the last thing invoked. For onPartitionsRevoked, however, the
exception prevents offsets from being committed and the message batch currently
being processed from being cleared. We may also need additional cleanup; for
example, the task may need to stop processing data entirely, since it may now
be in a bad state.
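One possible shape for the revocation path, sketched under assumptions: `RevocationHandler`, `pendingOffsets`, `committedOffsets`, `currentBatch`, and the `failed` flag are hypothetical names, and the offset commit is simulated with a map rather than a real consumer call. The task callback runs in a try block, the commit and batch clearing run in `finally` so they happen even if the task throws, and a failure marks the task so the worker can stop feeding it data.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical revocation handler; not the WorkerSinkTask implementation.
final class RevocationHandler {
    private final Consumer<Collection<String>> taskOnRevoked;
    final Map<String, Long> pendingOffsets = new HashMap<>();
    final Map<String, Long> committedOffsets = new HashMap<>();
    final List<String> currentBatch = new ArrayList<>();
    private boolean failed = false;

    RevocationHandler(Consumer<Collection<String>> taskOnRevoked) {
        this.taskOnRevoked = taskOnRevoked;
    }

    void onPartitionsRevoked(Collection<String> partitions) {
        try {
            taskOnRevoked.accept(partitions);
        } catch (RuntimeException e) {
            // The task may now be in a bad state: stop processing its data.
            failed = true;
        } finally {
            // Runs whether or not the task callback threw, so offsets are
            // still committed and the in-flight batch is still cleared.
            commitOffsets();
            currentBatch.clear();
        }
    }

    // Simulated commit: move pending offsets into the committed map.
    private void commitOffsets() {
        committedOffsets.putAll(pendingOffsets);
        pendingOffsets.clear();
    }

    boolean isFailed() {
        return failed;
    }
}
```

Committing after a failed callback mirrors the steps the issue says are currently skipped; a more conservative design could skip the commit when the task fails, so the affected records are redelivered rather than possibly lost.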



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
