Hi Fabian/Stephan,

Waiting for your suggestions.

Regards,
Vinay Patil

On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> Hi Fabian/Stephan,
>
> This makes things clear.
>
> This is the use case I have:
> I am performing an outer join on the two streams (in a window), which
> gives me a matchingAndNonMatchingStream. Now I want to make sure that the
> matching rate stays high (matching cannot happen if one of the sources
> stops emitting elements for a certain time). To tackle this situation I
> was thinking of using RocksDB as the state backend and inserting the
> unmatched records into it (the key would be the same one used for the
> window, the value would be the DTO). Before inserting a record I would
> check whether its key is already present; if it is, I would take the
> stored data, send it downstream, and make sure I clear the state for that
> key. A sketch of what I have in mind follows below.
> (Also, the data to store should be encrypted; the encryption part can be
> handled.)
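>
> Roughly, something like this sketch (the function, state name, and MyDto
> accessors are illustrative; the exact state APIs vary a bit across Flink
> versions):
>
>   import org.apache.flink.api.common.functions.RichFlatMapFunction;
>   import org.apache.flink.api.common.state.ValueState;
>   import org.apache.flink.api.common.state.ValueStateDescriptor;
>   import org.apache.flink.api.java.tuple.Tuple2;
>   import org.apache.flink.configuration.Configuration;
>   import org.apache.flink.util.Collector;
>
>   // Sketch: buffer unmatched records per key; when the other side
>   // arrives, emit the pair downstream and clear the state for the key.
>   public class MatchUnmatchedFn
>           extends RichFlatMapFunction<MyDto, Tuple2<MyDto, MyDto>> {
>
>       private transient ValueState<MyDto> unmatched;
>
>       @Override
>       public void open(Configuration parameters) {
>           unmatched = getRuntimeContext().getState(
>                   new ValueStateDescriptor<>("unmatched", MyDto.class));
>       }
>
>       @Override
>       public void flatMap(MyDto value, Collector<Tuple2<MyDto, MyDto>> out)
>               throws Exception {
>           MyDto earlier = unmatched.value();
>           if (earlier != null) {
>               out.collect(Tuple2.of(earlier, value)); // late match found
>               unmatched.clear();                      // clean up this key
>           } else {
>               unmatched.update(value);                // buffer until matched
>           }
>       }
>   }
>
> This would run on the keyed stream, e.g.
> matchingAndNonMatchingStream.keyBy(...).flatMap(new MatchUnmatchedFn()),
> with RocksDB configured as the state backend.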
>
> So, instead of using Cassandra, can I do this with RocksDB as the state
> backend, given that the state is not gone after checkpointing?
>
> P.S. I have kept the watermark behind by 1500 secs just to be safe in
> handling late elements, but to tackle edge cases like the one mentioned
> above we have a backup plan of using Cassandra as an external store,
> since we are dealing with financially critical data.
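>
> For reference, I assign the lagging watermarks roughly like this (just a
> sketch; the timestamp accessor on the DTO is illustrative):
>
>   // Watermarks trail the largest timestamp seen so far by 1500 seconds,
>   // so elements up to 1500s late are still assigned to their windows.
>   stream.assignTimestampsAndWatermarks(
>       new BoundedOutOfOrdernessTimestampExtractor<MyDto>(
>               Time.seconds(1500)) {
>           @Override
>           public long extractTimestamp(MyDto element) {
>               return element.getEventTimeMillis(); // illustrative accessor
>           }
>       });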
>
> Regards,
> Vinay Patil
>
> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vinay,
>>
>> If you use user-defined state, you have to clear it manually.
>> Otherwise, it will stay in the state backend (heap or RocksDB) until the
>> job goes down (planned or due to an OOM error).
>>
>> This is especially important to keep in mind when using keyed state.
>> If you have an unbounded, evolving key space, you will likely run out of
>> memory: the job will constantly add state for each new key but won't be
>> able to clean up the state for "expired" keys.
>>
>> You could implement such a clean-up mechanism with a custom stream
>> operator. However, this is a very low-level interface and requires a
>> solid understanding of internals like timestamps, watermarks, and the
>> checkpointing mechanism.
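>>
>> Conceptually, the clean-up would look like the sketch below. (I am
>> writing it against a keyed process-style function with timers for
>> readability; the names here are assumptions and the actual low-level
>> operator interface looks different. It also assumes event-time
>> timestamps are set on the records.)
>>
>>   import org.apache.flink.api.common.state.ValueState;
>>   import org.apache.flink.api.common.state.ValueStateDescriptor;
>>   import org.apache.flink.configuration.Configuration;
>>   import org.apache.flink.streaming.api.functions.ProcessFunction;
>>   import org.apache.flink.util.Collector;
>>
>>   // Sketch: keep a value per key and clear it if the key sees no
>>   // update for 30 minutes of event time.
>>   public class ExpiringStateFn extends ProcessFunction<MyDto, MyDto> {
>>
>>       private static final long TTL = 30 * 60 * 1000L; // 30 minutes
>>
>>       private transient ValueState<MyDto> buffered;
>>       private transient ValueState<Long> lastSeen;
>>
>>       @Override
>>       public void open(Configuration parameters) {
>>           buffered = getRuntimeContext().getState(
>>                   new ValueStateDescriptor<>("buffered", MyDto.class));
>>           lastSeen = getRuntimeContext().getState(
>>                   new ValueStateDescriptor<>("lastSeen", Long.class));
>>       }
>>
>>       @Override
>>       public void processElement(MyDto value, Context ctx,
>>               Collector<MyDto> out) throws Exception {
>>           out.collect(value);
>>           buffered.update(value);
>>           lastSeen.update(ctx.timestamp());
>>           // schedule a clean-up check relative to this element's timestamp
>>           ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TTL);
>>       }
>>
>>       @Override
>>       public void onTimer(long timestamp, OnTimerContext ctx,
>>               Collector<MyDto> out) throws Exception {
>>           Long last = lastSeen.value();
>>           // only clear if the key really was idle for the full TTL
>>           if (last != null && timestamp >= last + TTL) {
>>               buffered.clear();
>>               lastSeen.clear();
>>           }
>>       }
>>   }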
>>
>> The community is currently working on a state expiry feature (state will
>> be discarded if it is not requested or updated for x minutes).
>>
>> Regarding the second question: Does state remain local after
>> checkpointing?
>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but
>> remains in the operator. So the state is not gone after a checkpoint is
>> completed.
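>>
>> As a minimal example (the checkpoint URI and interval are placeholders):
>>
>>   import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>>   public class CheckpointSetup {
>>       public static void main(String[] args) throws Exception {
>>           StreamExecutionEnvironment env =
>>                   StreamExecutionEnvironment.getExecutionEnvironment();
>>           // Working state lives in local RocksDB instances; a checkpoint
>>           // only copies it to this URI, the local copy stays in place.
>>           env.setStateBackend(
>>                   new RocksDBStateBackend("hdfs:///flink/checkpoints"));
>>           env.enableCheckpointing(60_000); // checkpoint every 60 seconds
>>       }
>>   }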
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:
>>
>> > Hi Stephan,
>> >
>> > Just wanted to jump into this discussion regarding state.
>> >
>> > So do you mean that if we maintain user-defined state (for non-window
>> > operators) and do not clear it explicitly, the data for that key
>> > remains in RocksDB?
>> >
>> > What happens in the case of a checkpoint? I read in the documentation
>> > that after a checkpoint happens, the RocksDB data is pushed to the
>> > configured location (HDFS, S3, or another FS), so for user-defined
>> > state, does the data still remain in RocksDB after the checkpoint?
>> >
>> > Correct me if I have misunderstood this concept.
>> >
>> > For one of our use cases we were going to do this, but since I read the
>> > above part in the documentation, we are going with Cassandra for now
>> > (to store records and query them for a special case).
>> >
>> >
>> > Regards,
>> > Vinay Patil
>> >
>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <se...@apache.org> wrote:
>> >
>> > > In streaming, memory is mainly needed for state (key/value state). The
>> > > exact representation depends on the chosen StateBackend.
>> > >
>> > > State is explicitly released: for windows, state is cleaned up
>> > > automatically (on firing / expiry); for user-defined state, keys have
>> > > to be explicitly cleared (via the clear() method) or, in the future,
>> > > will have the option to expire.
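>> > >
>> > > For example (a sketch; the state name and type are illustrative):
>> > >
>> > >   // keyed state obtained inside a rich function on a KeyedStream
>> > >   ValueState<MyDto> state = getRuntimeContext().getState(
>> > >           new ValueStateDescriptor<>("buffered", MyDto.class));
>> > >
>> > >   state.update(dto);  // state for the current key accumulates ...
>> > >   state.clear();      // ... until it is explicitly released per key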
>> > >
>> > > The workhorse for streaming state is currently RocksDB, which
>> > > internally uses native (off-heap) memory to keep the data.
>> > >
>> > > Does that help?
>> > >
>> > > Stephan
>> > >
>> > >
>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <ros...@hortonworks.com> wrote:
>> > >
>> > > > As per the docs, in batch mode, dynamic memory allocation is
>> > > > avoided by storing the messages being processed in ByteBuffers via
>> > > > Unsafe methods.
>> > > >
>> > > > Couldn't find any docs describing memory management in streaming
>> > > > mode. So...
>> > > >
>> > > > - Am wondering if this is also the case with streaming?
>> > > >
>> > > > - If so, how does Flink detect that an object is no longer being
>> > > > used and can be reclaimed for reuse once again?
>> > > >
>> > > > -roshan
>> > > >
>> > >
>> >
>>
>
>




