Scott Reynolds created KAFKA-4632:
-------------------------------------

             Summary: Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
                 Key: KAFKA-4632
                 URL: https://issues.apache.org/jira/browse/KAFKA-4632
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.10.0.1, 0.10.0.0, 0.10.1.0
            Reporter: Scott Reynolds


WorkerSinkTask's closePartitions method doesn't handle the WakeupException that 
can be thrown from commitSync.

{code}
org.apache.kafka.common.errors.WakeupException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup (ConsumerNetworkClient.java:404)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:499)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147)
at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
at java.lang.Thread.run (Thread.java:745)
{code}

I believe closePartitions should catch the exception and ignore it, since that 
is what the poll method does when isStopping is true:

{code:java}
        } catch (WakeupException we) {
            log.trace("{} consumer woken up", id);

            if (isStopping())
                return;

            if (shouldPause()) {
                pauseAll();
            } else if (!pausedForRedelivery) {
                resumeAll();
            }
        }
{code}

But I'm unsure, and would love some insight into this.
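
To make the proposal concrete, here is a minimal sketch of the handling 
described above. It is not the actual WorkerSinkTask code: WakeupException is a 
local stand-in for org.apache.kafka.common.errors.WakeupException so the 
example compiles without the Kafka client, and OffsetCommitter and 
SinkTaskSketch are hypothetical names introduced only for illustration.

```java
// Stand-in for org.apache.kafka.common.errors.WakeupException (assumption:
// used here only so the sketch compiles without Kafka on the classpath).
class WakeupException extends RuntimeException {}

// Hypothetical, minimal view of the consumer: just the call that can be
// interrupted by a wakeup.
interface OffsetCommitter {
    void commitSync();
}

// Hypothetical simplification of the task; not the real WorkerSinkTask.
class SinkTaskSketch {
    private final OffsetCommitter consumer;
    private volatile boolean stopping = false;

    SinkTaskSketch(OffsetCommitter consumer) {
        this.consumer = consumer;
    }

    void stop() {
        stopping = true;
    }

    boolean isStopping() {
        return stopping;
    }

    // Returns true if the final commit completed, false if it was cut short
    // by a wakeup while the task was stopping.
    boolean closePartitions() {
        try {
            consumer.commitSync();
            return true;
        } catch (WakeupException we) {
            if (isStopping()) {
                // Same idea as the poll path: a wakeup during shutdown is
                // expected, so swallow it instead of failing the task.
                return false;
            }
            // A wakeup outside of shutdown is unexpected here; propagate it.
            throw we;
        }
    }
}
```

One thing to weigh with this approach: if the wakeup is simply ignored, the 
final offset commit may not have completed, so retrying the commit before 
returning might be preferable to dropping it.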



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
