Hi Fabian/Stephan,

Waiting for your suggestion.
Regards,
Vinay Patil

On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> Hi Fabian/Stephan,
>
> This makes things clear.
>
> This is the use case I have:
> I am performing an outer join operation on the two streams (in a window),
> after which I get matchingAndNonMatchingStream. Now I want to make sure
> that the matching rate is high (matching cannot happen if one of the
> sources is not emitting elements for a certain time). To tackle this
> situation I was thinking of using RocksDB as the state backend, where I
> will insert the unmatched records (the key will be the same as used for
> the window and the value will be a DTO). Before inserting I will check
> whether the key is already present in RocksDB; if yes, I will take the
> data from it and send it downstream (and ensure I perform the clear
> operation for that key).
> (Also, the data to store should be encrypted; the encryption part can be
> handled.)
>
> So instead of using Cassandra, can I do this using RocksDB as the state
> backend, since the state is not gone after checkpointing?
>
> P.S. I have kept the watermark behind by 1500 secs just to be safe on
> handling late elements, but to tackle edge-case scenarios like the one
> mentioned above we have a backup plan of using Cassandra as an external
> store, since we are dealing with financially critical data.
>
> Regards,
> Vinay Patil
>
> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Vinay,
>>
>> If you use user-defined state, you have to clear it manually.
>> Otherwise, it will stay in the state backend (heap or RocksDB) until the
>> job goes down (planned or due to an OOM error).
>>
>> This is especially important to keep in mind when using keyed state.
>> If you have an unbounded, evolving key space you will likely run out of
>> memory. The job will constantly add state for each new key but won't be
>> able to clean up the state for "expired" keys.
>>
>> You could implement such a clean-up mechanism with a custom stream
>> operator. However, this is a very low-level interface and requires a
>> solid understanding of the internals like timestamps, watermarks, and
>> the checkpointing mechanism.
>>
>> The community is currently working on a state expiry feature (state will
>> be discarded if not requested or updated for x minutes).
>>
>> Regarding the second question: Does state remain local after
>> checkpointing? Yes, the local state is only copied to the remote FS
>> (HDFS, S3, ...) but remains in the operator. So the state is not gone
>> after a checkpoint is completed.
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>:
>>
>> > Hi Stephan,
>> >
>> > Just wanted to jump into this discussion regarding state.
>> >
>> > So do you mean that if we maintain user-defined state (for non-window
>> > operators) and we do not clear it explicitly, the data for that key
>> > remains in RocksDB?
>> >
>> > What happens in case of a checkpoint? I read in the documentation that
>> > after the checkpoint happens the RocksDB data is pushed to the desired
>> > location (HDFS, S3, or another FS), so for user-defined state does the
>> > data still remain in RocksDB after the checkpoint?
>> > Correct me if I have misunderstood this concept.
>> >
>> > For one of our use cases we were going for this, but since I read the
>> > above part in the documentation we are going for Cassandra now (to
>> > store records and query them for a special case).
>> >
>> > Regards,
>> > Vinay Patil
>> >
>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <se...@apache.org> wrote:
>> >
>> > > In streaming, memory is mainly needed for state (key/value state).
>> > > The exact representation depends on the chosen StateBackend.
>> > >
>> > > State is explicitly released: for windows, state is cleaned up
>> > > automatically (firing / expiry); for user-defined state, keys have
>> > > to be explicitly cleared (clear() method) or in the future will have
>> > > the option to expire.
>> > >
>> > > The heavy workhorse for streaming state is currently RocksDB, which
>> > > internally uses native (off-heap) memory to keep the data.
>> > >
>> > > Does that help?
>> > >
>> > > Stephan
>> > >
>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <ros...@hortonworks.com>
>> > > wrote:
>> > >
>> > > > As per the docs, in batch mode, dynamic memory allocation is
>> > > > avoided by storing the messages being processed in ByteBuffers via
>> > > > Unsafe methods.
>> > > >
>> > > > I couldn't find any docs describing memory management in streaming
>> > > > mode. So...
>> > > >
>> > > > - Am wondering if this is also the case with streaming?
>> > > >
>> > > > - If so, how does Flink detect that an object is no longer being
>> > > > used and can be reclaimed for reuse once again?
>> > > >
>> > > > -roshan
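
For reference, the pattern Fabian and Stephan describe above (keyed state that
is read, updated, and explicitly cleared once a match is found) could look
roughly like the sketch below. This is a minimal illustration, not code from
this thread: the class name MatchOrBufferFunction, the Tuple2<String, String>
record shape, and the state names are made up, and the exact ValueStateDescriptor
constructors vary slightly between Flink versions.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class MatchOrBufferFunction
        extends RichCoFlatMapFunction<Tuple2<String, String>,   // (key, payload) from stream 1
                                      Tuple2<String, String>,   // (key, payload) from stream 2
                                      Tuple2<String, String>> { // matched payload pair

    // Keyed state: lives in the configured state backend (e.g. RocksDB), one entry per key.
    private transient ValueState<String> pendingFromFirst;
    private transient ValueState<String> pendingFromSecond;

    @Override
    public void open(Configuration parameters) {
        pendingFromFirst = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-first", String.class));
        pendingFromSecond = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-second", String.class));
    }

    @Override
    public void flatMap1(Tuple2<String, String> record,
                         Collector<Tuple2<String, String>> out) throws Exception {
        String other = pendingFromSecond.value();
        if (other != null) {
            out.collect(Tuple2.of(record.f1, other)); // match found: emit downstream
            pendingFromSecond.clear();                // clear, otherwise the entry stays forever
        } else {
            pendingFromFirst.update(record.f1);       // no match yet: buffer this side
        }
    }

    @Override
    public void flatMap2(Tuple2<String, String> record,
                         Collector<Tuple2<String, String>> out) throws Exception {
        String other = pendingFromFirst.value();
        if (other != null) {
            out.collect(Tuple2.of(other, record.f1));
            pendingFromFirst.clear();
        } else {
            pendingFromSecond.update(record.f1);
        }
    }
}

Wiring it up would look roughly like
stream1.connect(stream2).keyBy(0, 0).flatMap(new MatchOrBufferFunction());
both inputs must be keyed on the same field so the state above is scoped per key,
and (as Fabian notes) keys that never find a match still need some expiry or
clean-up strategy, otherwise the state grows without bound.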
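On the checkpointing question, the point that state "remains local" means the
operators keep working against their local RocksDB instances and only snapshot
copies are shipped to the checkpoint directory. A minimal job-setup sketch,
assuming a hypothetical HDFS checkpoint path and the 2016-era RocksDBStateBackend
API, could look like this:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot copies of the keyed state go to this path on each checkpoint;
        // the live state stays in the operators' local RocksDB instances.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // Take a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000);

        // ... build the two keyed sources, connect them, add sinks here ...

        env.execute("stream-matching-job");
    }
}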