I know this has already been fixed, but out of curiosity, could you point me to the Kafka JIRA issue for this bug? From the Flink issue, it looks like this was a ZooKeeper version mismatch.
On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetz...@apache.org> wrote:
> Hi Gyula,
>
> no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
> "release-0.10" branch to Apache's Maven snapshot repository.
>
> I don't think Mihail's code will run when he's compiling it against
> 1.0-SNAPSHOT.
>
> On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>> Hi,
>>
>> I think Robert meant to write setting the connector dependency to
>> 1.0-SNAPSHOT.
>>
>> Cheers,
>> Gyula
>>
>> On Tue, Dec 1, 2015 at 5:10 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>> Hi Mihail,
>>>
>>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this as
>>> well: https://issues.apache.org/jira/browse/FLINK-3067
>>>
>>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain a
>>> fix.
>>>
>>> Since the Kafka connector is not contained in the Flink binary, you can
>>> just set the version in your Maven POM file to 0.10-SNAPSHOT. Maven will
>>> then download the code planned for the 0.10-SNAPSHOT release.
>>>
>>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>>>>
>>>> Hi,
>>>>
>>>> we get the following NullPointerException after ~50 minutes when running
>>>> a streaming job with windowing and state that reads data from Kafka and
>>>> writes the result to the local FS.
>>>> There are around 170 million messages to be processed; Flink 0.10.1
>>>> stops at ~8 million.
>>>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>>>
>>>> 12/01/2015 15:06:24 Job execution switched to status RUNNING.
>>>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>>>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>>>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>>>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>>>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>>>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>>>> 12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>>>> 12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>>>> java.lang.Exception
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.NullPointerException
>>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>>>
>>>> Any ideas on what could cause this behaviour?
>>>>
>>>> Best,
>>>> Mihail
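For readers following the workaround Robert describes above (using the fixed 0.10-SNAPSHOT Kafka connector while keeping the rest of Flink at the released version), a POM fragment would look roughly like this. This is a sketch, not a verified build file: the artifact ID follows Flink 0.10's naming conventions, and the Scala-suffixed variant or your project's existing repository setup may differ.

```xml
<!-- Sketch of the suggested workaround: pull only the Kafka connector from
     Apache's snapshot repository. Snapshot artifacts are not on Maven Central,
     so the repository must be declared explicitly. -->
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Snapshot Repository</name>
    <url>https://repository.apache.org/snapshots/</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependencies>
  <!-- Connector pinned to the snapshot containing the FLINK-3067 fix;
       other flink-* dependencies stay at the released 0.10.1. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.10-SNAPSHOT</version>
  </dependency>
</dependencies>
```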