Hi Fabian,

https://ci.apache.org/projects/flink/flink-docs-master/dev/state_backends.html#the-rocksdbstatebackend

I am referring to this page; it does not clearly state whether the state
is kept on local disk even after checkpointing.

Or maybe I am not getting it correctly :)

Regards,
Vinay Patil

On Thu, Sep 1, 2016 at 1:38 PM, Fabian Hueske-2 [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n884...@n4.nabble.com> wrote:

> You do not have to convert your DTO into a JSON object to use it as a
> key-value state in a Flink function.
> You can pass it as it is via the state interfaces.
>
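To illustrate the point, a DTO can be kept in keyed state as a plain object, with no JSON round-trip. This is a plain-Java sketch only: the `Map` stands in for Flink's keyed state interface, and the DTO class and its fields are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a DTO is stored in state as the object itself.
// In Flink, the framework's serializers handle persistence; no manual
// JSON conversion is needed.
class DtoStateSketch {
    // Hypothetical DTO showing two of its ~50 fields.
    static class TradeDto {
        String tradeId;
        double amount;
        TradeDto(String tradeId, double amount) {
            this.tradeId = tradeId;
            this.amount = amount;
        }
    }

    // Stands in for a ValueState<TradeDto> scoped to the current key.
    private final Map<String, TradeDto> keyedState = new HashMap<>();

    void update(String key, TradeDto dto) {
        keyedState.put(key, dto);       // store the object as-is
    }

    TradeDto value(String key) {
        return keyedState.get(key);     // read it back, fields intact
    }
}
```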
> Can you point me to the documentation that you find confusing? The state
> documentation [1] says:
>
> >> You can make *every* transformation (map, filter, etc) stateful by
> using Flink’s state interface or checkpointing instance fields of your
> function.
> >> You can register any instance field as *managed* state by implementing
> an interface.
> >> In this case, and also in the case of using Flink’s native state
> interface, Flink will automatically take consistent snapshots of your state
> periodically, and restore its value in the case of a failure.
>
> Is that unclear/confusing, or are you referring to a different paragraph?
>
> Thanks, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>
> 2016-09-01 20:22 GMT+02:00 vinay patil <[hidden email]>:
>
>> I don't want to join the third stream.
>>
>> And yes, this is also what I was thinking of:
>> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(//
>> backup join)
>>
>>
>> I have already integrated with Cassandra, but I feel RocksDB will be a
>> better option. I will have to take care of the clearing part as you
>> suggested; I will check that in the documentation.
>>
>> My DTO has almost 50 fields. Converting it to JSON and storing that as
>> state should not be a problem, or is there no harm in storing the DTO
>> directly?
>>
>> I think the documentation should state explicitly that state is
>> maintained for user-defined operators, to avoid confusion.
>>
>> Regards,
>> Vinay Patil
>>
>> On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske-2 [via Apache Flink User
>> Mailing List archive.] <[hidden email]> wrote:
>>
>>> I thought you would like to join the non-matched elements with another
>>> (third) stream.
>>>
>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.connect(
>>> s3.keyBy).coFlatMap(// backup join)
>>>
>>> If you want to match the non-matched stream with itself, a
>>> FlatMapFunction is the right choice.
>>>
>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(//
>>> backup join)
>>>
>>> The backup join puts all non-matched elements into state and waits for
>>> another non-matched element with the same key to do the join.
>>>
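The store-and-wait logic Fabian describes can be sketched in plain Java, with a `HashMap` standing in for Flink's keyed state (class and method names are invented for illustration; this is not Flink API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the "backup join": the first record for a key is parked,
// the next record with the same key completes the join, and the parked
// entry is cleared (the equivalent of calling clear() on keyed state).
class BackupJoinSketch {
    private final Map<String, String> pending = new HashMap<>();

    // Returns the joined pair if a partner was waiting, empty otherwise.
    Optional<String> flatMap(String key, String value) {
        String partner = pending.remove(key);   // take and clear in one step
        if (partner != null) {
            return Optional.of(partner + "|" + value);  // emit joined result
        }
        pending.put(key, value);                // park until a partner arrives
        return Optional.empty();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In an actual Flink job the map would be per-key managed state on the keyed stream, so Flink checkpoints it and scopes it to the join key automatically.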
>>> Best, Fabian
>>>
>>>
>>>
>>> 2016-09-01 19:55 GMT+02:00 vinay patil <[hidden email]>:
>>>
>>>> Yes, that's what I am looking for.
>>>>
>>>> But why use a CoFlatMapFunction? I have already got the
>>>> matchingAndNonMatching stream by doing the union of the two streams and
>>>> putting the outer-join logic in the apply method.
>>>>
>>>> I am thinking of applying the same key on matchingAndNonMatching and a
>>>> flatMap to take care of the rest of the logic.
>>>>
>>>> Or are you suggesting to use a CoFlatMapFunction after the outer-join
>>>> operation (I mean after applying the window and getting the
>>>> matchingAndNonMatching stream)?
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User
>>>> Mailing List archive.] <[hidden email]> wrote:
>>>>
>>>>> Thanks for the explanation. I think I understood your use case.
>>>>>
>>>>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
>>>>> stream (keyed by join key).
>>>>> One input would be the unmatched outer-join records, the other input
>>>>> would provide the events you want to match them with.
>>>>> Retrieving elements from RocksDB will be local and should be fast.
>>>>>
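The two-input pattern could be sketched like this in plain Java (names invented; this stands in for a CoFlatMapFunction on a keyed stream, with `HashMap`s in place of the RocksDB-backed keyed state):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a two-input backup join: input 1 delivers unmatched outer-join
// records, input 2 delivers candidate events. A record from either side is
// parked until the other side produces the same key; then both are emitted
// joined and the parked entry is cleared.
class TwoInputJoinSketch {
    private final Map<String, String> unmatched = new HashMap<>();  // from input 1
    private final Map<String, String> candidates = new HashMap<>(); // from input 2
    private final List<String> output = new ArrayList<>();

    void flatMap1(String key, String record) {   // unmatched outer-join records
        String event = candidates.remove(key);
        if (event != null) {
            output.add(record + "|" + event);    // join and clear
        } else {
            unmatched.put(key, record);          // park until input 2 arrives
        }
    }

    void flatMap2(String key, String event) {    // events to match against
        String record = unmatched.remove(key);
        if (record != null) {
            output.add(record + "|" + event);    // join and clear
        } else {
            candidates.put(key, event);          // park until input 1 arrives
        }
    }

    List<String> output() { return output; }
}
```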
>>>>> You should be confident, though, that all unmatched records will be
>>>>> picked up at some point (RocksDB persists to disk, so you won't run out
>>>>> of memory, but the snapshot size will increase).
>>>>> The future state expiry feature will avoid such situations.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-09-01 18:29 GMT+02:00 vinay patil <[hidden email]>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> I had already used the CoGroup function earlier, but I was getting
>>>>>> some issues while dealing with watermarks (for one use case I was not
>>>>>> getting the correct result), so I have used the union operator for
>>>>>> performing the outer-join (a WindowFunction on a KeyedStream). This
>>>>>> approach is working correctly and giving me correct results.
>>>>>>
>>>>>> As I have discussed the scenario, I want to maintain the non-matching
>>>>>> records in some store; that's why I was thinking of using RocksDB as a
>>>>>> store here. I will maintain the user-defined state after the outer-join
>>>>>> window operator, and I can query it using Flink to check whether the
>>>>>> value for a particular key is present or not; if present, I can match
>>>>>> them and send them downstream.
>>>>>>
>>>>>> The final goal is to have zero non-matching records, so this is the
>>>>>> backup plan to handle edge-case scenarios.
>>>>>>
>>>>>> I have already integrated code to write to Cassandra using the Flink
>>>>>> connector, but I think this will be a better option rather than sending
>>>>>> queries to an external store, since RocksDB stores the data on the
>>>>>> local TM disk. Retrieval will be faster here than from Cassandra,
>>>>>> right?
>>>>>>
>>>>>> What do you think ?
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink
>>>>>> User Mailing List archive.] <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi Vinay,
>>>>>>>
>>>>>>> can you give a bit more detail about how you plan to implement the
>>>>>>> outer join? Using a WindowFunction or a CoFlatMapFunction on a
>>>>>>> KeyedStream?
>>>>>>>
>>>>>>> An alternative could be to use a CoGroup operator, which collects
>>>>>>> from two inputs all elements that share a common key (the join key)
>>>>>>> and are in the same window. The interface of the function provides two
>>>>>>> iterators over the elements of both inputs and can be used to
>>>>>>> implement outer-join functionality. The benefit of working with a
>>>>>>> CoGroupFunction is that you do not have to take care of state handling
>>>>>>> at all.
>>>>>>>
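The outer-join semantics of such a function body can be sketched in plain Java (illustrative only; in Flink the framework groups the elements per key and window, and the function only sees the two iterables):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of an outer-join CoGroup body: given all elements of both inputs
// that share one key (and window), emit joined pairs, and emit one-sided
// elements as non-matched results instead of dropping them.
class OuterJoinCoGroupSketch {
    static List<String> coGroup(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        if (left.isEmpty()) {
            for (String r : right) out.add("non-matched-right:" + r);
        } else if (right.isEmpty()) {
            for (String l : left) out.add("non-matched-left:" + l);
        } else {
            for (String l : left)
                for (String r : right)
                    out.add("matched:" + l + "|" + r);  // inner-join part
        }
        return out;
    }
}
```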
>>>>>>> In case you go for a custom implementation you will need to work
>>>>>>> with operator state.
>>>>>>> However, you do not need to directly interact with RocksDB. Flink is
>>>>>>> taking care of that for you.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2016-09-01 16:13 GMT+02:00 vinay patil <[hidden email]>:
>>>>>>>
>>>>>>>> Hi Fabian/Stephan,
>>>>>>>>
>>>>>>>> Waiting for your suggestion
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Vinay Patil
>>>>>>>>
>>>>>>>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi Fabian/Stephan,
>>>>>>>>>
>>>>>>>>> This makes things clear.
>>>>>>>>>
>>>>>>>>> This is the use case I have:
>>>>>>>>> I am performing an outer-join operation on the two streams (in a
>>>>>>>>> window), after which I get a matchingAndNonMatchingStream. Now I want
>>>>>>>>> to make sure that the matching rate is high (matching cannot happen
>>>>>>>>> if one of the sources is not emitting elements for a certain time).
>>>>>>>>> To tackle this situation I was thinking of using RocksDB as a state
>>>>>>>>> backend, where I will insert the unmatched records (the key will be
>>>>>>>>> the same as used for the window, and the value will be the DTO).
>>>>>>>>> Before inserting into it, I will check if the key is already present
>>>>>>>>> in RocksDB; if yes, I will take the data from it, send it downstream,
>>>>>>>>> and ensure I perform the clean-up operation for that key.
>>>>>>>>> (Also, the data to store should be encrypted; the encryption part can
>>>>>>>>> be handled.)
>>>>>>>>>
>>>>>>>>> So instead of using Cassandra, can I do this using RocksDB as the
>>>>>>>>> state backend, since the state is not gone after checkpointing?
>>>>>>>>>
>>>>>>>>> P.S. I have kept the watermark behind by 1500 secs just to be safe
>>>>>>>>> in handling late elements, but to tackle edge-case scenarios like the
>>>>>>>>> one mentioned above we have a backup plan of using Cassandra as an
>>>>>>>>> external store, since we are dealing with financially critical data.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Vinay Patil
>>>>>>>>>
>>>>>>>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Vinay,
>>>>>>>>>>
>>>>>>>>>> if you use user-defined state, you have to manually clear it.
>>>>>>>>>> Otherwise, it will stay in the state backend (heap or RocksDB)
>>>>>>>>>> until the job goes down (planned or due to an OOM error).
>>>>>>>>>>
>>>>>>>>>> This is esp. important to keep in mind, when using keyed state.
>>>>>>>>>> If you have an unbounded, evolving key space, you will likely run
>>>>>>>>>> out of memory: the job will constantly add state for each new key
>>>>>>>>>> but won't be able to clean up the state for "expired" keys.
>>>>>>>>>>
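A toy illustration of the expiry idea Fabian mentions (purely illustrative plain Java, not the Flink feature itself): each entry records when it was last updated, and a sweep discards keys not touched within a time-to-live, which is what keeps an evolving key space from growing without bound.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Sketch of state expiry: entries remember their last update time;
// a sweep removes entries older than a configured TTL.
class ExpiringState<V> {
    private static class Entry<V> {
        final V value;
        final long lastUpdateMillis;
        Entry(V value, long ts) { this.value = value; this.lastUpdateMillis = ts; }
    }

    private final Map<String, Entry<V>> state = new HashMap<>();
    private final long ttlMillis;

    ExpiringState(long ttlMillis) { this.ttlMillis = ttlMillis; }

    void put(String key, V value, long nowMillis) {
        state.put(key, new Entry<>(value, nowMillis));
    }

    V get(String key) {
        Entry<V> e = state.get(key);
        return e == null ? null : e.value;
    }

    // Drop every key that has not been updated within the TTL.
    void sweep(long nowMillis) {
        Iterator<Map.Entry<String, Entry<V>>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue().lastUpdateMillis > ttlMillis) {
                it.remove();
            }
        }
    }

    int size() { return state.size(); }
}
```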
>>>>>>>>>> You could implement such a clean-up mechanism if you implement a
>>>>>>>>>> custom stream operator.
>>>>>>>>>> However, this is a very low-level interface and requires a solid
>>>>>>>>>> understanding of internals like timestamps, watermarks, and the
>>>>>>>>>> checkpointing mechanism.
>>>>>>>>>>
>>>>>>>>>> The community is currently working on a state expiry feature
>>>>>>>>>> (state will be
>>>>>>>>>> discarded if not requested or updated for x minutes).
>>>>>>>>>>
>>>>>>>>>> Regarding the second question: does state remain local after
>>>>>>>>>> checkpointing?
>>>>>>>>>> Yes, the local state is only copied to the remote FS (HDFS, S3,
>>>>>>>>>> ...) but remains in the operator. So the state is not gone after a
>>>>>>>>>> checkpoint is completed.
>>>>>>>>>>
>>>>>>>>>> Hope this helps,
>>>>>>>>>> Fabian
>>>>>>>>>>
>>>>>>>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]>:
>>>>>>>>>>
>>>>>>>>>> > Hi Stephan,
>>>>>>>>>> >
>>>>>>>>>> > Just wanted to jump into this discussion regarding state.
>>>>>>>>>> >
>>>>>>>>>> > So do you mean that if we maintain user-defined state (for
>>>>>>>>>> > non-window operators) and do not clear it explicitly, the data
>>>>>>>>>> > for that key remains in RocksDB?
>>>>>>>>>> >
>>>>>>>>>> > What happens in the case of a checkpoint? I read in the
>>>>>>>>>> > documentation that after the checkpoint happens, the RocksDB data
>>>>>>>>>> > is pushed to the desired location (HDFS, S3, or another FS), so
>>>>>>>>>> > for user-defined state, does the data still remain in RocksDB
>>>>>>>>>> > after the checkpoint?
>>>>>>>>>> >
>>>>>>>>>> > Correct me if I have misunderstood this concept
>>>>>>>>>> >
>>>>>>>>>> > For one of our use cases we were going for this, but since I
>>>>>>>>>> > read the above part in the documentation, we are going for
>>>>>>>>>> > Cassandra now (to store records and query them for a special
>>>>>>>>>> > case).
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Regards,
>>>>>>>>>> > Vinay Patil
>>>>>>>>>> >
>>>>>>>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]> wrote:
>>>>>>>>>> >
>>>>>>>>>> > > In streaming, memory is mainly needed for state (key/value
>>>>>>>>>> > > state). The exact representation depends on the chosen
>>>>>>>>>> > > StateBackend.
>>>>>>>>>> > >
>>>>>>>>>> > > State is explicitly released: for windows, state is cleaned up
>>>>>>>>>> > > automatically (firing/expiry); for user-defined state, keys
>>>>>>>>>> > > have to be explicitly cleared (clear() method) or, in the
>>>>>>>>>> > > future, will have the option to expire.
>>>>>>>>>> > >
>>>>>>>>>> > > The heavy work horse for streaming state is currently
>>>>>>>>>> > > RocksDB, which internally uses native (off-heap) memory to keep
>>>>>>>>>> > > the data.
>>>>>>>>>> > >
>>>>>>>>>> > > Does that help?
>>>>>>>>>> > >
>>>>>>>>>> > > Stephan
>>>>>>>>>> > >
>>>>>>>>>> > >
>>>>>>>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]> wrote:
>>>>>>>>>> > >
>>>>>>>>>> > > > As per the docs, in Batch mode, dynamic memory allocation
>>>>>>>>>> > > > is avoided by storing messages being processed in ByteBuffers
>>>>>>>>>> > > > via Unsafe methods.
>>>>>>>>>> > > >
>>>>>>>>>> > > > Couldn't find any docs describing mem mgmt in Streaming
>>>>>>>>>> > > > mode. So...
>>>>>>>>>> > > >
>>>>>>>>>> > > > - Am wondering if this is also the case with Streaming ?
>>>>>>>>>> > > >
>>>>>>>>>> > > > - If so, how does Flink detect that an object is no longer
>>>>>>>>>> > > > being used and can be reclaimed for reuse once again?
>>>>>>>>>> > > >
>>>>>>>>>> > > > -roshan
>>>>>>>>>> > > >
>>>>>>>>>> > >
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> View this message in context: Re: Streaming - memory management
>>>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Streaming-memory-management-tp8829.html>
>>>>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>>>>> archive
>>>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>>>>>>> at Nabble.com.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>
>
>



