Scott Reynolds created KAFKA-4632: ------------------------------------- Summary: Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException Key: KAFKA-4632 URL: https://issues.apache.org/jira/browse/KAFKA-4632 Project: Kafka Issue Type: Bug Affects Versions: 0.10.0.1, 0.10.0.0, 0.10.1.0 Reporter: Scott Reynolds
WorkerSinkTask's closePartitions method isn't handling WakeupException that can be thrown from commitSync. {code} org.apache.kafka.common.errors.WakeupException at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup (ConsumerNetworkClient.java:404) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:499) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104) at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245) at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264) at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305) at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147) at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140) at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175) at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511) at java.util.concurrent.FutureTask.run (FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617) at java.lang.Thread.run (Thread.java:745) {code} I believe it should catch it and ignore it as that is what the poll method does when isStopping is true {code:java} } catch (WakeupException we) { log.trace("{} consumer woken up", id); if (isStopping()) return; if (shouldPause()) { pauseAll(); } else if (!pausedForRedelivery) { resumeAll(); } } {code} But unsure, love some insight into this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)