Hi, we get the following NullPointerException after ~50 minutes when running a streaming job with windowing and state that reads data from Kafka and writes the result to local FS. There are around 170 million messages to be processed, Flink 0.10.1 stops at ~8 million. Flink runs locally, started with the "start-cluster-streaming.sh" script.
12/01/2015 15:06:24 Job execution switched to status RUNNING. 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to RUNNING 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING 12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED 12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) switched to FAILED java.lang.Exception at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115) at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813) at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808) at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777) at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332) at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala) at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112) at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632) Any ideas on what could cause this behaviour? Best, Mihail