Hi Fabian,

I had already used the CoGroup function earlier, but I was getting some issues while dealing with watermarks (for one use case I was not getting the correct result), so I have used the union operator to perform the outer join (a WindowFunction on a KeyedStream). This approach is working correctly and giving me the right results.
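As a reference for the union-based approach, here is a minimal sketch of the per-window outer-join logic in plain Python (not the Flink API — stream handling, window assignment, and all names are illustrative): tag each element with its side, union the inputs, group by key, and split matched from unmatched records.

```python
from collections import defaultdict

def windowed_outer_join(left, right):
    """Simulate a full outer join of two keyed streams within one window.

    `left` and `right` are lists of (key, value) pairs that fell into the
    same window; returns (matched, unmatched) records.
    """
    # Tag and union the two inputs, as the union operator would.
    unioned = [(k, ("L", v)) for k, v in left] + [(k, ("R", v)) for k, v in right]

    # Group by key, mimicking a WindowFunction on a KeyedStream.
    by_key = defaultdict(lambda: {"L": [], "R": []})
    for k, (side, v) in unioned:
        by_key[k][side].append(v)

    matched, unmatched = [], []
    for k, sides in by_key.items():
        if sides["L"] and sides["R"]:
            # Both sides present: emit every pairing for this key.
            for lv in sides["L"]:
                for rv in sides["R"]:
                    matched.append((k, lv, rv))
        else:
            # Only one side present: these are the non-matching records.
            for side in ("L", "R"):
                for v in sides[side]:
                    unmatched.append((k, side, v))
    return matched, unmatched
```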
As I have discussed for this scenario, I want to keep the non-matching records in some store, which is why I was thinking of using RocksDB here: I will maintain user-defined state after the outer-join window operator and query it from Flink to check whether a value for a particular key is present; if it is, I can match the records and send them downstream. The final goal is to have zero non-matching records, so this is the backup plan to handle edge-case scenarios.

I have already integrated code to write to Cassandra using the Flink connector, but I think this will be a better option than querying an external store, since RocksDB stores the data on the local TM disk, so retrieval should be faster than from Cassandra, right? What do you think?

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <ml-node+s2336050n8832...@n4.nabble.com> wrote:

> Hi Vinay,
>
> Can you give a bit more detail about how you plan to implement the outer
> join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?
>
> An alternative could be to use a CoGroup operator, which collects from
> two inputs all elements that share a common key (the join key) and are in
> the same window. The interface of the function provides two iterators
> over the elements of both inputs and can be used to implement outer-join
> functionality. The benefit of working with a CoGroupFunction is that you
> do not have to take care of state handling at all.
>
> In case you go for a custom implementation, you will need to work with
> operator state. However, you do not need to interact with RocksDB
> directly; Flink takes care of that for you.
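To make the CoGroup suggestion above concrete: the per-key function receives two iterables (one per input) for each key and window, and either iterable may be empty, which is exactly what enables outer-join semantics. A minimal sketch of such a callback in plain Python (illustrative only; Flink's CoGroupFunction would do the actual grouping, windowing, and state handling):

```python
def co_group_outer_join(first, second):
    """Per-key CoGroup-style callback.

    `first` and `second` hold the elements from each input that share
    the join key within one window; either may be empty, which is what
    makes full outer-join semantics possible.
    """
    first, second = list(first), list(second)
    if first and second:
        # Inner matches: pair every left element with every right element.
        return [(a, b) for a in first for b in second]
    # One side is empty: pad the other side with None (the outer part).
    return [(a, None) for a in first] + [(None, b) for b in second]
```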
> Best,
> Fabian
>
> 2016-09-01 16:13 GMT+02:00 vinay patil <[hidden email]>:
>
>> Hi Fabian/Stephan,
>>
>> Waiting for your suggestion.
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]> wrote:
>>
>>> Hi Fabian/Stephan,
>>>
>>> This makes things clear.
>>>
>>> This is the use case I have:
>>> I am performing an outer-join operation on the two streams (in a
>>> window), after which I get a matchingAndNonMatchingStream. Now I want
>>> to make sure that the matching rate is high (matching cannot happen if
>>> one of the sources does not emit elements for a certain time), so to
>>> tackle this situation I was thinking of using RocksDB as a state
>>> backend: I would insert the unmatched records into it (the key would be
>>> the same as the one used for the window, and the value would be the
>>> DTO). Before inserting, I would check whether the key is already
>>> present in RocksDB; if yes, I would take the data from it, send it
>>> downstream, and make sure I perform the clean-up operation for that
>>> key.
>>> (Also, the data to store should be encrypted; the encryption part can
>>> be handled.)
>>>
>>> So instead of using Cassandra, can I do this using RocksDB as the state
>>> backend, since the state is not gone after checkpointing?
>>>
>>> P.S. I have kept the watermark behind by 1500 secs just to be safe in
>>> handling late elements, but to tackle edge-case scenarios like the one
>>> mentioned above we have a backup plan of using Cassandra as an external
>>> store, since we are dealing with financially critical data.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <[hidden email]> wrote:
>>>
>>>> Hi Vinay,
>>>>
>>>> If you use user-defined state, you have to clear it manually.
>>>> Otherwise, it will stay in the state backend (heap or RocksDB) until
>>>> the job goes down (planned or due to an OOM error).
>>>>
>>>> This is especially important to keep in mind when using keyed state.
>>>> If you have an unbounded, evolving key space, you will likely run out
>>>> of memory: the job will constantly add state for each new key but
>>>> won't be able to clean up the state for "expired" keys.
>>>>
>>>> You could implement such a clean-up mechanism with a custom stream
>>>> operator. However, this is a very low-level interface and requires a
>>>> solid understanding of internals like timestamps, watermarks, and the
>>>> checkpointing mechanism.
>>>>
>>>> The community is currently working on a state-expiry feature (state
>>>> will be discarded if not requested or updated for x minutes).
>>>>
>>>> Regarding the second question: does state remain local after
>>>> checkpointing? Yes, the local state is only copied to the remote FS
>>>> (HDFS, S3, ...) but remains in the operator, so the state is not gone
>>>> after a checkpoint completes.
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]>:
>>>>
>>>> > Hi Stephan,
>>>> >
>>>> > Just wanted to jump into this discussion regarding state.
>>>> >
>>>> > So do you mean that if we maintain user-defined state (for
>>>> > non-window operators) and do not clear it explicitly, the data for
>>>> > that key remains in RocksDB?
>>>> >
>>>> > What happens in the case of a checkpoint? I read in the
>>>> > documentation that after a checkpoint, the RocksDB data is pushed to
>>>> > the desired location (HDFS, S3, or another FS), so for user-defined
>>>> > state, does the data still remain in RocksDB after a checkpoint?
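The state-expiry feature mentioned above (discarding state not updated for x minutes) can be sketched as follows. This is plain Python with a dict standing in for a real state backend; every name here is illustrative, not Flink's API:

```python
import time

class ExpiringState:
    """Keyed state whose entries are dropped if not updated within `ttl` seconds."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock          # injectable for testing
        self._entries = {}          # key -> (value, last_update_timestamp)

    def put(self, key, value):
        # Every update refreshes the entry's timestamp.
        self._entries[key] = (value, self.clock())

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, ts = entry
        if self.clock() - ts > self.ttl:
            del self._entries[key]  # expired: clean up lazily on access
            return None
        return value
```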
>>>> >
>>>> > Correct me if I have misunderstood this concept.
>>>> >
>>>> > For one of our use cases we were going for this, but since I read
>>>> > the above part of the documentation, we are going with Cassandra now
>>>> > (to store records and query them for a special case).
>>>> >
>>>> > Regards,
>>>> > Vinay Patil
>>>> >
>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]> wrote:
>>>> >
>>>> > > In streaming, memory is mainly needed for state (key/value state).
>>>> > > The exact representation depends on the chosen StateBackend.
>>>> > >
>>>> > > State is explicitly released: for windows, state is cleaned up
>>>> > > automatically (firing / expiry); for user-defined state, keys have
>>>> > > to be explicitly cleared (the clear() method) or, in the future,
>>>> > > will have the option to expire.
>>>> > >
>>>> > > The heavy workhorse for streaming state is currently RocksDB,
>>>> > > which internally uses native (off-heap) memory to keep the data.
>>>> > >
>>>> > > Does that help?
>>>> > >
>>>> > > Stephan
>>>> > >
>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]>
>>>> > > wrote:
>>>> > >
>>>> > > > As per the docs, in batch mode, dynamic memory allocation is
>>>> > > > avoided by storing the messages being processed in ByteBuffers
>>>> > > > via Unsafe methods.
>>>> > > >
>>>> > > > I couldn't find any docs describing memory management in
>>>> > > > streaming mode. So...
>>>> > > >
>>>> > > > - I am wondering: is this also the case with streaming?
>>>> > > >
>>>> > > > - If so, how does Flink detect that an object is no longer being
>>>> > > > used and can be reclaimed for reuse once again?
>>>> > > >
>>>> > > > -roshan
>>>> > > >
>>>> > >
>>>> >
>>>>
>>>
>>
>

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Streaming-memory-management-tp8829p8836.html
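For completeness, the backup-matching pattern discussed earlier in the thread (look the key up in the store; if the other side is already there, match, emit downstream, and clear the key; otherwise store the unmatched record) can be sketched like this, with a plain dict standing in for keyed state / RocksDB (all names are illustrative, not the Flink API):

```python
class UnmatchedStore:
    """Toy stand-in for per-key state: holds unmatched records until the
    other side arrives, then emits the match and clears the key."""

    def __init__(self):
        self._state = {}  # key -> (side, value); real code would use keyed state

    def process(self, key, side, value, emit):
        stored = self._state.get(key)
        if stored is not None and stored[0] != side:
            # The other side is present: emit the joined record downstream.
            emit((key, stored[1], value) if side == "R" else (key, value, stored[1]))
            del self._state[key]  # clean up, or the key space grows unboundedly
        else:
            # No match yet (or a duplicate from the same side): keep the latest.
            self._state[key] = (side, value)
```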