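Hi Mihail,

The issue is actually a bug in Kafka: the NullPointerException is thrown inside ZkClient, which Kafka's ZkUtils calls when the Flink Kafka consumer periodically commits offsets to ZooKeeper. We also have a JIRA issue in Flink for this: https://issues.apache.org/jira/browse/FLINK-3067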
Unfortunately, we haven't released a fix yet; it will be included in Flink 0.10.2. Since the Kafka connector is not part of the Flink binary distribution (it is packaged into your job's jar), you can already pick up the fix by setting the connector's version in your Maven pom file to 0.10-SNAPSHOT. Maven will then download the current snapshot of the 0.10 branch, i.e. the code planned for the 0.10.2 release.

On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:

> Hi,
>
> we get the following NullPointerException after ~50 minutes when running a
> streaming job with windowing and state that reads data from Kafka and
> writes the result to local FS.
> There are around 170 million messages to be processed, Flink 0.10.1 stops
> at ~8 million.
> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>
> 12/01/2015 15:06:24 Job execution switched to status RUNNING.
> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
> 12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
> 12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) switched to FAILED
> java.lang.Exception
>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>
> Any ideas on what could cause this behaviour?
>
> Best,
> Mihail
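For reference, here is a minimal sketch of the pom.xml changes needed to use the 0.10-SNAPSHOT Kafka connector suggested in the reply. It assumes the connector artifact is flink-connector-kafka (keep whatever artifactId your pom already uses; only the version changes) and that your pom does not yet declare the Apache snapshot repository. SNAPSHOT builds are not published to Maven Central, so Maven needs that repository to resolve them.

```xml
<!-- Inside <project>. Assumption: flink-connector-kafka is the connector
     artifact your job already depends on; only <version> is changed. -->
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.10-SNAPSHOT</version>
  </dependency>
</dependencies>

<!-- SNAPSHOT artifacts are served from the Apache snapshot repository,
     not from Maven Central, so it has to be declared explicitly. -->
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
```

Because a snapshot can change after you first download it, building once with mvn -U (--update-snapshots) makes Maven check the repository for a newer snapshot instead of reusing a locally cached one.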