Hi Fabian,

https://ci.apache.org/projects/flink/flink-docs-master/dev/state_backends.html#the-rocksdbstatebackend

I am referring to this page; it does not clearly state whether the state will be kept on local disk even after checkpointing. Or maybe I am not getting it correctly :)

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 1:38 PM, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <ml-node+s2336050n884...@n4.nabble.com> wrote:

> You do not have to convert your DTO into a JSON object to use it as a key-value state in a Flink function.
> You can pass it as it is via the state interfaces.
>
> Can you point me to the documentation that you find confusing? The state documentation [1] says:
>
> >> You can make *every* transformation (map, filter, etc) stateful by using Flink’s state interface or checkpointing instance fields of your function.
> >> You can register any instance field as *managed* state by implementing an interface.
> >> In this case, and also in the case of using Flink’s native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.
>
> Is that unclear/confusing or are you referring to a different paragraph?
>
> Thanks, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>
> 2016-09-01 20:22 GMT+02:00 vinay patil <[hidden email]>:
>
>> I don't want to join the third stream.
>>
>> And yes, this is what I was thinking of also:
>> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup join)
>>
>> I am already done integrating with Cassandra, but I feel RocksDB will be a better option. I will have to take care of the clearing part as you have suggested; I will check that in the documentation.
>>
>> I have the DTO with almost 50 fields. Converting it to JSON and storing it as state should not be a problem, or is there no harm in storing the DTO directly?
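The backup join in the pipeline above (`...keyBy.flatMap(// backup join)`) buffers an unmatched element per key and emits a join once a second unmatched element with the same key arrives. Below is a minimal plain-Java sketch of that logic. It assumes a `HashMap` stands in for Flink's keyed `ValueState`; the comments name the `value()`/`update()`/`clear()` calls it mimics. It also assumes at most one element per key is pending at a time. The class `BackupJoin` and its string output format are hypothetical. The value is held as an object, not as a JSON string, in line with Fabian's remark about the DTO.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the keyed "backup join" flatMap.
// The HashMap plays the role of Flink's per-key ValueState (one pending value per key).
class BackupJoin<K, V> {
    private final Map<K, V> pending = new HashMap<>();

    // Handles one unmatched element and returns what would be emitted downstream.
    List<String> flatMap(K key, V value) {
        List<String> out = new ArrayList<>();
        V waiting = pending.get(key);                   // like ValueState.value()
        if (waiting == null) {
            pending.put(key, value);                    // like ValueState.update(value): wait for a partner
        } else {
            out.add(key + ":" + waiting + "+" + value); // partner found: emit the joined pair
            pending.remove(key);                        // like ValueState.clear(): free the key's state
        }
        return out;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The `remove` step is the explicit clean-up the thread keeps coming back to. Without it, every joined key would stay in the state backend.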
>>
>> I think the documentation should point out that the state will be maintained for user-defined operators, to avoid confusion.
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
>>
>>> I thought you would like to join the non-matched elements with another (third) stream.
>>>
>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.connect(s3.keyBy).coFlatMap(// backup join)
>>>
>>> If you want to match the non-matched stream with itself, a FlatMapFunction is the right choice.
>>>
>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup join)
>>>
>>> The backup join puts all non-matched elements in the state and waits for another non-matched element with the same key to do the join.
>>>
>>> Best, Fabian
>>>
>>> 2016-09-01 19:55 GMT+02:00 vinay patil <[hidden email]>:
>>>
>>>> Yes, that's what I am looking for.
>>>>
>>>> But why use a CoFlatMapFunction? I have already got the matchingAndNonMatching stream by doing the union of the two streams and having the logic in the apply method for performing the outer join.
>>>>
>>>> I am thinking of applying the same key on matchingAndNonMatching and using a flatMap to take care of the rest of the logic.
>>>>
>>>> Or are you suggesting to use a CoFlatMapFunction after the outer-join operation (I mean after doing the window and getting the matchingAndNonMatching stream)?
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
>>>>
>>>>> Thanks for the explanation. I think I understood your use case.
>>>>>
>>>>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed stream (keyed by the join key).
>>>>> One input would be the unmatched outer-join records, the other input would serve the events you want to match them with.
>>>>> Retrieving elements from RocksDB will be local and should be fast.
>>>>>
>>>>> You should be confident, though, that all unmatched records will be picked up at some point (RocksDB persists to disk, so you won't run out of memory, but the snapshot size will increase).
>>>>> The future state expiry feature will avoid such situations.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-09-01 18:29 GMT+02:00 vinay patil <[hidden email]>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> I had already used the CoGroup function earlier but was getting some issues while dealing with watermarks (for one use case I was not getting the correct result), so I have used the union operator for performing the outer join (WindowFunction on a KeyedStream). This approach is working and giving me correct results.
>>>>>>
>>>>>> As I have described the scenario, I want to maintain the non-matching records in some store. That's why I was thinking of using RocksDB as a store here: I will maintain the user-defined state after the outer-join window operator, and I can query it using Flink to check whether the value for a particular key is present or not. If it is present, I can match the records and send them downstream.
>>>>>>
>>>>>> The final goal is to have zero non-matching records, so this is the backup plan to handle edge-case scenarios.
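Fabian's two-input variant is a CoFlatMapFunction on streams keyed by the join key. It can be illustrated in plain Java under a few assumptions:

- a `HashMap` of lists stands in for Flink's keyed `ListState`;
- input 1 carries the unmatched outer-join records;
- input 2 carries the events to match them with;
- only input 1 is buffered.

The class `BackupCoJoin` and its output format are hypothetical, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a two-input backup join on keyed streams.
// The map of lists plays the role of Flink's per-key ListState.
class BackupCoJoin<K, A, B> {
    private final Map<K, List<A>> unmatched = new HashMap<>();

    // Input 1: buffer an unmatched outer-join record under its join key.
    void flatMap1(K key, A unmatchedRecord) {
        unmatched.computeIfAbsent(key, k -> new ArrayList<>()).add(unmatchedRecord); // like ListState.add()
    }

    // Input 2: an event releases every buffered record with the same key.
    List<String> flatMap2(K key, B event) {
        List<String> out = new ArrayList<>();
        List<A> buffered = unmatched.remove(key); // like ListState.get() followed by clear()
        if (buffered != null) {
            for (A rec : buffered) {
                out.add(rec + "+" + event);
            }
        }
        return out;
    }

    int bufferedKeys() {
        return unmatched.size();
    }
}
```

Records whose partner never arrives stay in `unmatched` indefinitely. This is the snapshot-growth concern Fabian raises, so some clean-up or expiry is still needed.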
>>>>>>
>>>>>> I have already integrated code to write to Cassandra using the Flink connector, but I think this will be a better option than querying an external store: since RocksDB stores the data on the local TaskManager (TM) disk, retrieval will be faster than with Cassandra, right?
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi Vinay,
>>>>>>>
>>>>>>> can you give a bit more detail about how you plan to implement the outer join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?
>>>>>>>
>>>>>>> An alternative could be to use a CoGroup operator, which collects from two inputs all elements that share a common key (the join key) and are in the same window. The interface of the function provides two iterators over the elements of both inputs and can be used to implement outer join functionality. The benefit of working with a CoGroupFunction is that you do not have to take care of state handling at all.
>>>>>>>
>>>>>>> In case you go for a custom implementation, you will need to work with operator state.
>>>>>>> However, you do not need to directly interact with RocksDB. Flink is taking care of that for you.
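To illustrate the CoGroup alternative: for one key and one window, the function receives the elements of both inputs. A full outer join then only has to decide what to emit when one side is empty. This is a minimal plain-Java sketch with the following assumptions:

- inputs are `Iterable<String>`;
- output uses a `left|right` string with `null` for a missing side.

`OuterJoinCoGroup.coGroup` is a hypothetical helper, not Flink's `CoGroupFunction` interface (whose `coGroup` method additionally receives a `Collector`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper modelling a full outer join inside a CoGroup-style call:
// both Iterables hold the elements of ONE key in ONE window.
class OuterJoinCoGroup {

    static List<String> coGroup(Iterable<String> left, Iterable<String> right) {
        List<String> rights = new ArrayList<>();
        for (String r : right) {
            rights.add(r);                    // buffer so the right side can be re-scanned
        }
        List<String> out = new ArrayList<>();
        boolean sawLeft = false;
        for (String l : left) {
            sawLeft = true;
            if (rights.isEmpty()) {
                out.add(l + "|null");         // left element without a partner
            }
            for (String r : rights) {
                out.add(l + "|" + r);         // same key, so every pair is a match
            }
        }
        if (!sawLeft) {
            for (String r : rights) {
                out.add("null|" + r);         // right element without a partner
            }
        }
        return out;
    }
}
```

All elements in one call share the join key, so every left/right pair is a match. The only per-call decision is how to emit the unmatched side, which fits Fabian's point that no manual state handling is needed with this operator.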
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2016-09-01 16:13 GMT+02:00 vinay patil <[hidden email]>:
>>>>>>>
>>>>>>>> Hi Fabian/Stephan,
>>>>>>>>
>>>>>>>> Waiting for your suggestion.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Vinay Patil
>>>>>>>>
>>>>>>>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi Fabian/Stephan,
>>>>>>>>>
>>>>>>>>> This makes things clear.
>>>>>>>>>
>>>>>>>>> This is the use case I have:
>>>>>>>>> I am performing an outer join operation on the two streams (in a window), after which I get matchingAndNonMatchingStream. Now I want to make sure that the matching rate is high (matching cannot happen if one of the sources is not emitting elements for a certain time). To tackle this situation I was thinking of using RocksDB as a state backend, where I will insert the unmatched records (the key will be the same as used for the window and the value will be the DTO). Before inserting, I will check whether the key is already present in RocksDB; if yes, I will take the data from it, send it downstream, and make sure I perform the clean-up for that key.
>>>>>>>>> (Also, the data to store should be encrypted; the encryption part can be handled.)
>>>>>>>>>
>>>>>>>>> So instead of using Cassandra, can I do this using RocksDB as the state backend, since the state is not gone after checkpointing?
>>>>>>>>>
>>>>>>>>> P.S. I have kept the watermark 1500 secs behind just to be safe when handling late elements, but to tackle edge-case scenarios like the one mentioned above we have a backup plan of using Cassandra as an external store, since we are dealing with critical financial data.
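The P.S. mentions keeping the watermark 1500 seconds behind. Below is a minimal plain-Java sketch of that idea, assuming a bounded-out-of-orderness strategy in which the watermark trails the largest timestamp seen so far. This is similar in spirit to Flink's `BoundedOutOfOrdernessTimestampExtractor`, but it is not that class. `LaggingWatermark` is hypothetical. The firing check follows the usual event-time rule: a window `[start, end)` fires once the watermark reaches its last timestamp, `end - 1`.

```java
// Hypothetical model of a watermark that trails the highest event timestamp
// seen so far by a fixed bound (e.g. 1500 s = 1_500_000 ms).
class LaggingWatermark {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    LaggingWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Out-of-order events never move the watermark backwards.
    void onEvent(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
    }

    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;            // no events yet; also avoids underflow
        }
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    // An event-time window [start, end) can fire once the watermark reaches end - 1.
    boolean windowCanFire(long windowEndMs) {
        return currentWatermark() >= windowEndMs - 1;
    }
}
```

With a 1500 s bound, a window fires only after an element whose timestamp is at least 1500 s past the window's last timestamp has been seen. Records that arrive even later are the edge cases the backup plan is meant to catch.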
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Vinay Patil
>>>>>>>>>
>>>>>>>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Vinay,
>>>>>>>>>>
>>>>>>>>>> if you use user-defined state, you have to clear it manually. Otherwise, it will stay in the state backend (heap or RocksDB) until the job goes down (planned or due to an OOM error).
>>>>>>>>>>
>>>>>>>>>> This is especially important to keep in mind when using keyed state. If you have an unbounded, evolving key space, you will likely run out of memory. The job will constantly add state for each new key but won't be able to clean up the state for "expired" keys.
>>>>>>>>>>
>>>>>>>>>> You could implement such a clean-up mechanism if you implement a custom stream operator. However, this is a very low-level interface and requires a solid understanding of internals like timestamps, watermarks and the checkpointing mechanism.
>>>>>>>>>>
>>>>>>>>>> The community is currently working on a state expiry feature (state will be discarded if not requested or updated for x minutes).
>>>>>>>>>>
>>>>>>>>>> Regarding the second question: Does state remain local after checkpointing?
>>>>>>>>>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but remains in the operator. So the state is not gone after a checkpoint is completed.
>>>>>>>>>>
>>>>>>>>>> Hope this helps,
>>>>>>>>>> Fabian
>>>>>>>>>>
>>>>>>>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]>:
>>>>>>>>>>
>>>>>>>>>> > Hi Stephan,
>>>>>>>>>> >
>>>>>>>>>> > Just wanted to jump into this discussion regarding state.
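Fabian's answer about checkpoints can be pictured as "copy, not move". This plain-Java illustration uses two `HashMap`s as stand-ins:

- one for the local state (RocksDB on the TaskManager);
- one for completed checkpoints on a remote file system.

`CheckpointDemo` is hypothetical and deliberately ignores how Flink actually writes snapshots.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: a checkpoint COPIES local state to durable storage;
// it does not move or delete it. Both maps are stand-ins, not Flink classes.
class CheckpointDemo {
    final Map<String, String> localState = new HashMap<>();                   // e.g. RocksDB on the TaskManager
    final Map<Long, Map<String, String>> remoteCheckpoints = new HashMap<>(); // e.g. files on HDFS or S3

    void checkpoint(long checkpointId) {
        // The snapshot is a copy, so later local updates do not change it.
        remoteCheckpoints.put(checkpointId, new HashMap<>(localState));
    }

    // After a failure, local state is rebuilt from a completed checkpoint.
    void restore(long checkpointId) {
        localState.clear();
        localState.putAll(remoteCheckpoints.get(checkpointId));
    }
}
```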
>>>>>>>>>> >
>>>>>>>>>> > So do you mean that if we maintain user-defined state (for non-window operators) and do not clear it explicitly, the data for that key will remain in RocksDB?
>>>>>>>>>> >
>>>>>>>>>> > What happens in case of a checkpoint? I read in the documentation that when the checkpoint happens, the RocksDB data is pushed to the desired location (HDFS, S3 or another FS). So for user-defined state, does the data still remain in RocksDB after the checkpoint?
>>>>>>>>>> >
>>>>>>>>>> > Correct me if I have misunderstood this concept.
>>>>>>>>>> >
>>>>>>>>>> > For one of our use cases we were going for this, but since I read the above part in the documentation, we are going for Cassandra now (to store records and query them for a special case).
>>>>>>>>>> >
>>>>>>>>>> > Regards,
>>>>>>>>>> > Vinay Patil
>>>>>>>>>> >
>>>>>>>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]> wrote:
>>>>>>>>>> >
>>>>>>>>>> > > In streaming, memory is mainly needed for state (key/value state). The exact representation depends on the chosen StateBackend.
>>>>>>>>>> > >
>>>>>>>>>> > > State is explicitly released: for windows, state is cleaned up automatically (firing / expiry); for user-defined state, keys have to be explicitly cleared (clear() method) or, in the future, will have the option to expire.
>>>>>>>>>> > >
>>>>>>>>>> > > The heavy workhorse for streaming state is currently RocksDB, which internally uses native (off-heap) memory to keep the data.
>>>>>>>>>> > >
>>>>>>>>>> > > Does that help?
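Stephan's point is that user-defined state must be cleared explicitly, with expiry only planned at the time. It can be sketched in plain Java under these assumptions:

- a `HashMap` stands in for keyed state;
- `clear(key)` mirrors the `clear()` call on Flink state;
- `expire(now)` is a hypothetical time-based sweep standing in for the future expiry feature;
- in this sketch, only updates refresh a key's timer.

`ExpiringKeyedState` is not a Flink class.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical keyed state with explicit clear() and a simple time-based expiry.
// The expiry sweep only models the idea of the planned feature; it is not a Flink API.
class ExpiringKeyedState<K, V> {
    private static final class Slot<T> {
        final T value;
        final long lastUpdateMs;

        Slot(T value, long lastUpdateMs) {
            this.value = value;
            this.lastUpdateMs = lastUpdateMs;
        }
    }

    private final Map<K, Slot<V>> slots = new HashMap<>();
    private final long ttlMs;

    ExpiringKeyedState(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void update(K key, V value, long nowMs) {
        slots.put(key, new Slot<>(value, nowMs));
    }

    V value(K key) {
        Slot<V> slot = slots.get(key);
        return slot == null ? null : slot.value;
    }

    // Explicit clean-up: without it (or expiry) the key's state is kept forever.
    void clear(K key) {
        slots.remove(key);
    }

    // Removes keys not updated for at least ttlMs; returns how many were dropped.
    int expire(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<K, Slot<V>>> it = slots.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue().lastUpdateMs >= ttlMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return slots.size();
    }
}
```

Without the `clear` or `expire` calls, every new key stays in the map forever. This is the unbounded-key-space problem Fabian describes.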
>>>>>>>>>> > >
>>>>>>>>>> > > Stephan
>>>>>>>>>> > >
>>>>>>>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]> wrote:
>>>>>>>>>> > >
>>>>>>>>>> > > > As per the docs, in batch mode, dynamic memory allocation is avoided by storing messages being processed in ByteBuffers via Unsafe methods.
>>>>>>>>>> > > >
>>>>>>>>>> > > > I couldn't find any docs describing memory management in streaming mode. So...
>>>>>>>>>> > > >
>>>>>>>>>> > > > - I am wondering if this is also the case with streaming?
>>>>>>>>>> > > >
>>>>>>>>>> > > > - If so, how does Flink detect that an object is no longer being used and can be reclaimed for reuse once again?
>>>>>>>>>> > > >
>>>>>>>>>> > > > -roshan
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> View this message in context: Re: Streaming - memory management
>>>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Streaming-memory-management-tp8829.html>
>>>>>>>> Sent from the Apache Flink User Mailing List archive. mailing list archive <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at Nabble.com.