A bit of extra information on the example where I posted the link: the example checks whether two events follow each other within a certain time:

- The first event in the example is called "compute.instance.create.start" (in your case, it would be the event that an order was placed)
- The second event is called "trove.instance.create" (in your case, the event that the package was sent)

What the timeout window does is the following:

- It triggers either on the second event, or after the timeout has expired
- The window function checks whether the last event was the correct second event. If yes, it sends a Result(OK); if not, it sends a Result(TIMEOUT).

Hope that this helps you build your application!
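In code, the trigger at the heart of that pattern looks roughly like this. This is a minimal sketch, not the demo code itself: it assumes a keyed stream assigned to a GlobalWindow, the Trigger signatures follow the 1.x style (the 0.10 interface differs slightly), and isSecondEvent() is a placeholder hook you would implement for your event type:

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    public abstract class PairTimeoutTrigger<T> extends Trigger<T, GlobalWindow> {

        private final long timeoutMillis;

        protected PairTimeoutTrigger(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        // Placeholder hook: true for the second event of the pair
        // (e.g. "trove.instance.create" / "the package was sent").
        protected abstract boolean isSecondEvent(T element);

        @Override
        public TriggerResult onElement(T element, long timestamp,
                                       GlobalWindow window, TriggerContext ctx) throws Exception {
            if (isSecondEvent(element)) {
                // Second event arrived in time: evaluate and clean up the window.
                return TriggerResult.FIRE_AND_PURGE;
            }
            // First event: arm the timeout timer.
            ctx.registerProcessingTimeTimer(System.currentTimeMillis() + timeoutMillis);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            // Timeout expired before the second event: evaluate what we have.
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
    }

The window function downstream then checks whether the last element in the window is the expected second event and emits Result(OK) or Result(TIMEOUT) accordingly.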
On Wed, Dec 2, 2015 at 6:25 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Mihail!
>
> Do I understand you correctly that the use case is to raise an alarm if an
> order has not been processed within a certain time period (a certain number
> of days)?
>
> If that is the case, the use case is actually perfect for a special form
> of session windows that monitor such timeouts. I have prototyped a sample
> application for a different use case, but it should fit your use case as
> well:
>
> https://github.com/StephanEwen/flink-demos/blob/master/timeout-monitoring/src/main/java/com/mythingy/streaming/EventStreamAnalysis.java
>
> In that example, the timeout is 5 seconds, but there is no reason why the
> timeout could not be multiple days. Windows may be very long - no problem.
>
> Unlike in many other streaming systems, each key has an individual window, so
> one key's session window may start at one point in time, and another
> key's session window at a very different point. One window may finish
> within a few hours (a quickly processed order), another may see the timeout
> after three days (an order that was not processed in time).
>
> Greetings,
> Stephan
>
> On Wed, Dec 2, 2015 at 6:11 PM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>
>> Hi Gyula, Hi Stephan,
>>
>> thank you for your replies.
>>
>> We need a state which grows indefinitely for the following use case. An
>> event is created when a customer places an order. Another event is created
>> when the order is sent. These events typically occur within days. We need
>> to catch the cases in which these events occur more than a specified time
>> period apart, to raise an alarm.
>>
>> So having a window of a couple of days is not feasible. Thus we need the
>> state.
>>
>> I believe having a different state backend would circumvent the OOM
>> issue. We were thinking of Redis for performance reasons. MySQL might do as
>> well, if it doesn't slow down the processing too much.
>>
>> Are there limitations for the SqlStateBackend when working with state only?
>> When would the window state limitation occur?
>>
>> Cheers,
>> Mihail
>>
>> 2015-12-02 13:38 GMT+01:00 Stephan Ewen <se...@apache.org>:
>>
>>> Mihail!
>>>
>>> The Flink windows are currently in-memory only. There are plans to relax
>>> that, but for the time being, having enough memory in the cluster is
>>> important.
>>>
>>> @Gyula: I think window state is currently also limited when using the
>>> SqlStateBackend, by the size of a row in the database (because windows are
>>> not key/value state currently).
>>>
>>> Here are some simple rules of thumb to work with:
>>>
>>> 1) For windows, the number of expected keys can be without bound. It is
>>> important to have a rough upper bound for the number of "active keys at a
>>> certain time". For example, if you have time windows (let's say of 10
>>> minutes or so), it only matters how many keys you have within each 10
>>> minute interval. Those define how much memory you need.
>>>
>>> 2) If you work with the "OperatorState" abstraction, then you need to
>>> think about cleanup a bit. The OperatorState currently keeps the state
>>> for a key until you set it to "null". This manual state is explicitly
>>> designed to allow you to keep state across windows and across very long
>>> time spans. On the flip side, you need to manage the amount of state you
>>> store, by releasing the state for keys, as in the sketch below.
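A minimal sketch of that release-by-null pattern, assuming the 0.10-era getKeyValueState() API and hypothetical OrderEvent/Alert types with an isShippedEvent() stand-in; the function must run on a keyed stream (e.g. keyed by order id):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.OperatorState;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class PendingOrderTracker extends RichFlatMapFunction<OrderEvent, Alert> {

        private OperatorState<OrderEvent> pending;  // one entry per key, kept until set to null

        @Override
        public void open(Configuration parameters) throws Exception {
            pending = getRuntimeContext().getKeyValueState("pending", OrderEvent.class, null);
        }

        @Override
        public void flatMap(OrderEvent event, Collector<Alert> out) throws Exception {
            if (event.isShippedEvent()) {
                pending.update(null);   // pair complete: release the state for this key
            } else {
                pending.update(event);  // open order: remember it across windows
            }
        }
    }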
>>> 3) If a certain key space grows without bound, you should "scope the state
>>> by time". A pragmatic solution for that is to define a session window:
>>> - The session length defines after what period of inactivity the state is
>>> cleaned (let's say a 1h session length or so).
>>> - The trigger implements this session (there are a few mails on this
>>> list already that explain how to do this; a sketch follows below) and
>>> takes care of evaluating on every element.
>>> - A count(1) evictor makes sure only one element is ever stored.
>>>
>>> Greetings,
>>> Stephan
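A sketch of such a session trigger. This version uses 1.x-style partitioned trigger state to ignore stale timers; the 0.10 trigger context is more limited, and the exact signatures vary between versions:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    public class InactivitySessionTrigger<T> extends Trigger<T, GlobalWindow> {

        private final long sessionTimeoutMillis;
        private final ValueStateDescriptor<Long> deadline =
                new ValueStateDescriptor<>("session-deadline", Long.class, Long.MAX_VALUE);

        public InactivitySessionTrigger(long sessionTimeoutMillis) {
            this.sessionTimeoutMillis = sessionTimeoutMillis;
        }

        @Override
        public TriggerResult onElement(T element, long timestamp,
                                       GlobalWindow window, TriggerContext ctx) throws Exception {
            // Push the inactivity deadline forward on every element.
            long newDeadline = System.currentTimeMillis() + sessionTimeoutMillis;
            ctx.getPartitionedState(deadline).update(newDeadline);
            ctx.registerProcessingTimeTimer(newDeadline);
            return TriggerResult.FIRE;   // evaluate on every element, per rule 3
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                              TriggerContext ctx) throws Exception {
            if (time >= ctx.getPartitionedState(deadline).value()) {
                return TriggerResult.PURGE;   // really inactive: release the window state
            }
            return TriggerResult.CONTINUE;    // a stale timer armed by an earlier element
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
    }

On a keyed stream this would be used roughly as .window(GlobalWindows.create()).trigger(new InactivitySessionTrigger<>(60 * 60 * 1000)).evictor(CountEvictor.of(1)), so at most one element is ever buffered per key.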
>>> On Wed, Dec 2, 2015 at 11:37 AM, Gyula Fóra <gyf...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am working on a use case that involves storing state for billions of
>>>> keys. For this we use a MySql state backend that writes each key-value
>>>> state to a MySql server, so it only holds a limited set of key-value
>>>> pairs on heap while maintaining the processing guarantees.
>>>>
>>>> This will keep our streaming job from running out of memory, as most of
>>>> the state is off heap. I am not sure if this is relevant to your use case,
>>>> but if the state size grows indefinitely you might want to give it a try.
>>>>
>>>> I will write a detailed guide in a few days, but if you want to get
>>>> started check this one out:
>>>>
>>>> https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing
>>>>
>>>> There are some pending improvements that I will commit in the next few
>>>> days that will increase the performance of the MySql adapter.
>>>>
>>>> Let me know if you are interested in this!
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> Vieru, Mihail <mihail.vi...@zalando.de> wrote (on Wed, 2 Dec 2015, 11:26):
>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> we have no upper bound for the number of expected keys. The max size
>>>>> for an element is 1 KB.
>>>>>
>>>>> There are 2 Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText
>>>>> operator in the job. In the first Map we parse the JSON object contained
>>>>> in each element and forward it as a Flink Tuple. In the Reduce we update
>>>>> the state for each key. That's about it.
>>>>>
>>>>> Best,
>>>>> Mihail
>>>>>
>>>>> 2015-12-02 11:09 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>>
>>>>>> Hi Mihail,
>>>>>> could you please give some information about the number of keys that
>>>>>> you are expecting in the data and how big the elements are that you are
>>>>>> processing in the window?
>>>>>>
>>>>>> Also, are there any other operations that could be taxing on memory?
>>>>>> I think the different exception you see for the 500MB mem size is just
>>>>>> because Java notices that it ran out of memory at a different point in
>>>>>> the program.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> > On 02 Dec 2015, at 10:57, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>>>>>> >
>>>>>> > Yes, with the "start-cluster-streaming.sh" script.
>>>>>> > If the TaskManager gets 5GB of heap it manages to process ~100
>>>>>> million messages and then throws the above OOM.
>>>>>> > If it gets only 500MB it manages to process ~8 million, and a
>>>>>> somewhat misleading exception is thrown:
>>>>>> >
>>>>>> > 12/01/2015 19:14:07    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>>>>>> > java.lang.Exception: Java heap space
>>>>>> >     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>>> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>>>> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>>> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>>>> > Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>>> >     at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
>>>>>> >     at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
>>>>>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
>>>>>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
>>>>>> >     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>>>>>> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
>>>>>> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>>>>>> >     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>>>>>> >     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>>>>>> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
>>>>>> >     at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
>>>>>> >     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
>>>>>> >
>>>>>> > 2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>>>>>> > It's good news that the issue has been resolved.
>>>>>> >
>>>>>> > Regarding the OOM, did you start Flink in the streaming mode?
>>>>>> >
>>>>>> > On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>>>>>> > Thank you, Robert! The issue with Kafka is now solved with the
>>>>>> 0.10-SNAPSHOT dependency.
>>>>>> >
>>>>>> > We have run into an OutOfMemory exception though, which appears to
>>>>>> be related to the state. As my colleague Javier Lopez mentioned in a
>>>>>> previous thread, state handling is crucial for our use case. And as the
>>>>>> jobs are intended to run for months, stability plays an important role
>>>>>> in choosing a stream processing framework.
>>>>>> >
>>>>>> > 12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to FAILED
>>>>>> > java.lang.OutOfMemoryError: Java heap space
>>>>>> >     at java.util.HashMap.resize(HashMap.java:703)
>>>>>> >     at java.util.HashMap.putVal(HashMap.java:662)
>>>>>> >     at java.util.HashMap.put(HashMap.java:611)
>>>>>> >     at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>>>>>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>>>>>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>>>>>> >     at org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>>>>>> >     at org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>>>>>> >     at org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>>>>>> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>>>>>> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>> >     at java.lang.Thread.run(Thread.java:745)
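The pattern implied by the AbstractHeapKvState frames above — a reduce function updating key/value state — keeps one heap entry per key, so heap usage grows with the key space. A minimal sketch with stand-in names and aggregation logic (the real ItemPriceAvgPerOrder code is not shown in this thread):

    import org.apache.flink.api.common.functions.RichReduceFunction;
    import org.apache.flink.api.common.state.OperatorState;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    public class AvgPerKeyReducer extends RichReduceFunction<Tuple2<String, Double>> {

        private OperatorState<Double> runningAvg;   // backed by an on-heap HashMap in 0.10

        @Override
        public void open(Configuration parameters) throws Exception {
            runningAvg = getRuntimeContext().getKeyValueState("avg", Double.class, 0.0);
        }

        @Override
        public Tuple2<String, Double> reduce(Tuple2<String, Double> a,
                                             Tuple2<String, Double> b) throws Exception {
            double avg = (a.f1 + b.f1) / 2.0;   // stand-in aggregation
            runningAvg.update(avg);             // one entry per key; never released, so the
                                                // map grows without bound if keys are unbounded
            return new Tuple2<>(a.f0, avg);
        }
    }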
>>>>>> >
>>>>>> > 2015-12-01 17:42 GMT+01:00 Maximilian Michels <m...@apache.org>:
>>>>>> > Thanks! I've linked the issue in JIRA.
>>>>>> >
>>>>>> > On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>> > > I think it's this one: https://issues.apache.org/jira/browse/KAFKA-824
>>>>>> > >
>>>>>> > > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <m...@apache.org> wrote:
>>>>>> > >>
>>>>>> > >> I know this has been fixed already but, out of curiosity, could you
>>>>>> > >> point me to the Kafka JIRA issue for this bug? From the Flink issue
>>>>>> > >> it looks like this is a Zookeeper version mismatch.
>>>>>> > >>
>>>>>> > >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>> > >> > Hi Gyula,
>>>>>> > >> >
>>>>>> > >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>>>>>> > >> > "release-0.10" branch to Apache's Maven snapshot repository.
>>>>>> > >> >
>>>>>> > >> > I don't think Mihail's code will run when he's compiling it against
>>>>>> > >> > 1.0-SNAPSHOT.
>>>>>> > >> >
>>>>>> > >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>> > >> >>
>>>>>> > >> >> Hi,
>>>>>> > >> >>
>>>>>> > >> >> I think Robert meant to suggest setting the connector dependency to
>>>>>> > >> >> 1.0-SNAPSHOT.
>>>>>> > >> >>
>>>>>> > >> >> Cheers,
>>>>>> > >> >> Gyula
>>>>>> > >> >>
>>>>>> > >> >> Robert Metzger <rmetz...@apache.org> wrote (on Tue, 1 Dec 2015, 17:10):
>>>>>> > >> >>>
>>>>>> > >> >>> Hi Mihail,
>>>>>> > >> >>>
>>>>>> > >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink for this
>>>>>> > >> >>> as well: https://issues.apache.org/jira/browse/FLINK-3067
>>>>>> > >> >>>
>>>>>> > >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain
>>>>>> > >> >>> a fix.
>>>>>> > >> >>>
>>>>>> > >> >>> Since the Kafka connector is not contained in the Flink binary, you can
>>>>>> > >> >>> just set the version in your Maven pom file to 0.10-SNAPSHOT. Maven
>>>>>> > >> >>> will then download the code planned for the 0.10-SNAPSHOT release.
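In the pom, that amounts to something like the following (the flink-connector-kafka artifact id is the one used on the 0.10 line; Apache's snapshot repository must also be enabled for SNAPSHOT versions to resolve):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>0.10-SNAPSHOT</version>
    </dependency>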
>>>>>> > >> >>>
>>>>>> > >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>>>>>> > >> >>>>
>>>>>> > >> >>>> Hi,
>>>>>> > >> >>>>
>>>>>> > >> >>>> we get the following NullPointerException after ~50 minutes when running
>>>>>> > >> >>>> a streaming job with windowing and state that reads data from Kafka and
>>>>>> > >> >>>> writes the result to local FS.
>>>>>> > >> >>>> There are around 170 million messages to be processed; Flink 0.10.1
>>>>>> > >> >>>> stops at ~8 million.
>>>>>> > >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>>>>> > >> >>>>
>>>>>> > >> >>>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
>>>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>>>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>>>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>>>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>>>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>>>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>>>>>> > >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>>>>>> > >> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>>>>>> > >> >>>> java.lang.Exception
>>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>>>>> > >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>>> > >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>>> > >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>> > >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>> > >> >>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> > >> >>>> Caused by: java.lang.NullPointerException
>>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>>>>> > >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>>>>> > >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>>>>> > >> >>>>
>>>>>> > >> >>>> Any ideas on what could cause this behaviour?
>>>>>> > >> >>>>
>>>>>> > >> >>>> Best,
>>>>>> > >> >>>> Mihail