Hi,

I am working on a use case that involves storing state for billions of
keys. For this we use a MySQL state backend that writes each key-value
state to a MySQL server, so only a limited set of key-value pairs is held
on the heap while the processing guarantees are maintained.

This keeps our streaming job from running out of memory, as most of the
state is off-heap. I am not sure whether this is relevant to your use case,
but if your state size grows indefinitely you might want to give it a try.
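
To give an idea of how the backend is wired into a job, here is a minimal
sketch. The MySqlStateBackend class name, its constructor arguments and the
setStateBackend-style hook are assumptions for illustration only; the actual
class and the way it is registered depend on the Flink version and on how
the adapter is packaged.

    // Minimal sketch only: MySqlStateBackend is a hypothetical placeholder
    // for the MySQL-backed key-value state adapter described above.
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // JDBC URL, credentials and the on-heap cache size are illustrative.
    env.setStateBackend(new MySqlStateBackend(
        "jdbc:mysql://dbhost:3306/flink_state",
        "user", "password",
        10000));  // max key-value pairs kept on heap before writing out to MySQL

The bound on the on-heap cache is what keeps memory usage flat even when the
total number of keys keeps growing.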

I will write a detailed guide in the next few days, but if you want to get
started, check this one out:
https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing

There are some pending improvements that I will commit in the next few days;
they will increase the performance of the MySQL adapter.

Let me know if you are interested in this!

Cheers,
Gyula


On Wed, Dec 2, 2015 at 11:26 AM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:

> Hi Aljoscha,
>
> we have no upper bound for the number of expected keys. The max size for
> an element is 1 KB.
>
> There are two Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText
> operator in the job. In the first Map we parse the JSON object contained
> in each element and forward it as a Flink Tuple. In the Reduce we update
> the state for each key. That's about it.
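>
> In code, the pipeline looks roughly like the sketch below. It is a
> simplified stand-in written against the current DataStream API (method
> names may differ slightly in 0.10.x), not the actual ItemPriceAvgPerOrder
> job; the Kafka source, the JSON parsing and the reduce logic are replaced
> by placeholders:
>
>     import org.apache.flink.api.common.functions.MapFunction;
>     import org.apache.flink.api.common.functions.ReduceFunction;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.api.windowing.time.Time;
>
>     public class PipelineSketch {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>             // The real job reads from Kafka via a FlinkKafkaConsumer; a
>             // socket source keeps this sketch self-contained.
>             DataStream<String> raw = env.socketTextStream("localhost", 9999);
>
>             raw
>                 // Map 1: parse the payload into a (key, amount) tuple; the
>                 // real JSON parsing is replaced by a trivial split here.
>                 .map(new MapFunction<String, Tuple2<String, Double>>() {
>                     @Override
>                     public Tuple2<String, Double> map(String line) {
>                         String[] parts = line.split(",");
>                         return new Tuple2<>(parts[0], Double.parseDouble(parts[1]));
>                     }
>                 })
>                 // (the second Map of the real job would follow here)
>                 .keyBy(0)                     // KeyBy on the first tuple field
>                 .timeWindow(Time.seconds(5))  // tumbling 5s window, as in the logs
>                 // Reduce: the per-key state is updated here
>                 .reduce(new ReduceFunction<Tuple2<String, Double>>() {
>                     @Override
>                     public Tuple2<String, Double> reduce(Tuple2<String, Double> a,
>                                                          Tuple2<String, Double> b) {
>                         return new Tuple2<>(a.f0, a.f1 + b.f1);
>                     }
>                 })
>                 .writeAsText("/tmp/output");  // writeAsText sink to the local FS
>
>             env.execute("pipeline sketch");
>         }
>     }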
>
> Best,
> Mihail
>
>
> 2015-12-02 11:09 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> Hi Mihail,
>> could you please give some information about the number of keys that you
>> are expecting in the data and how big the elements are that you are
>> processing in the window?
>>
>> Also, are there any other operations that could be taxing on memory? I
>> think the different exception you see for the 500MB mem size is just
>> because Java notices that it ran out of memory at a different point in
>> the program.
>>
>> Cheers,
>> Aljoscha
>> > On 02 Dec 2015, at 10:57, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>> >
>> > Yes, with the "start-cluster-streaming.sh" script.
>> > If the TaskManager gets 5GB of heap, it manages to process ~100 million
>> > messages and then throws the above OOM.
>> > If it gets only 500MB, it manages to process ~8 million and a somewhat
>> > misleading exception is thrown:
>> >
>> > 12/01/2015 19:14:07    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>> > java.lang.Exception: Java heap space
>> >     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> >     at java.lang.Thread.run(Thread.java:745)
>> > Caused by: java.lang.OutOfMemoryError: Java heap space
>> >     at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
>> >     at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
>> >     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
>> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>> >     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>> >     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
>> >     at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
>> >     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
>> >
>> >
>> >
>> >
>> > 2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>> > It's good news that the issue has been resolved.
>> >
>> > Regarding the OOM, did you start Flink in the streaming mode?
>> >
>> > On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>> > Thank you, Robert! The issue with Kafka is now solved with the
>> > 0.10-SNAPSHOT dependency.
>> >
>> > We have run into an OutOfMemory exception though, which appears to be
>> > related to the state. As my colleague, Javier Lopez, mentioned in a
>> > previous thread, state handling is crucial for our use case. And as the
>> > jobs are intended to run for months, stability plays an important role
>> > in choosing a stream processing framework.
>> >
>> > 12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
>> > java.lang.OutOfMemoryError: Java heap space
>> >     at java.util.HashMap.resize(HashMap.java:703)
>> >     at java.util.HashMap.putVal(HashMap.java:662)
>> >     at java.util.HashMap.put(HashMap.java:611)
>> >     at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>> >     at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>> >     at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>> >     at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> >     at java.lang.Thread.run(Thread.java:745)
>> >
>> >
>> >
>> >
>> > 2015-12-01 17:42 GMT+01:00 Maximilian Michels <m...@apache.org>:
>> > Thanks! I've linked the issue in JIRA.
>> >
>> > On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetz...@apache.org> wrote:
>> > > I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824
>> > >
>> > > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <m...@apache.org> wrote:
>> > >>
>> > >> I know this has been fixed already but, out of curiosity, could you
>> > >> point me to the Kafka JIRA issue for this
>> > >> bug? From the Flink issue it looks like this is a Zookeeper version
>> > >> mismatch.
>> > >>
>> > >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetz...@apache.org>
>> > >> wrote:
>> > >> > Hi Gyula,
>> > >> >
>> > >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>> > >> > "release-0.10" branch to Apache's maven snapshot repository.
>> > >> >
>> > >> >
>> > >> > I don't think Mihail's code will run when he's compiling it against
>> > >> > 1.0-SNAPSHOT.
>> > >> >
>> > >> >
>> > >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> > >> >>
>> > >> >> Hi,
>> > >> >>
>> > >> >> I think Robert meant to write setting the connector dependency to
>> > >> >> 1.0-SNAPSHOT.
>> > >> >>
>> > >> >> Cheers,
>> > >> >> Gyula
>> > >> >>
>> > >> >> On Tue, Dec 1, 2015 at 5:10 PM, Robert Metzger <rmetz...@apache.org> wrote:
>> > >> >>>
>> > >> >>> Hi Mihail,
>> > >> >>>
>> > >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for
>> > >> >>> this as well: https://issues.apache.org/jira/browse/FLINK-3067
>> > >> >>>
>> > >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will
>> > >> >>> contain a fix.
>> > >> >>>
>> > >> >>> Since the Kafka connector is not contained in the Flink binary,
>> > >> >>> you can just set the version in your Maven pom file to
>> > >> >>> 0.10-SNAPSHOT. Maven will then download the code planned for the
>> > >> >>> 0.10-SNAPSHOT release.
>> > >> >>>
>> > >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
>> > >> >>> <mihail.vi...@zalando.de>
>> > >> >>> wrote:
>> > >> >>>>
>> > >> >>>> Hi,
>> > >> >>>>
>> > >> >>>> we get the following NullPointerException after ~50 minutes when
>> > >> >>>> running a streaming job with windowing and state that reads data
>> > >> >>>> from Kafka and writes the result to the local FS.
>> > >> >>>> There are around 170 million messages to be processed; Flink 0.10.1
>> > >> >>>> stops at ~8 million.
>> > >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh"
>> > >> >>>> script.
>> > >> >>>>
>> > >> >>>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>> > >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>> > >> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>> > >> >>>> java.lang.Exception
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>> > >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> > >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>> > >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>> > >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> > >> >>>>     at java.lang.Thread.run(Thread.java:745)
>> > >> >>>> Caused by: java.lang.NullPointerException
>> > >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>> > >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>> > >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>> > >> >>>>
>> > >> >>>>
>> > >> >>>> Any ideas on what could cause this behaviour?
>> > >> >>>>
>> > >> >>>> Best,
>> > >> >>>> Mihail
>> > >> >>>
>> > >> >>>
>> > >> >
>> > >
>> > >
>> >
>> >
>> >
>>
>>
>
