Hi,

I am working on a use case that involves storing state for billions of keys. For this we use a MySQL state backend that writes each key-value state to a MySQL server, so it only holds a limited set of key-value pairs on the heap while maintaining the processing guarantees.
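To give a rough idea of how this is wired into a job, here is a minimal sketch. The MySqlStateBackend class, its constructor arguments and the JDBC URL below are only placeholders for the adapter described above (the real API may look different); the only standard piece is the setStateBackend() hook on the StreamExecutionEnvironment.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlBackedStateJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder backend: writes each key-value state entry to the MySQL
        // server and keeps only a bounded cache of hot entries on the heap.
        env.setStateBackend(new MySqlStateBackend(
                "jdbc:mysql://dbhost:3306/flink_state",  // JDBC URL (placeholder)
                "user", "password",                      // credentials (placeholders)
                10000));                                 // max key-value pairs cached on heap

        // ... define sources, keyBy, windows, reducers and sinks as usual ...

        env.execute("Job with MySQL-backed key-value state");
    }
}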
This will keep our streaming job from running out of memory as most of the state is off-heap. I am not sure if this is relevant to your use case, but if the state size grows indefinitely you might want to give it a try.

I will write a detailed guide in a few days, but if you want to get started check this one out:
https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing

There are some pending improvements that I will commit in the next few days which will increase the performance of the MySQL adapter. Let me know if you are interested in this!

Cheers,
Gyula

On Wed, 2 Dec 2015 at 11:26, Vieru, Mihail <mihail.vi...@zalando.de> wrote:

> Hi Aljoscha,
>
> we have no upper bound for the number of expected keys. The max size for
> an element is 1 KB.
>
> There are 2 Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText
> operator in the job. In the first Map we parse the JSON object contained
> in each element and forward it as a Flink Tuple. In the Reduce we update
> the state for each key. That's about it.
>
> Best,
> Mihail
>
>
> 2015-12-02 11:09 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> Hi Mihail,
>> could you please give some information about the number of keys that you
>> are expecting in the data and how big the elements are that you are
>> processing in the window?
>>
>> Also, are there any other operations that could be taxing on memory? I
>> think the different exception you see for the 500MB mem size is just
>> because Java notices that it ran out of memory at a different part of
>> the program.
>>
>> Cheers,
>> Aljoscha
>>
>> > On 02 Dec 2015, at 10:57, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>> >
>> > Yes, with the "start-cluster-streaming.sh" script.
>> > If the TaskManager gets 5GB of heap it manages to process ~100 million
>> > messages and then throws the above OOM.
>> > If it gets only 500MB it manages to process ~8 million and a somewhat
>> > misleading exception is thrown:
>> >
>> > 12/01/2015 19:14:07   Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>> > java.lang.Exception: Java heap space
>> >     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> >     at java.lang.Thread.run(Thread.java:745)
>> > Caused by: java.lang.OutOfMemoryError: Java heap space
>> >     at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
>> >     at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
>> >     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
>> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>> >     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>> >     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
>> >     at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
>> >     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
>> >
>> >
>> > 2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>> > It's good news that the issue has been resolved.
>> >
>> > Regarding the OOM, did you start Flink in the streaming mode?
>> >
>> > On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>> > Thank you, Robert! The issue with Kafka is now solved with the
>> > 0.10-SNAPSHOT dependency.
>> >
>> > We have run into an OutOfMemory exception though, which appears to be
>> > related to the state. As my colleague, Javier Lopez, mentioned in a
>> > previous thread, state handling is crucial for our use case. And as the
>> > jobs are intended to run for months, stability plays an important role
>> > in choosing a stream processing framework.
>> >
>> > 12/02/2015 10:03:53   Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
>> > java.lang.OutOfMemoryError: Java heap space
>> >     at java.util.HashMap.resize(HashMap.java:703)
>> >     at java.util.HashMap.putVal(HashMap.java:662)
>> >     at java.util.HashMap.put(HashMap.java:611)
>> >     at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>> >     at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>> >     at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>> >     at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> >     at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> > 2015-12-01 17:42 GMT+01:00 Maximilian Michels <m...@apache.org>:
>> > Thanks! I've linked the issue in JIRA.
>> >
>> > On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetz...@apache.org> wrote:
>> > > I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824
>> > >
>> > > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <m...@apache.org> wrote:
>> > >>
>> > >> I know this has been fixed already but, out of curiosity, could you
>> > >> point me to the Kafka JIRA issue for this bug? From the Flink issue
>> > >> it looks like this is a ZooKeeper version mismatch.
>> > >>
>> > >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetz...@apache.org> wrote:
>> > >> > Hi Gyula,
>> > >> >
>> > >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>> > >> > "release-0.10" branch to Apache's Maven snapshot repository.
>> > >> >
>> > >> > I don't think Mihail's code will run when he's compiling it against
>> > >> > 1.0-SNAPSHOT.
>> > >> >
>> > >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> > >> >>
>> > >> >> Hi,
>> > >> >>
>> > >> >> I think Robert meant to write: setting the connector dependency to
>> > >> >> 1.0-SNAPSHOT.
>> > >> >>
>> > >> >> Cheers,
>> > >> >> Gyula
>> > >> >>
>> > >> >> On Tue, 1 Dec 2015 at 17:10, Robert Metzger <rmetz...@apache.org> wrote:
>> > >> >>>
>> > >> >>> Hi Mihail,
>> > >> >>>
>> > >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for
>> > >> >>> this as well: https://issues.apache.org/jira/browse/FLINK-3067
>> > >> >>>
>> > >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will
>> > >> >>> contain a fix.
>> > >> >>>
>> > >> >>> Since the Kafka connector is not contained in the Flink binary,
>> > >> >>> you can just set the version in your Maven pom file to 0.10-SNAPSHOT.
>> > >> >>> Maven will then download the code planned for the 0.10-SNAPSHOT release.
>> > >> >>>
>> > >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>> > >> >>>>
>> > >> >>>> Hi,
>> > >> >>>>
>> > >> >>>> we get the following NullPointerException after ~50 minutes when
>> > >> >>>> running a streaming job with windowing and state that reads data
>> > >> >>>> from Kafka and writes the result to local FS.
>> > >> >>>> There are around 170 million messages to be processed, Flink 0.10.1
>> > >> >>>> stops at ~8 million.
>> > >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh"
>> > >> >>>> script.
>> > >> >>>>
>> > >> >>>> 12/01/2015 15:06:24   Job execution switched to status RUNNING.
>> > >> >>>> 12/01/2015 15:06:24   Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>> > >> >>>> 12/01/2015 15:06:24   Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>> > >> >>>> 12/01/2015 15:06:24   Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>> > >> >>>> 12/01/2015 15:06:24   Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>> > >> >>>> 12/01/2015 15:06:24   Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>> > >> >>>> 12/01/2015 15:06:24   Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>> > >> >>>> 12/01/2015 15:56:08   Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>> > >> >>>> 12/01/2015 15:56:08   Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>> > >> >>>> java.lang.Exception
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>> > >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> > >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>> > >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> > >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> > >> >>>>     at java.lang.Thread.run(Thread.java:745)
>> > >> >>>> Caused by: java.lang.NullPointerException
>> > >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>> > >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>> > >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>> > >> >>>>
>> > >> >>>> Any ideas on what could cause this behaviour?
>> > >> >>>>
>> > >> >>>> Best,
>> > >> >>>> Mihail