Hi,

We get the following NullPointerException after ~50 minutes when running a
streaming job with windowing and state that reads data from Kafka and
writes the result to the local FS.
There are around 170 million messages to be processed, but Flink 0.10.1 stops
at ~8 million.
Flink runs locally, started with the "start-cluster-streaming.sh" script.

12/01/2015 15:06:24    Job execution switched to status RUNNING.
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
    at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
    at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)


Any ideas on what could cause this behavior?

Best,
Mihail
