Hi David,

When you flush data to RocksDB, you can reference the key-serialization logic in [1] and store the serialized bytes in RocksDB.
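For illustration, the composite-key layout that builder produces can be sketched in plain Java. This is a simplified stand-in, not Flink's actual classes: the RocksDB key is a fixed-width, big-endian key-group prefix followed by the serialized user key (the real builder also appends a serialized namespace).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only: mimics the idea behind Flink's
// RocksDBSerializedCompositeKeyBuilder. The RocksDB key is the key-group id
// written as a fixed-width big-endian prefix, followed by the key bytes.
public class CompositeKeySketch {

    // Write the key group as a big-endian prefix of prefixBytes bytes,
    // then append the serialized user key.
    static byte[] buildCompositeKey(int keyGroup, int prefixBytes, byte[] keyBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = prefixBytes - 1; i >= 0; i--) {
            out.write((keyGroup >>> (i * 8)) & 0xFF);
        }
        out.write(keyBytes, 0, keyBytes.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] composite = buildCompositeKey(300, 2, key);
        // 300 = 0x012C, so the 2-byte prefix is 0x01 0x2C.
        System.out.println(composite.length);                   // 9
        System.out.println(composite[0] + " " + composite[1]);  // 1 44
    }
}
```

Because the prefix is big-endian and fixed-width, RocksDB's lexicographic key order groups all entries of one key group together, which is what makes key-group-range scans during rescaling cheap.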
[1] https://github.com/apache/flink/blob/c4b0e8f68c5c4bb2ba60b358df92ee5db1d857df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java#L136

Best,
Congxian

On Apr 29, 2019, 12:57 +0800, aitozi <gjying1...@gmail.com>, wrote:
> Hi David,
>
> I opened an issue about this before, and @Andrey Zagrebin and @Aljoscha Krettek
> suggested either extending AbstractStreamOperator to customize the operator's
> operations on state, or extending the state backend to add a cache layer on it.
>
> FYI:
> https://issues.apache.org/jira/browse/FLINK-10343?focusedCommentId=16614992&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16614992
>
> A new state backend was also introduced by Alibaba at Flink Forward China 2018.
> It has not been open sourced yet, but you can take a look at the slides:
>
> https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf
>
> Thanks,
> Aitozi
>
> From: "David J. C. Beach" <da...@jcbeach.com>
> Date: Monday, April 29, 2019, 11:52 AM
> To: aitozi <gjying1...@gmail.com>
> Cc: <user@flink.apache.org>
> Subject: Re: RocksDB backend with deferred writes?
>
> Thanks Aitozi.
>
> Your answer makes good sense, and I'm trying to implement this now. My code
> is written as a KeyedProcessFunction, but I can't see where this exposes the
> KeyContext interface. Is there anything you can point me to in the docs?
>
> Best,
> David
>
>
>
> On Sun, Apr 28, 2019 at 8:09 PM aitozi <gjying1...@gmail.com> wrote:
> > Hi David,
> >
> > The RocksDB keyed backend operates under a key context: every state
> > operation should setCurrentKey first, so that RocksDB is aware of the
> > current key and can compute the current key group. These two parts
> > together are used to interact with the underlying RocksDB.
> >
> > I think you can achieve this goal either by calling setCurrentKey before
> > you flush to RocksDB, or by building the prefixed key (key group + key)
> > yourself when you put/get values to/from RocksDB.
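The setCurrentKey approach above could look roughly like the following. The KeyContext and KeyedValueState interfaces here are stand-ins for Flink's real interfaces, written inline so the sketch is self-contained; the point is only the call pattern of restoring each cached entry's own key before writing it.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the setCurrentKey suggestion (stand-in interfaces, not Flink's
// real API): before flushing each cached entry into keyed state, restore that
// entry's own key via setCurrentKey so the backend recomputes the correct key
// group, instead of writing under whatever key the operator is currently
// processing.
public class FlushSketch {

    // Stand-in for org.apache.flink.streaming.api.operators.KeyContext.
    interface KeyContext { void setCurrentKey(Object key); }

    // Stand-in for a keyed ValueState: update() writes under the current key.
    interface KeyedValueState { void update(Long value); }

    // Flush the write-behind cache entry by entry, switching the current key
    // before each write, then clear the cache (e.g. right before a checkpoint).
    static void flush(Map<String, Long> pending, KeyContext ctx, KeyedValueState state) {
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            ctx.setCurrentKey(e.getKey()); // key group is derived from this key
            state.update(e.getValue());
        }
        pending.clear();
    }

    public static void main(String[] args) {
        Map<String, Long> pending = new HashMap<>();
        pending.put("a", 1L);
        pending.put("b", 2L);

        // A tiny in-memory fake backend, just to show the call pattern.
        Map<String, Long> backend = new HashMap<>();
        final String[] currentKey = new String[1];
        flush(pending,
              key -> currentKey[0] = (String) key,
              value -> backend.put(currentKey[0], value));

        System.out.println(backend);        // each value lands under its own key
        System.out.println(pending.size()); // 0
    }
}
```

One caveat: in a real operator you would also want to restore the key the operator was processing before the flush, so that subsequent state accesses in the same processElement call still see the right key.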
> >
> > Thanks,
> > Aitozi
> >
> >
> > From: "David J. C. Beach" <da...@jcbeach.com>
> > Date: Monday, April 29, 2019, 6:43 AM
> > To: <user@flink.apache.org>
> > Subject: RocksDB backend with deferred writes?
> >
> > I have a stateful operator in a task which processes thousands of elements
> > per second (per task) when using the filesystem backend. As documented and
> > reported by other users, throughput is considerably lower when I switch to
> > the RocksDB backend. I need something that combines the high performance
> > of in-memory state with the large state and incremental checkpointing of
> > RocksDB.
> >
> > For my application, I can *almost* accomplish this by adding a caching
> > layer which maintains a map of pending (key, value) writes. These are
> > periodically flushed to RocksDB (and always prior to a checkpoint). This
> > greatly reduces the number of writes to RocksDB and gives me the best of
> > both worlds in terms of throughput and reliability/incremental
> > checkpointing. These optimizations make sense for my workload, since
> > events that operate on the same key tend to be close together in the
> > stream. (Over the long haul there are many millions of keys, and
> > occasionally an event references some key from the distant past, hence
> > the need for RocksDB.)
> >
> > Unfortunately, this solution does not work perfectly: when I eventually
> > flush writes to the underlying RocksDB backend, the stateful processor
> > may be operating on an element that belongs to a different key group. The
> > elements I flush are then associated with the wrong key group, and things
> > don't work quite right.
> >
> > Is there any way to wrap the RocksDB backend with caching and deferred
> > writes (in a way that is "key-group aware")?
> >
> > Thanks!
> >
> > David
> >
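For reference, the "key-group aware" cache David asks about could be sketched roughly like this: bucket pending writes by each entry's own key group at insert time, so a flush never attributes an entry to whatever key group the operator happens to be processing. Note the key-group function below is a simplified stand-in; Flink actually applies a murmur hash to key.hashCode() before the modulo (see KeyGroupRangeAssignment).

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a key-group-aware write-behind cache. Pending writes are bucketed
// by the key group of their *own* key, so flushing a bucket is always
// consistent with the key group RocksDB would compute for those keys.
// NOTE: keyGroupOf() is a simplified stand-in for Flink's assignment, which
// murmur-hashes key.hashCode() before taking the modulo.
public class KeyGroupAwareCache {

    private final int maxParallelism;
    private final Map<Integer, Map<String, Long>> pendingByKeyGroup = new HashMap<>();

    public KeyGroupAwareCache(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    // Simplified key-group assignment (illustrative only).
    public static int keyGroupOf(String key, int maxParallelism) {
        return (key.hashCode() & 0x7fffffff) % maxParallelism;
    }

    // Buffer a write under the key group of its own key; repeated writes to
    // the same key collapse into one pending entry, which is where the
    // throughput win over direct RocksDB writes comes from.
    public void put(String key, long value) {
        int kg = keyGroupOf(key, maxParallelism);
        pendingByKeyGroup.computeIfAbsent(kg, g -> new HashMap<>()).put(key, value);
    }

    // Drain all buckets (e.g. before a checkpoint); each bucket can then be
    // flushed with the backend's current key set from the entries themselves.
    public Map<Integer, Map<String, Long>> drain() {
        Map<Integer, Map<String, Long>> out = new HashMap<>(pendingByKeyGroup);
        pendingByKeyGroup.clear();
        return out;
    }
}
```

Reads would also need to consult the pending buckets before falling through to RocksDB, and the cache must be fully drained before the checkpoint barrier is acknowledged, or the snapshot will miss the buffered writes.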