I thought you would like to join the non-matched elements with another (third) stream.
--> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.connect(s3.keyBy).coFlatMap(// backup join)

If you want to match the non-matched stream with itself, a FlatMapFunction is the right choice.

--> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup join)

The backup join puts all non-matched elements in the state and waits for another non-matched element with the same key to do the join.

Best, Fabian

2016-09-01 19:55 GMT+02:00 vinay patil <vinay18.pa...@gmail.com>:

> Yes, that's what I am looking for.
>
> But why use a CoFlatMapFunction? I have already got the
> matchingAndNonMatching stream by doing the union of two streams and
> having the logic in the apply method for performing the outer join.
>
> I am thinking of applying the same key on matchingAndNonMatching and
> using a flatMap to take care of the rest of the logic.
>
> Or are you suggesting to use a CoFlatMapFunction after the outer-join
> operation (I mean after doing the window and
> getting the matchingAndNonMatching stream)?
>
> Regards,
> Vinay Patil
>
> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> Thanks for the explanation. I think I understood your use case.
>>
>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
>> stream (keyed by join key).
>> One input would be the unmatched outer join records, the other input
>> would serve the events you want to match them with.
>> Retrieving elements from RocksDB will be local and should be fast.
>>
>> You should be confident, though, that all unmatched records will be picked
>> up at some point (RocksDB persists to disk, so you won't run out of memory,
>> but snapshot size will increase).
>> The future state expiry feature will avoid such situations.
>>
>> Best, Fabian
>>
>> 2016-09-01 18:29 GMT+02:00 vinay patil <[hidden email]>:
>>
>>> Hi Fabian,
>>>
>>> I had already used the CoGroup function earlier but was getting some
>>> issues while dealing with watermarks (for one use case I was not getting
>>> the correct result), so I have used the union operator for performing the
>>> outer join (WindowFunction on a KeyedStream). This approach is working
>>> correctly and giving me correct results.
>>>
>>> As I have discussed in the scenario, I want to maintain the non-matching
>>> records in some store. That's why I was thinking of using RocksDB as a
>>> store here, where I will maintain the user-defined state after the
>>> outer-join window operator, and I can query it using Flink to check whether
>>> the value for a particular key is present. If it is present, I can match
>>> them and send the result downstream.
>>>
>>> The final goal is to have zero non-matching records, so this is the
>>> backup plan to handle edge-case scenarios.
>>>
>>> I have already integrated code to write to Cassandra using the Flink
>>> connector, but I think this will be a better option than querying an
>>> external store: since RocksDB stores the data on the local TM disk,
>>> retrieval will be faster here than with Cassandra, right?
>>>
>>> What do you think?
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink User
>>> Mailing List archive.] <[hidden email]> wrote:
>>>
>>>> Hi Vinay,
>>>>
>>>> can you give a bit more detail about how you plan to implement the
>>>> outer join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?
>>>>
>>>> An alternative could be to use a CoGroup operator which collects from
>>>> two inputs all elements that share a common key (the join key) and are in
>>>> the same window.
>>>> The interface of the function provides two iterators over
>>>> the elements of both inputs and can be used to implement outer join
>>>> functionality. The benefit of working with a CoGroupFunction is that you do
>>>> not have to take care of state handling at all.
>>>>
>>>> In case you go for a custom implementation you will need to work with
>>>> operator state.
>>>> However, you do not need to directly interact with RocksDB. Flink is
>>>> taking care of that for you.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-09-01 16:13 GMT+02:00 vinay patil <[hidden email]>:
>>>>
>>>>> Hi Fabian/Stephan,
>>>>>
>>>>> Waiting for your suggestion
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]> wrote:
>>>>>
>>>>>> Hi Fabian/Stephan,
>>>>>>
>>>>>> This makes things clear.
>>>>>>
>>>>>> This is the use case I have:
>>>>>> I am performing an outer join operation on the two streams (in a window),
>>>>>> after which I get matchingAndNonMatchingStream. Now I want to make sure
>>>>>> that the matching rate is high (matching cannot happen if one of the
>>>>>> sources is not emitting elements for a certain time). To tackle this
>>>>>> situation I was thinking of using RocksDB as a state backend, where I will
>>>>>> insert the unmatched records (the key will be the same as used for the
>>>>>> window and the value will be a DTO). Before inserting a record I will check
>>>>>> whether its key is already present in RocksDB; if yes, I will take the data
>>>>>> from it and send it downstream (and ensure I perform the clean operation
>>>>>> for that key).
>>>>>> (Also, the data to store should be encrypted; the encryption part can be
>>>>>> handled.)
>>>>>>
>>>>>> So instead of using Cassandra, can I do this using RocksDB as state
>>>>>> backend, since the state is not gone after checkpointing?
>>>>>>
>>>>>> P.S. I have kept the watermark behind by 1500 secs just to be safe on
>>>>>> handling late elements, but to tackle edge-case scenarios like the one
>>>>>> mentioned above we have a backup plan of using Cassandra as an
>>>>>> external store, since we are dealing with financially critical data.
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi Vinay,
>>>>>>>
>>>>>>> if you use user-defined state, you have to manually clear it.
>>>>>>> Otherwise, it will stay in the state backend (heap or RocksDB) until the
>>>>>>> job goes down (planned or due to an OOM error).
>>>>>>>
>>>>>>> This is esp. important to keep in mind when using keyed state.
>>>>>>> If you have an unbounded, evolving key space you will likely run
>>>>>>> out of memory.
>>>>>>> The job will constantly add state for each new key but won't be able to
>>>>>>> clean up the state for "expired" keys.
>>>>>>>
>>>>>>> You could implement a clean-up mechanism for this if you implement a
>>>>>>> custom stream operator.
>>>>>>> However, this is a very low-level interface and requires solid
>>>>>>> understanding of internals like timestamps, watermarks and the
>>>>>>> checkpointing mechanism.
>>>>>>>
>>>>>>> The community is currently working on a state expiry feature (state will
>>>>>>> be discarded if not requested or updated for x minutes).
>>>>>>>
>>>>>>> Regarding the second question: Does state remain local after
>>>>>>> checkpointing?
>>>>>>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but
>>>>>>> remains in the operator. So the state is not gone after a checkpoint is
>>>>>>> completed.
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]>:
>>>>>>>
>>>>>>> > Hi Stephan,
>>>>>>> >
>>>>>>> > Just wanted to jump into this discussion regarding state.
>>>>>>> >
>>>>>>> > So do you mean that if we maintain user-defined state (for non-window
>>>>>>> > operators) and do not clear it explicitly, the data for that
>>>>>>> > key remains in RocksDB?
>>>>>>> >
>>>>>>> > What happens in case of a checkpoint? I read in the documentation that
>>>>>>> > after the checkpoint happens the RocksDB data is pushed to the desired
>>>>>>> > location (HDFS or S3 or other FS), so for user-defined state does the
>>>>>>> > data still remain in RocksDB after the checkpoint?
>>>>>>> >
>>>>>>> > Correct me if I have misunderstood this concept.
>>>>>>> >
>>>>>>> > For one of our use cases we were going for this, but since I read the
>>>>>>> > above part in the documentation we are going for Cassandra now (to store
>>>>>>> > records and query them for a special case).
>>>>>>> >
>>>>>>> > Regards,
>>>>>>> > Vinay Patil
>>>>>>> >
>>>>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]> wrote:
>>>>>>> >
>>>>>>> > > In streaming, memory is mainly needed for state (key/value state). The
>>>>>>> > > exact representation depends on the chosen StateBackend.
>>>>>>> > >
>>>>>>> > > State is explicitly released: For windows, state is cleaned up
>>>>>>> > > automatically (firing / expiry), for user-defined state, keys have to be
>>>>>>> > > explicitly cleared (clear() method) or in the future will have the option
>>>>>>> > > to expire.
>>>>>>> > >
>>>>>>> > > The heavy workhorse for streaming state is currently RocksDB, which
>>>>>>> > > internally uses native (off-heap) memory to keep the data.
>>>>>>> > >
>>>>>>> > > Does that help?
>>>>>>> > >
>>>>>>> > > Stephan
>>>>>>> > >
>>>>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]>
>>>>>>> > > wrote:
>>>>>>> > >
>>>>>>> > > > As per the docs, in Batch mode, dynamic memory allocation is avoided by
>>>>>>> > > > storing messages being processed in ByteBuffers via Unsafe methods.
>>>>>>> > > >
>>>>>>> > > > Couldn't find any docs describing mem mgmt in Streaming mode. So...
>>>>>>> > > >
>>>>>>> > > > - Am wondering if this is also the case with Streaming?
>>>>>>> > > >
>>>>>>> > > > - If so, how does Flink detect that an object is no longer being used
>>>>>>> > > > and can be reclaimed for reuse once again?
>>>>>>> > > >
>>>>>>> > > > -roshan
>>>>>
>>>>> ------------------------------
>>>>> View this message in context: Re: Streaming - memory management
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Streaming-memory-management-tp8829.html>
>>>>> Sent from the Apache Flink User Mailing List archive
>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>>>> at Nabble.com.
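
For reference, the backup join described at the top of this message (park each non-matched element under its key, join it when a second non-matched element with the same key arrives, then clear the entry) can be sketched in plain Java. This is only a minimal sketch of the logic, not Flink API: a HashMap stands in for Flink's keyed state, and the names BackupJoinSketch, onNonMatched and pendingCount are hypothetical. In an actual job the same steps would presumably sit inside the flatMap on the keyed stream, with the state held by the configured state backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for the "backup join" logic; illustrative only, not Flink API. */
public class BackupJoinSketch {

    // Stand-in for keyed state: at most one parked non-matched value per key.
    private final Map<String, String> pending = new HashMap<>();

    /**
     * Handles one non-matched element.
     * Returns the joined record if a partner was waiting, otherwise an empty list.
     */
    public List<String> onNonMatched(String key, String value) {
        List<String> out = new ArrayList<>();
        // Read and clear in one step, so state for a joined key does not linger.
        String waiting = pending.remove(key);
        if (waiting == null) {
            pending.put(key, value);                    // no partner yet: park it
        } else {
            out.add(key + ":" + waiting + "+" + value); // partner found: emit join
        }
        return out;
    }

    /** Number of keys that still have a parked, unjoined element. */
    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BackupJoinSketch join = new BackupJoinSketch();
        System.out.println(join.onNonMatched("k1", "a")); // prints []
        System.out.println(join.onNonMatched("k2", "b")); // prints []
        System.out.println(join.onNonMatched("k1", "c")); // prints [k1:a+c]
        System.out.println(join.pendingCount());          // prints 1
    }
}
```

The entry for "k2" never finds a partner and stays parked, which is the situation the thread warns about: without an explicit clear or an expiry mechanism, such entries accumulate.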