Hi David,

 

I opened an issue about this a while ago, and @Andrey Zagrebin and @Aljoscha 
Krettek suggested that I either extend AbstractStreamOperator to customize how 
the operator works with state, or extend the state backend to add a cache 
layer on top of it.

 

FYI:
https://issues.apache.org/jira/browse/FLINK-10343?focusedCommentId=16614992&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16614992

 

Also, Alibaba introduced a new state backend at Flink Forward China 2018. It 
has not been open-sourced, but you can take a look at the slides:

 

https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf

 

Thanks,

Aitozi

 

From: "David J. C. Beach" <da...@jcbeach.com>
Date: Monday, April 29, 2019, 11:52 AM
To: aitozi <gjying1...@gmail.com>
Cc: <user@flink.apache.org>
Subject: Re: RocksDB backend with deferred writes?

 

Thanks Aitozi.

 

Your answer makes good sense and I'm trying to implement this now.  My code is 
written as a KeyedProcessFunction, but I can't see where this exposes the 
KeyContext interface.  Is there anything you can point me to in the docs?

 

Best,

David

 

 

 

On Sun, Apr 28, 2019 at 8:09 PM aitozi <gjying1...@gmail.com> wrote:

Hi David,

 

The RocksDB keyed state backend works under a key context: every state 
operation must be preceded by setCurrentKey, so that the backend knows the 
current key and can compute the current key group. It uses these two parts to 
interact with the underlying RocksDB.

 

I think you can achieve this either by calling setCurrentKey before each flush 
to RocksDB, or by building the prefixed key (key group + key) yourself and 
putting/getting values to/from RocksDB directly.
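
A minimal sketch of the first option, in plain Java with no Flink dependency. 
ToyKeyedBackend and KeyAwareFlusher are hypothetical stand-ins, not Flink 
classes; they only mimic the setCurrentKey/getCurrentKey idea so the flush 
logic can be shown in isolation. In a real job the calls would go to whatever 
key context the operator exposes.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a keyed state backend: every write targets the current key. */
class ToyKeyedBackend {
    private final Map<String, Long> store = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) { this.currentKey = key; }
    String getCurrentKey() { return currentKey; }
    void update(long value) { store.put(currentKey, value); }
    Long valueFor(String key) { return store.get(key); }
}

/** Hypothetical helper that flushes buffered writes under the right key. */
class KeyAwareFlusher {
    static void flush(Map<String, Long> pending, ToyKeyedBackend backend) {
        // Remember the key of the element that is currently being processed.
        String previousKey = backend.getCurrentKey();
        try {
            for (Map.Entry<String, Long> entry : pending.entrySet()) {
                // Switch the key context so the write lands under the entry's own key.
                backend.setCurrentKey(entry.getKey());
                backend.update(entry.getValue());
            }
            pending.clear();
        } finally {
            // Restore the context so processing of the current element is unaffected.
            backend.setCurrentKey(previousKey);
        }
    }
}
```

The point of the try/finally is that the element being processed keeps its own 
key context even if one of the flushed writes fails.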

 

Thanks,

Aitozi

 

 

From: "David J. C. Beach" <da...@jcbeach.com>
Date: Monday, April 29, 2019, 6:43 AM
To: <user@flink.apache.org>
Subject: RocksDB backend with deferred writes?

 

I have a stateful operator in a task which processes thousands of elements per 
second (per task) when using the Filesystem backend.  As documented and 
reported by other users, when I switch to the RocksDB backend, throughput is 
considerably lower.  I need something that combines the high performance of 
in-memory with the large state and incremental checkpointing of RocksDB.

 

For my application, I can *almost* accomplish this by including a caching layer 
which maintains a map of pending (key, value) writes.  These are periodically 
flushed to the RocksDB (and always prior to a checkpoint).  This greatly 
reduces the number of writes to RocksDB, and means that I can get a "best of 
both worlds" in terms of throughput and reliability/incremental checkpointing.  
These optimizations make sense for my workload, since events which operate on 
the same key tend to be close together in the stream.  (Over the long haul, 
there are many millions of keys, and occasionally, an event references some key 
from the distant past, hence the need for RocksDB.)
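
A rough sketch of such a caching layer, in plain Java. DeferredWriteCache, its 
size threshold, and the BiConsumer sink are all illustrative assumptions (the 
sink stands in for the actual backend write); this is not the code from the 
application described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical write-behind cache: buffers (key, value) writes and flushes in batches. */
class DeferredWriteCache<K, V> {
    private final Map<K, V> pending = new LinkedHashMap<>();
    private final int maxPending;
    private final BiConsumer<K, V> sink; // stands in for the slow backend write

    DeferredWriteCache(int maxPending, BiConsumer<K, V> sink) {
        this.maxPending = maxPending;
        this.sink = sink;
    }

    /** Buffers a write; repeated writes to the same key collapse into one entry. */
    void put(K key, V value) {
        pending.put(key, value);
        if (pending.size() >= maxPending) {
            flush();
        }
    }

    /** Returns the buffered value, or null if the caller must read the backend. */
    V getPending(K key) {
        return pending.get(key);
    }

    /** Writes every buffered entry to the sink; meant to run before each checkpoint. */
    void flush() {
        pending.forEach(sink);
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because writes to the same key overwrite each other in memory, a burst of 
events on one key results in a single backend write at flush time.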

 

Unfortunately, this solution does not work perfectly because when I do 
eventually flush writes to the underlying RocksDB backend, the stateful 
processor may be operating on an element which belongs to a different key 
group.  Hence, the elements that I flush are associated with the wrong key 
group, and things don't work quite right.
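
To illustrate why the key group matters: it is derived from the key itself, so 
a deferred write has to be tagged with the key group of the key being written, 
not with that of the element currently being processed. The sketch below is a 
simplification in plain Java. KeyGroupSketch is a hypothetical class, the hash 
is a plain stand-in rather than Flink's actual hash function, and the 2-byte 
prefix layout is an assumption that ignores namespaces and Flink's real key 
serialization. It assumes at most 32768 key groups so the prefix fits in two 
bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of key-group assignment and a key-group-prefixed key. */
class KeyGroupSketch {
    /** Stand-in assignment: the key's hash modulo the number of key groups. */
    static int keyGroupFor(Object key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /** Composite storage key: 2-byte key-group prefix followed by the key's UTF-8 bytes. */
    static byte[] prefixedKey(String key, int numKeyGroups) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int keyGroup = keyGroupFor(key, numKeyGroups);
        return ByteBuffer.allocate(2 + keyBytes.length)
                .putShort((short) keyGroup) // big-endian by default
                .put(keyBytes)
                .array();
    }
}
```

Two different keys will usually map to different prefixes, which is why a 
write issued under another element's key context ends up in the wrong place.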

 

Is there any way to wrap the RocksDB backend with caching and deferred writes 
(in a way which is "key-group aware")?

 

Thanks!

 

David

 
