Thanks for the pointer, and sorry for the late answer. I guess that depends on the semantics of "checkpointing". In Flink's terminology, checkpointing means creating a copy of the state (and writing the copy to the external FS). It does not mean that the state is migrated or moved to the external FS: the working state stays in the state backend (for RocksDB, on the TaskManager's local disk) after the checkpoint completes.
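A minimal plain-Python model of these semantics may help. It is not Flink code: `ToyStateBackend`, `checkpoint()` and `restore()` are made-up names for illustration. A checkpoint appends a copy of the local state to an "external" list, while the local state stays where it is and keeps changing:

```python
import copy


class ToyStateBackend:
    """Hypothetical model, not Flink's API: the working state lives locally
    (like RocksDB on the TaskManager's disk), and a checkpoint writes a
    *copy* of it to an external store (like HDFS or S3)."""

    def __init__(self):
        self.local_state = {}         # working state, read and updated by the operator
        self.external_snapshots = []  # stand-in for the remote file system

    def put(self, key, value):
        self.local_state[key] = value

    def get(self, key):
        return self.local_state.get(key)

    def checkpoint(self):
        # Copy the state; nothing is moved or evicted from local_state.
        self.external_snapshots.append(copy.deepcopy(self.local_state))
        return len(self.external_snapshots) - 1  # checkpoint id

    def restore(self, checkpoint_id):
        # Only failure recovery rebuilds the local state from a snapshot.
        self.local_state = copy.deepcopy(self.external_snapshots[checkpoint_id])


backend = ToyStateBackend()
backend.put("order-42", {"amount": 100})
cp = backend.checkpoint()
backend.put("order-42", {"amount": 150})  # local state keeps evolving after the checkpoint

print(backend.get("order-42"))         # {'amount': 150}
print(backend.external_snapshots[cp])  # {'order-42': {'amount': 100}}
```

After the checkpoint the operator still reads its state locally; the snapshot only matters when the job has to recover.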
Best, Fabian

2016-09-01 20:53 GMT+02:00 vinay patil <vinay18.pa...@gmail.com>:
> Hi Fabian,
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/state_backends.html#the-rocksdbstatebackend
>
> I am referring to this. It does not clearly state whether the state will be maintained on the local disk even after checkpointing.
>
> Or maybe I am not understanding it correctly :)
>
> Regards,
> Vinay Patil
>
> On Thu, Sep 1, 2016 at 1:38 PM, Fabian Hueske wrote:
>
>> You do not have to convert your DTO into a JSON object to use it as key-value state in a Flink function.
>> You can pass it as is via the state interfaces.
>>
>> Can you point me to the documentation that you find confusing? The state documentation [1] says:
>>
>>     You can make *every* transformation (map, filter, etc) stateful by using Flink’s state interface or checkpointing instance fields of your function.
>>
>>     You can register any instance field as *managed* state by implementing an interface.
>>
>>     In this case, and also in the case of using Flink’s native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.
>>
>> Is that unclear/confusing, or are you referring to a different paragraph?
>>
>> Thanks, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>
>> 2016-09-01 20:22 GMT+02:00 vinay patil:
>>
>>> I don't want to join the third stream.
>>>
>>> And yes, this is what I was thinking of as well:
>>> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup join)
>>>
>>> I am already done integrating with Cassandra, but I feel RocksDB will be a better option. I will have to take care of the clearing part as you suggested; I will check that in the documentation.
>>>
>>> I have a DTO with almost 50 fields. Converting it to JSON and storing it as state should not be a problem, or is there any harm in storing the DTO directly?
>>>
>>> I think the documentation should state explicitly that the state is maintained for user-defined operators, to avoid confusion.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske wrote:
>>>
>>>> I thought you would like to join the non-matched elements with another (third) stream.
>>>>
>>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.connect(s3.keyBy).coFlatMap(// backup join)
>>>>
>>>> If you want to match the non-matched stream with itself, a FlatMapFunction is the right choice.
>>>>
>>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup join)
>>>>
>>>> The backup join puts all non-matched elements in the state and waits for another non-matched element with the same key to do the join.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-09-01 19:55 GMT+02:00 vinay patil:
>>>>
>>>>> Yes, that's what I am looking for.
>>>>>
>>>>> But why use a CoFlatMapFunction? I have already got the matchingAndNonMatching stream by doing the union of the two streams and having the outer-join logic in the apply method.
>>>>>
>>>>> I am thinking of applying the same key to matchingAndNonMatching and using a flatMap to take care of the rest of the logic.
>>>>>
>>>>> Or are you suggesting to use a CoFlatMapFunction after the outer-join operation (I mean after doing the window and getting the matchingAndNonMatching stream)?
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske wrote:
>>>>>
>>>>>> Thanks for the explanation. I think I understood your use case.
>>>>>>
>>>>>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed stream (keyed by the join key).
>>>>>> One input would be the unmatched outer-join records; the other input would serve the events you want to match them with.
>>>>>> Retrieving elements from RocksDB will be local and should be fast.
>>>>>>
>>>>>> You should be confident, though, that all unmatched records will be picked up at some point (RocksDB persists to disk, so you won't run out of memory, but the snapshot size will increase).
>>>>>> The future state expiry feature will avoid such situations.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2016-09-01 18:29 GMT+02:00 vinay patil:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> I had already used a CoGroup function earlier but was getting some issues while dealing with watermarks (for one use case I was not getting the correct result), so I have used the union operator for performing the outer join (a WindowFunction on a KeyedStream). This approach is working correctly and giving me correct results.
>>>>>>>
>>>>>>> As I described in the scenario, I want to keep the non-matching records in some store. That's why I was thinking of using RocksDB as the store here: I will maintain user-defined state after the outer-join window operator and query it from Flink to check whether a value for a particular key is present. If it is, I can match the records and send them downstream.
>>>>>>>
>>>>>>> The final goal is to have zero non-matching records, so this is the backup plan to handle edge-case scenarios.
>>>>>>>
>>>>>>> I have already integrated code to write to Cassandra using the Flink connector, but I think this will be a better option than querying an external store: since RocksDB stores the data on the local TM disk, retrieval will be faster than with Cassandra, right?
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Vinay Patil
>>>>>>>
>>>>>>> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske wrote:
>>>>>>>
>>>>>>>> Hi Vinay,
>>>>>>>>
>>>>>>>> can you give a bit more detail about how you plan to implement the outer join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?
>>>>>>>>
>>>>>>>> An alternative could be to use a CoGroup operator, which collects from two inputs all elements that share a common key (the join key) and are in the same window. The interface of the function provides two iterables over the elements of both inputs and can be used to implement outer-join functionality. The benefit of working with a CoGroupFunction is that you do not have to take care of state handling at all.
>>>>>>>>
>>>>>>>> In case you go for a custom implementation, you will need to work with operator state.
>>>>>>>> However, you do not need to interact with RocksDB directly. Flink takes care of that for you.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-09-01 16:13 GMT+02:00 vinay patil:
>>>>>>>>
>>>>>>>>> Hi Fabian/Stephan,
>>>>>>>>>
>>>>>>>>> Waiting for your suggestion.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Vinay Patil
>>>>>>>>>
>>>>>>>>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil wrote:
>>>>>>>>>
>>>>>>>>>> Hi Fabian/Stephan,
>>>>>>>>>>
>>>>>>>>>> This makes things clear.
>>>>>>>>>>
>>>>>>>>>> This is the use case I have:
>>>>>>>>>> I am performing an outer join on two streams (in a window), after which I get matchingAndNonMatchingStream. Now I want to make sure that the matching rate is high (matching cannot happen if one of the sources does not emit elements for a certain time). To tackle this situation, I was thinking of using RocksDB as the state backend, where I will insert the unmatched records (the key will be the same as the one used for the window, and the value will be the DTO). Before inserting, I will check whether the key is already present in RocksDB; if it is, I will take the data from it and send it downstream (and make sure I clean up the state for that key).
>>>>>>>>>> (Also, the stored data should be encrypted; the encryption part can be handled.)
>>>>>>>>>>
>>>>>>>>>> So instead of using Cassandra, can I do this using RocksDB as the state backend, since the state is not gone after checkpointing?
>>>>>>>>>>
>>>>>>>>>> P.S. I have kept the watermark 1500 seconds behind, just to be safe when handling late elements. But to tackle edge-case scenarios like the one mentioned above, we have a backup plan of using Cassandra as an external store, since we are dealing with critical financial data.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Vinay Patil
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vinay,
>>>>>>>>>>>
>>>>>>>>>>> if you use user-defined state, you have to clear it manually. Otherwise, it will stay in the state backend (heap or RocksDB) until the job goes down (planned or due to an OOM error).
>>>>>>>>>>>
>>>>>>>>>>> This is especially important to keep in mind when using keyed state. If you have an unbounded, evolving key space, you will likely run out of memory. The job will constantly add state for each new key but won't be able to clean up the state for "expired" keys.
>>>>>>>>>>>
>>>>>>>>>>> You could implement such a clean-up mechanism with a custom stream operator. However, this is a very low-level interface and requires a solid understanding of internals like timestamps, watermarks, and the checkpointing mechanism.
>>>>>>>>>>>
>>>>>>>>>>> The community is currently working on a state expiry feature (state will be discarded if it is not requested or updated for x minutes).
>>>>>>>>>>>
>>>>>>>>>>> Regarding the second question: does state remain local after checkpointing?
>>>>>>>>>>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but remains in the operator.
>>>>>>>>>>> So the state is not gone after a checkpoint is completed.
>>>>>>>>>>>
>>>>>>>>>>> Hope this helps,
>>>>>>>>>>> Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil:
>>>>>>>>>>>
>>>>>>>>>>> > Hi Stephan,
>>>>>>>>>>> >
>>>>>>>>>>> > Just wanted to jump into this discussion regarding state.
>>>>>>>>>>> >
>>>>>>>>>>> > So do you mean that if we maintain user-defined state (for non-window operators) and do not clear it explicitly, the data for that key will remain in RocksDB?
>>>>>>>>>>> >
>>>>>>>>>>> > What happens in case of a checkpoint? I read in the documentation that after the checkpoint happens, the RocksDB data is pushed to the desired location (HDFS, S3, or another FS). So for user-defined state, does the data still remain in RocksDB after the checkpoint?
>>>>>>>>>>> >
>>>>>>>>>>> > Correct me if I have misunderstood this concept.
>>>>>>>>>>> >
>>>>>>>>>>> > For one of our use cases we were going for this, but since I read the above part in the documentation, we are going for Cassandra now (to store records and query them for a special case).
>>>>>>>>>>> >
>>>>>>>>>>> > Regards,
>>>>>>>>>>> > Vinay Patil
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > > In streaming, memory is mainly needed for state (key/value state). The exact representation depends on the chosen StateBackend.
>>>>>>>>>>> > >
>>>>>>>>>>> > > State is explicitly released: for windows, state is cleaned up automatically (firing / expiry); for user-defined state, keys have to be cleared explicitly (clear() method) or, in the future, will have the option to expire.
>>>>>>>>>>> > >
>>>>>>>>>>> > > The heavy workhorse for streaming state is currently RocksDB, which internally uses native (off-heap) memory to keep the data.
>>>>>>>>>>> > >
>>>>>>>>>>> > > Does that help?
>>>>>>>>>>> > >
>>>>>>>>>>> > > Stephan
>>>>>>>>>>> > >
>>>>>>>>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik wrote:
>>>>>>>>>>> > >
>>>>>>>>>>> > > > As per the docs, in batch mode, dynamic memory allocation is avoided by storing messages being processed in ByteBuffers via Unsafe methods.
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > I couldn't find any docs describing memory management in streaming mode. So...
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > - I am wondering if this is also the case with streaming?
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > - If so, how does Flink detect that an object is no longer being used and can be reclaimed for reuse?
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > -roshan
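The backup join Fabian describes in the thread (key the unmatched outer-join output by the join key, park each non-matched element in keyed state, and join it when a non-matched element with the same key arrives from the other side) can be sketched as a plain-Python simulation. It is not Flink code: the `Unmatched` record layout and the `BackupJoin` class are hypothetical, a dict of lists stands in for the per-key state Flink would keep in RocksDB, and deleting a dict entry plays the role of `clear()`. It also assumes a match needs records from different sources (`s1` vs. `s2`).

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Unmatched:
    """A record the windowed outer join could not match (hypothetical layout)."""
    source: str   # "s1" or "s2": which input stream it came from
    key: str      # the join key used for keyBy()
    payload: str  # stand-in for the DTO


class BackupJoin:
    """Models a keyed flatMap over unmatched records.

    A dict of lists stands in for Flink's per-key list state; removing a key
    stands in for calling clear() on that state.
    """

    def __init__(self) -> None:
        self.parked: Dict[str, List[Unmatched]] = {}

    def flat_map(self, rec: Unmatched, out: List[Tuple[Unmatched, Unmatched]]) -> None:
        pending = self.parked.get(rec.key, [])
        for i, other in enumerate(pending):
            if other.source != rec.source:
                # Partner found: emit the pair and drop the parked record.
                del pending[i]
                if not pending:
                    del self.parked[rec.key]  # clear the key so state does not grow forever
                out.append((other, rec))
                return
        # No partner yet: park the record until one with the same key arrives.
        pending.append(rec)
        self.parked[rec.key] = pending


join = BackupJoin()
out: List[Tuple[Unmatched, Unmatched]] = []
for rec in [Unmatched("s1", "k1", "a"), Unmatched("s2", "k2", "b"), Unmatched("s2", "k1", "c")]:
    join.flat_map(rec, out)

print([(left.payload, right.payload) for left, right in out])  # [('a', 'c')]
print(sorted(join.parked))                                      # ['k2']
```

Clearing a key once its last parked record is matched keeps the state from growing without bound; records that never find a partner still stay parked, which is the situation the state expiry feature mentioned in the thread is meant to address.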
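Fabian's CoGroup suggestion can also be illustrated without a cluster. In Flink, a CoGroupFunction receives, for each key and window, the elements of both inputs as two iterables; the plain-Python functions below (hypothetical names, no Flink involved) mimic that shape for a single window and emit a full outer join. The sketch assumes string keys and values, and it emits the cross product when both sides have several elements for the same key.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

Pair = Tuple[Optional[str], Optional[str]]


def co_group(first: Iterable[str], second: Iterable[str]) -> List[Pair]:
    """Outer join of the elements that share one key in one window.

    Same shape as a CoGroupFunction: it sees all elements of both inputs for
    the key (either side may be empty) and decides what to emit.
    """
    left, right = list(first), list(second)
    if left and right:
        # Matched: pair every left element with every right element.
        return [(left_v, right_v) for left_v in left for right_v in right]
    if left:
        return [(left_v, None) for left_v in left]     # only the first input has the key
    return [(None, right_v) for right_v in right]      # only the second input has the key


def windowed_outer_join(s1: Iterable[Tuple[str, str]],
                        s2: Iterable[Tuple[str, str]]) -> List[Tuple[str, Pair]]:
    """Group one window's (key, value) elements by key, then co-group each key."""
    groups: Dict[str, Tuple[List[str], List[str]]] = defaultdict(lambda: ([], []))
    for key, value in s1:
        groups[key][0].append(value)
    for key, value in s2:
        groups[key][1].append(value)
    return [(key, pair) for key, (left, right) in groups.items()
            for pair in co_group(left, right)]


window_s1 = [("k1", "a"), ("k2", "b")]
window_s2 = [("k1", "x"), ("k3", "y")]
for row in windowed_outer_join(window_s1, window_s2):
    print(row)
# ('k1', ('a', 'x'))
# ('k2', ('b', None))
# ('k3', (None, 'y'))
```

Rows with a `None` side are the non-matched records that, in Vinay's design, would be forwarded to the backup join.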
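The clean-up discussion in the thread (user-defined state must be cleared explicitly, and a state expiry feature that discards state not requested or updated for x minutes was still in the works at the time) can be illustrated with a hypothetical keyed store that tracks last-access times. This is a model of the described behavior, not a Flink API; `ExpiringKeyedState`, `sweep()` and the explicit `now` timestamps are assumptions made for the example.

```python
from typing import Any, Dict, Hashable, List, Optional, Tuple


class ExpiringKeyedState:
    """Hypothetical keyed state with idle-time expiry (not a Flink API)."""

    def __init__(self, ttl: float) -> None:
        self.ttl = ttl
        self._entries: Dict[Hashable, Tuple[Any, float]] = {}  # key -> (value, last access)

    def _expired(self, last_access: float, now: float) -> bool:
        return now - last_access > self.ttl

    def update(self, key: Hashable, value: Any, now: float) -> None:
        self._entries[key] = (value, now)

    def value(self, key: Hashable, now: float) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        if self._expired(entry[1], now):
            del self._entries[key]  # expired: behave as if it was never there
            return None
        self._entries[key] = (entry[0], now)  # a read counts as an access
        return entry[0]

    def clear(self, key: Hashable) -> None:
        self._entries.pop(key, None)  # explicit clean-up, like clear() on user-defined state

    def sweep(self, now: float) -> List[Hashable]:
        """Drop every key that was idle for longer than ttl; return the dropped keys."""
        expired = [k for k, (_, t) in self._entries.items() if self._expired(t, now)]
        for k in expired:
            del self._entries[k]
        return expired

    def __len__(self) -> int:
        return len(self._entries)


state = ExpiringKeyedState(ttl=60.0)
state.update("k1", "dto-1", now=0.0)
state.update("k2", "dto-2", now=30.0)
state.value("k1", now=50.0)    # reading k1 refreshes its last-access time
print(state.sweep(now=100.0))  # ['k2']  (idle for 70 > 60)
print(len(state))              # 1
```

Without `clear()` or some form of expiry, every key in an unbounded, evolving key space would stay in the store indefinitely, which is the growth problem Fabian and Stephan warn about.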