Hey! The problem here is that there is no proper way to kill a thread in Java (or rather, if you do, it makes everything unstable). Threads need to exit cooperatively.
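For illustration, here is a minimal, generic Java sketch of what exiting cooperatively means. It is not Flink or Kafka code, and the class names CooperativeWorker and CooperativeShutdownDemo are hypothetical. The owner asks the thread to stop with Thread.interrupt(), and the worker notices the request and returns on its own. This only works if the blocking calls inside the worker honor interrupts, which is exactly what a call stuck uninterruptibly does not do.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical worker that exits cooperatively: it checks its interrupt
// status and returns on its own instead of being killed from outside.
final class CooperativeWorker implements Runnable {
    @Override
    public void run() {
        // Keep working until someone requests a stop via Thread.interrupt().
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Interruptible blocking calls such as sleep() throw when interrupted.
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                // Throwing cleared the flag; restore it so the loop condition sees it.
                Thread.currentThread().interrupt();
            }
        }
        // Release resources (connections, locks) here before returning.
    }
}

final class CooperativeShutdownDemo {
    /** Starts a worker, asks it to stop, and reports whether it exited within waitMillis. */
    static boolean stopCooperatively(long waitMillis) {
        Thread worker = new Thread(new CooperativeWorker(), "cooperative-worker");
        worker.start();
        worker.interrupt(); // a request, not a kill: the worker decides when to return
        try {
            worker.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker exited: " + stopCooperatively(1000));
    }
}
```

The non-cooperative alternative, Thread.stop(), unwinds the thread wherever it happens to be and releases all monitors it holds, which can leave shared objects half-updated. That is why it is deprecated.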
The Kafka function calls are simply stuck, uninterruptibly, and never return (a pretty bad bug in their ZooKeeper client). As far as I know, one cannot clean this up properly short of killing the process.

We could try to work around this by running the ZooKeeper commit in a dedicated lightweight thread that shares no resources, so that stopping it (against better advice ;-) ) does not make the system unstable.

Stephan

On Tue, Nov 17, 2015 at 10:43 AM, Ufuk Celebi <u...@apache.org> wrote:

> https://issues.apache.org/jira/browse/KAFKA-824
>
> This has been fixed in Kafka 0.9.0.
>
> We should investigate why the job gets stuck, though. Do you have a stack
> trace or any logs available?
>
> – Ufuk
>
> > On 17 Nov 2015, at 09:24, Gyula Fóra <gyf...@apache.org> wrote:
> >
> > Hey guys,
> >
> > I ran into an issue with the Kafka consumers.
> >
> > I am reading from more than 50 topics with parallelism 1, and while running
> > the job I got the following exception during the checkpoint notification
> > (offset committing):
> >
> > java.lang.RuntimeException: Error while confirming checkpoint
> >     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:935)
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.NullPointerException
> >     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
> >     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
> >     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
> >     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
> >     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> >     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
> >     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
> >     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
> >     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.notifyCheckpointComplete(FlinkKafkaConsumer.java:542)
> >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:176)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:478)
> >     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:931)
> >     ... 5 more
> >
> > This happened at the same time on multiple Kafka consumers. Could this be
> > some ZooKeeper-related issue? Maybe we should be aware of this.
> >
> > Another problem is that subsequently the whole pipeline got stuck at the
> > sources while canceling, so the job could never restart. Maybe it would be
> > worth killing the whole thing and restarting in these situations?
> >
> > Thanks,
> > Gyula
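The workaround proposed at the top of this reply (running the ZooKeeper commit in a dedicated thread that shares no resources) could look roughly like the minimal sketch below. It is hypothetical, not Flink or Kafka code: IsolatedCommitter and commit are invented names, and the Runnable stands in for the offset commit. One deliberate deviation: instead of stopping the stuck thread (Thread.stop() is deprecated, and on recent JDKs it throws UnsupportedOperationException), the sketch stops waiting after a timeout, abandons the daemon thread, and starts a fresh one for the next commit.

```java
import java.util.concurrent.*;

// Hypothetical helper: runs a possibly-hanging call (e.g. an offset commit)
// on its own daemon thread so the caller never blocks longer than a timeout.
final class IsolatedCommitter {
    private ExecutorService executor = newExecutor();

    private static ExecutorService newExecutor() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "isolated-commit");
            t.setDaemon(true); // a stuck daemon thread does not keep the JVM alive
            return t;
        });
    }

    /** Returns true if the commit finished in time, false if it was abandoned. */
    synchronized boolean commit(Runnable commitAction, long timeoutMillis) {
        Future<?> result = executor.submit(commitAction);
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Request interruption (a stuck call may ignore it), then abandon
            // the old thread and use a fresh one for the next commit.
            result.cancel(true);
            executor.shutdownNow();
            executor = newExecutor();
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException("commit failed", e.getCause());
        }
    }
}

final class IsolatedCommitterDemo {
    public static void main(String[] args) {
        IsolatedCommitter committer = new IsolatedCommitter();
        boolean fast = committer.commit(() -> System.out.println("committed"), 1000);
        // Simulates a call that never returns and swallows interrupts.
        boolean hung = committer.commit(() -> {
            while (true) {
                try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
            }
        }, 200);
        System.out.println("fast=" + fast + ", hung=" + hung);
    }
}
```

Abandoning the thread is only harmless under the condition stated above: the commit must share no locks or mutable state with the task, otherwise the stuck thread can still hold something the task needs. Each abandoned thread also stays alive until the process exits.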