Yes, with the "start-cluster-streaming.sh" script. If the TaskManager gets 5 GB of heap, it manages to process ~100 million messages before throwing the above OOM. With only 500 MB it gets to ~8 million, and a somewhat misleading exception is thrown:
12/01/2015 19:14:07  Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception: Java heap space
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
    at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
    at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
    at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
    at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)

2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetz...@apache.org>:

> It's good news that the issue has been resolved.
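A side note on the "Caused by" in the trace above: it shows a fresh org.json.simple JSONParser, whose constructor allocates a Yylex lexer with an internal buffer, being created inside map() for every record. That allocation site is simply where the heap finally ran out, so the root cause may well be elsewhere (e.g. growing state), but constructing the parser once per operator instance, for example in the open() method of a RichMapFunction, removes the per-record garbage churn. A minimal pure-JDK sketch of the difference, where the Parser class is a stand-in for JSONParser and the Flink API is elided:

```java
import java.util.function.Function;

// Sketch: per-record allocation vs. one parser per operator instance.
// "Parser" stands in for org.json.simple.parser.JSONParser, which allocates
// a sizable internal buffer in its constructor (see Yylex.<init> above).
public class ParserReuse {

    // Stand-in for JSONParser: expensive to construct, cheap to call.
    static final class Parser {
        static int constructed = 0;                       // allocation counter for the demo
        private final char[] buffer = new char[16 * 1024]; // mimics the lexer buffer
        Parser() { constructed++; }
        String parse(String json) { return json.trim(); }  // placeholder "parse"
    }

    // Anti-pattern: a new parser per element, as in the failing map().
    static Function<String, String> perRecord() {
        return json -> new Parser().parse(json);
    }

    // Fix: construct once per instance (in Flink: in a RichMapFunction's
    // open(), or a transient field) and reuse it for every element.
    static Function<String, String> reused() {
        final Parser parser = new Parser();
        return parser::parse;
    }

    public static void main(String[] args) {
        Parser.constructed = 0;
        Function<String, String> bad = perRecord();
        for (int i = 0; i < 1000; i++) bad.apply(" {} ");
        int perRecordAllocs = Parser.constructed;   // one allocation per element

        Parser.constructed = 0;
        Function<String, String> good = reused();
        for (int i = 0; i < 1000; i++) good.apply(" {} ");
        int reusedAllocs = Parser.constructed;      // a single allocation

        System.out.println(perRecordAllocs + " vs " + reusedAllocs);
    }
}
```

Note that JSONParser is not documented as thread-safe, so the reused instance must stay confined to one operator instance, which is exactly what the open()-per-subtask lifecycle gives you.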
>
> Regarding the OOM, did you start Flink in streaming mode?
>
> On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>
>> Thank you, Robert! The issue with Kafka is now solved with the
>> 0.10-SNAPSHOT dependency.
>>
>> We have run into an OutOfMemory exception though, which appears to be
>> related to the state. As my colleague Javier Lopez mentioned in a
>> previous thread, state handling is crucial for our use case. And as the
>> jobs are intended to run for months, stability plays an important role
>> in choosing a stream processing framework.
>>
>> 12/02/2015 10:03:53  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
>> java.lang.OutOfMemoryError: Java heap space
>>     at java.util.HashMap.resize(HashMap.java:703)
>>     at java.util.HashMap.putVal(HashMap.java:662)
>>     at java.util.HashMap.put(HashMap.java:611)
>>     at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>>     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>>     at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>>     at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>>     at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> 2015-12-01 17:42 GMT+01:00 Maximilian Michels <m...@apache.org>:
>>
>>> Thanks! I've linked the issue in JIRA.
>>>
>>> On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>> > I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824
>>> >
>>> > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <m...@apache.org> wrote:
>>> >>
>>> >> I know this has been fixed already but, out of curiosity, could you
>>> >> point me to the Kafka JIRA issue for this bug? From the Flink issue
>>> >> it looks like this is a Zookeeper version mismatch.
>>> >>
>>> >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>> >> > Hi Gyula,
>>> >> >
>>> >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>>> >> > "release-0.10" branch to Apache's maven snapshot repository.
>>> >> >
>>> >> > I don't think Mihail's code will run when he's compiling it
>>> >> > against 1.0-SNAPSHOT.
>>> >> >
>>> >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>> >> >>
>>> >> >> Hi,
>>> >> >>
>>> >> >> I think Robert meant to write setting the connector dependency
>>> >> >> to 1.0-SNAPSHOT.
>>> >> >>
>>> >> >> Cheers,
>>> >> >> Gyula
>>> >> >>
>>> >> >> On Tue, Dec 1, 2015 at 17:10, Robert Metzger <rmetz...@apache.org> wrote:
>>> >> >>>
>>> >> >>> Hi Mihail,
>>> >> >>>
>>> >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink
>>> >> >>> for this as well: https://issues.apache.org/jira/browse/FLINK-3067
>>> >> >>>
>>> >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will
>>> >> >>> contain a fix.
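A note on the second OOM (the AbstractHeapKvState.update trace in Mihail's mail above): in Flink 0.10 the keyed state behind the windowed reduce lives in an on-heap map, so its footprint grows with the number of distinct keys ever seen and is never spilled to disk. With an unbounded key space (e.g. ever-new order ids) any fixed heap is eventually exhausted, which would also explain why a larger heap lasts proportionally longer before failing. A small stand-alone illustration of that failure mode, in plain Java rather than the Flink API, with illustrative names:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration: heap-backed keyed state is a plain in-memory map, so its
// size is proportional to the number of distinct keys -- not to the
// message rate and not to the window size.
public class HeapStateGrowth {

    // Stand-in for a heap KvState: key -> running (sum, count) for an average.
    static final Map<String, long[]> state = new HashMap<>();

    static void update(String orderId, long price) {
        long[] acc = state.computeIfAbsent(orderId, k -> new long[2]);
        acc[0] += price; // running sum
        acc[1] += 1;     // running count
    }

    public static void main(String[] args) {
        // 100k messages over 1k recurring keys: state stays at 1k entries.
        for (int i = 0; i < 100_000; i++) update("order-" + (i % 1_000), 1);
        System.out.println(state.size()); // bounded by key cardinality: 1000

        // Same message count, but every key unique: one entry per message,
        // so at real-world volumes this is an OOM waiting to happen.
        state.clear();
        for (int i = 0; i < 100_000; i++) update("order-" + i, 1);
        System.out.println(state.size()); // 100000 and growing
    }
}
```

If that matches the job's key distribution, the options at the time were essentially to bound the key cardinality, clear state for finished keys, or give the TaskManager more heap; later Flink versions added state backends that keep state out of the JVM heap.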
>>> >> >>>
>>> >> >>> Since the kafka connector is not contained in the flink binary,
>>> >> >>> you can just set the version in your maven pom file to
>>> >> >>> 0.10-SNAPSHOT. Maven will then download the code planned for the
>>> >> >>> 0.10-SNAPSHOT release.
>>> >> >>>
>>> >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>>> >> >>>>
>>> >> >>>> Hi,
>>> >> >>>>
>>> >> >>>> we get the following NullPointerException after ~50 minutes
>>> >> >>>> when running a streaming job with windowing and state that
>>> >> >>>> reads data from Kafka and writes the result to local FS.
>>> >> >>>> There are around 170 million messages to be processed, Flink
>>> >> >>>> 0.10.1 stops at ~8 million.
>>> >> >>>> Flink runs locally, started with the
>>> >> >>>> "start-cluster-streaming.sh" script.
>>> >> >>>>
>>> >> >>>> 12/01/2015 15:06:24  Job execution switched to status RUNNING.
>>> >> >>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>>> >> >>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>>> >> >>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>>> >> >>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>>> >> >>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>>> >> >>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>>> >> >>>> 12/01/2015 15:56:08  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>>> >> >>>> 12/01/2015 15:56:08  Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>>> >> >>>> java.lang.Exception
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>> >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>> >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>> >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>> >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>> >> >>>>     at java.lang.Thread.run(Thread.java:745)
>>> >> >>>> Caused by: java.lang.NullPointerException
>>> >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>> >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>> >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>> >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>> >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>> >> >>>>
>>> >> >>>> Any ideas on what could cause this behaviour?
>>> >> >>>>
>>> >> >>>> Best,
>>> >> >>>> Mihail
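For reference, Robert's workaround higher up in the thread (pinning the Kafka connector to 0.10-SNAPSHOT until 0.10.2 ships) would look roughly like the pom.xml fragment below. The artifactId and the snapshot-repository details are assumptions and should be checked against the project's actual setup; only the connector needs the snapshot version, since it is not part of the Flink binary distribution:

```xml
<!-- Sketch of the suggested workaround: pull the connector fix from the
     Apache snapshot repository while keeping the rest of Flink at 0.10.1.
     ArtifactId and repository URL are assumptions; adjust to your project. -->
<repositories>
  <repository>
    <id>apache-snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.10-SNAPSHOT</version>
  </dependency>
</dependencies>
```

Once 0.10.2 is released, the version can be pinned back to a proper release so builds stay reproducible.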