Hi, David

When you flush data to the DB, you can refer to the serialization logic[1] and
store the serialized bytes in RocksDB.

[1] 
https://github.com/apache/flink/blob/c4b0e8f68c5c4bb2ba60b358df92ee5db1d857df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java#L136
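That builder essentially concatenates a key-group prefix with the serialized key. A rough standalone sketch of the idea (hypothetical class name; Flink's builder uses TypeSerializers and chooses the prefix width from maxParallelism, simplified here to a fixed two-byte prefix):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of how a composite RocksDB key can be built:
// a key-group prefix followed by the serialized user key.
public class CompositeKeySketch {

    // Serialize (keyGroup, key) into the byte sequence used for RocksDB lookups.
    public static byte[] buildCompositeKey(int keyGroup, String key) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        // The key group is written as a big-endian prefix so that all keys
        // belonging to one key group are contiguous in RocksDB.
        out.writeShort(keyGroup);
        out.write(key.getBytes(StandardCharsets.UTF_8));
        out.flush();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] composite = buildCompositeKey(42, "user-7");
        System.out.println(composite.length); // 2-byte prefix + 6 key bytes = 8
    }
}
```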

Best, Congxian
On Apr 29, 2019, 12:57 +0800, aitozi <gjying1...@gmail.com>, wrote:
> Hi David,
>
> I opened an issue about this before, and @Andrey Zagrebin and @Aljoscha Krettek 
> suggested extending AbstractStreamOperator to customize the operator's 
> behavior on state, or extending the state backend to add a cache layer on top of it.
>
> Fyi: 
> https://issues.apache.org/jira/browse/FLINK-10343?focusedCommentId=16614992&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16614992
>
> A new state backend was also introduced by Alibaba at Flink Forward China 2018. 
> It has not been open-sourced yet, but you can take a look at the slides.
>
> https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf
>
> Thanks,
> Aitozi
>
> From: "David J. C. Beach" <da...@jcbeach.com>
> Date: Monday, April 29, 2019, 11:52 AM
> To: aitozi <gjying1...@gmail.com>
> Cc: <user@flink.apache.org>
> Subject: Re: RocksDB backend with deferred writes?
>
> Thanks Aitozi.
>
> Your answer makes good sense and I'm trying to implement this now.  My code 
> is written as a KeyedProcessFunction, but I can't see where this exposes the 
> KeyContext interface.  Is there anything you can point me to in the docs?
>
> Best,
> David
>
>
>
> On Sun, Apr 28, 2019 at 8:09 PM aitozi <gjying1...@gmail.com> wrote:
> > Hi David,
> >
> > The RocksDB keyed state backend is used under a key context: every state 
> > operation should call setCurrentKey so that the backend knows the current key 
> > and can compute the current key group. These two parts are used to interact 
> > with the underlying RocksDB.
> >
> > I think you can achieve this goal by calling setCurrentKey before flushing to 
> > RocksDB, or by building the prefixed key (keyGroup + key) yourself when 
> > putting/getting values to/from RocksDB.
> >
> > Thanks,
> > Aitozi
> >
> >
> > From: "David J. C. Beach" <da...@jcbeach.com>
> > Date: Monday, April 29, 2019, 6:43 AM
> > To: <user@flink.apache.org>
> > Subject: RocksDB backend with deferred writes?
> >
> > I have a stateful operator in a task which processes thousands of elements 
> > per second (per task) when using the Filesystem backend.  As documented and 
> > reported by other users, when I switch to the RocksDB backend, throughput 
> > is considerably lower.  I need something that combines the high performance 
> > of in-memory with the large state and incremental checkpointing of RocksDB.
> >
> > For my application, I can *almost* accomplish this by including a caching 
> > layer which maintains a map of pending (key, value) writes.  These are 
> > periodically flushed to the RocksDB (and always prior to a checkpoint).  
> > This greatly reduces the number of writes to RocksDB, and means that I can 
> > get a "best of both worlds" in terms of throughput and 
> > reliability/incremental checkpointing.  These optimizations make sense for 
> > my workload, since events which operate on the same key tend to be close 
> > together in the stream.  (Over the long haul, there are many millions of 
> > keys, and occasionally, an event references some key from the distant past, 
> > hence the need for RocksDB.)
> >
> > Unfortunately, this solution does not work perfectly because when I do 
> > eventually flush writes to the underlying RocksDB backend, the stateful 
> > processor may be operating on an element which belongs to a different key 
> > group.  Hence, the elements that I flush are associated with the wrong key 
> > group, and things don't work quite right.
> >
> > Is there any way to wrap the RocksDB backend with caching and deferred 
> > writes (in a way which is "key-group aware")?
> >
> > Thanks!
> >
> > David
> >
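Putting the quoted advice together, a key-group-aware write-back cache could look roughly like this. All names here are hypothetical, and the key-group assignment is simplified to a plain `hashCode()` modulo; Flink's real assignment (KeyGroupRangeAssignment) additionally applies a murmur hash before the modulo:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a key-group-aware write-back cache: pending writes
// are buffered per user key, and flush() recomputes each key's key group
// before handing the entry to the backend, so a flush triggered while the
// operator is processing an element from a different key group still
// attributes every write to the right key group.
public class KeyGroupAwareCache {

    // Stand-in for the underlying key-group-aware store (e.g. RocksDB).
    public interface Backend {
        void put(int keyGroup, String key, String value);
    }

    private final Map<String, String> pending = new HashMap<>();
    private final int maxParallelism;
    private final Backend backend;

    public KeyGroupAwareCache(int maxParallelism, Backend backend) {
        this.maxParallelism = maxParallelism;
        this.backend = backend;
    }

    public void put(String key, String value) {
        pending.put(key, value); // deferred write; later updates to the same key are absorbed
    }

    public String get(String key) {
        return pending.get(key); // cache hit; a miss would fall through to the backend
    }

    // Called periodically and always before a checkpoint.
    public void flush() {
        for (Map.Entry<String, String> e : pending.entrySet()) {
            // Simplified key-group computation; Flink murmur-hashes first.
            int keyGroup = Math.abs(e.getKey().hashCode() % maxParallelism);
            backend.put(keyGroup, e.getKey(), e.getValue());
        }
        pending.clear();
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        KeyGroupAwareCache cache = new KeyGroupAwareCache(128,
                (kg, k, v) -> store.put(kg + "/" + k, v));
        cache.put("a", "1");
        cache.put("a", "2"); // repeated update to the same key stays in the cache
        cache.flush();       // a single write, tagged with its key group, reaches the backend
        System.out.println(store.size()); // 1
    }
}
```

This buys the same write reduction as an unconditional cache, but because the key group is derived from the key at flush time rather than taken from the operator's current key context, the entries land in the correct key group.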
