It's good news that the issue has been resolved. Regarding the OOM, did you start Flink in streaming mode?
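For context, here is a minimal sketch in plain Java (not Flink's actual API) of the failure mode the stack trace below points at: Flink 0.10's heap-backed keyed state (`AbstractHeapKvState`) keeps one entry per key in an in-memory `HashMap`, so its footprint grows with key cardinality rather than with throughput. The class and method names here are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

public class HeapStateSketch {

    // One running aggregate per key, analogous to the state of a keyed reduce.
    static final Map<String, Double> state = new HashMap<>();

    // Loosely analogous to AbstractHeapKvState.update(): merge the new value
    // into the per-key entry, creating the entry on first sight of the key.
    static void update(String key, double value) {
        state.merge(key, value, Double::sum);
    }

    public static void main(String[] args) {
        // With an unbounded key space (e.g. one key per order id), the map
        // only ever grows; at ~170M distinct keys it would exceed a
        // default-sized JVM heap long before the stream is drained.
        for (int i = 0; i < 1_000; i++) {
            update("order-" + i, 1.0);
        }
        System.out.println(state.size()); // prints 1000: one entry per distinct key
    }
}
```

If the key space is unbounded, either the state must be pruned (e.g. when a window fires) or kept out of the JVM heap; otherwise the OOM is only a matter of time.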
On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:

> Thank you, Robert! The issue with Kafka is now solved with the
> 0.10-SNAPSHOT dependency.
>
> We have run into an OutOfMemory exception though, which appears to be
> related to the state. As my colleague, Javier Lopez, mentioned in a
> previous thread, state handling is crucial for our use case. And as the
> jobs are intended to run for months, stability plays an important role in
> choosing a stream processing framework.
>
> 12/02/2015 10:03:53  Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.HashMap.resize(HashMap.java:703)
>     at java.util.HashMap.putVal(HashMap.java:662)
>     at java.util.HashMap.put(HashMap.java:611)
>     at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>     at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>     at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>     at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
>
> 2015-12-01 17:42 GMT+01:00
> Maximilian Michels <m...@apache.org>:
>
>> Thanks! I've linked the issue in JIRA.
>>
>> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetz...@apache.org> wrote:
>> > I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824
>> >
>> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <m...@apache.org> wrote:
>> >>
>> >> I know this has been fixed already but, out of curiosity, could you
>> >> point me to the Kafka JIRA issue for this bug? From the Flink issue it
>> >> looks like this is a ZooKeeper version mismatch.
>> >>
>> >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetz...@apache.org> wrote:
>> >> > Hi Gyula,
>> >> >
>> >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>> >> > "release-0.10" branch to Apache's Maven snapshot repository.
>> >> >
>> >> > I don't think Mihail's code will run when he's compiling it against
>> >> > 1.0-SNAPSHOT.
>> >> >
>> >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I think Robert meant to write setting the connector dependency to
>> >> >> 1.0-SNAPSHOT.
>> >> >>
>> >> >> Cheers,
>> >> >> Gyula
>> >> >>
>> >> >> On Tue, Dec 1, 2015 at 17:10, Robert Metzger <rmetz...@apache.org> wrote:
>> >> >>>
>> >> >>> Hi Mihail,
>> >> >>>
>> >> >>> the issue is actually a bug in Kafka. We have a JIRA issue in Flink
>> >> >>> for this as well: https://issues.apache.org/jira/browse/FLINK-3067
>> >> >>>
>> >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will
>> >> >>> contain a fix.
>> >> >>>
>> >> >>> Since the Kafka connector is not contained in the Flink binary, you
>> >> >>> can just set the version in your Maven POM file to 0.10-SNAPSHOT.
>> >> >>> Maven will then download the code planned for the 0.10-SNAPSHOT
>> >> >>> release.
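[Editor's note: as a concrete illustration of the dependency change Robert describes, the connector entry in the project's pom.xml would look roughly like the fragment below. This is a hedged sketch: the artifactId shown is an assumption, so keep whatever artifactId the project already uses, and the repository declaration may be unnecessary if Apache snapshots are already configured.]

```xml
<!-- Hypothetical pom.xml fragment; keep your project's actual artifactId. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.10-SNAPSHOT</version>
</dependency>

<!-- Snapshot artifacts are served from Apache's snapshot repository,
     which may need to be declared explicitly in <repositories>: -->
<repository>
  <id>apache.snapshots</id>
  <url>https://repository.apache.org/content/repositories/snapshots/</url>
  <snapshots>
    <enabled>true</enabled>
  </snapshots>
</repository>
```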
>> >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>> >> >>>>
>> >> >>>> Hi,
>> >> >>>>
>> >> >>>> we get the following NullPointerException after ~50 minutes when
>> >> >>>> running a streaming job with windowing and state that reads data
>> >> >>>> from Kafka and writes the result to local FS.
>> >> >>>> There are around 170 million messages to be processed; Flink 0.10.1
>> >> >>>> stops at ~8 million.
>> >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>> >> >>>>
>> >> >>>> 12/01/2015 15:06:24  Job execution switched to status RUNNING.
>> >> >>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>> >> >>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>> >> >>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>> >> >>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>> >> >>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>> >> >>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>> >> >>>> 12/01/2015 15:56:08  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>> >> >>>> 12/01/2015 15:56:08  Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>> >> >>>> java.lang.Exception
>> >> >>>>     at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>> >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>> >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> >> >>>>     at java.lang.Thread.run(Thread.java:745)
>> >> >>>> Caused by: java.lang.NullPointerException
>> >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>> >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>> >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>> >> >>>>
>> >> >>>> Any ideas on what could cause this
>> >> >>>> behaviour?
>> >> >>>>
>> >> >>>> Best,
>> >> >>>> Mihail