Thanks Ufuk,

That was very helpful, but it raised a few more questions :-):

1. Calling clear() on the KV state is only possible for snapshots, right? Can
you control that for checkpoints too?
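
To make sure I am asking about the right thing, here is a minimal sketch of
where I understand clear() would be called from user code. The function name,
state name, and the one-hour idle threshold are all made up for illustration:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical function over (key, timestamp) tuples on a keyed stream.
public class StateClearingSketch
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class, null));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        Long previous = lastSeen.value();
        if (previous != null && value.f1 - previous > 3_600_000L) {
            // Once this key has been idle for over an hour, drop its entry so
            // it no longer contributes to the RocksDB / snapshot size.
            lastSeen.clear();
        } else {
            lastSeen.update(value.f1);
        }
        out.collect(value);
    }
}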

2. Assuming the user has no control over the checkpoint process beyond
setting the checkpoint interval, when is RocksDB cleared of the operator
state belonging to checkpoints that are long past? It seems like only two
checkpoints really need to be maintained: the current one and the previous
one for restore. Does Flink clean up checkpoints on a timer? When it cleans
up checkpoints, does it also clean up the state backend (I am assuming the
two are different)?
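
For reference, these are the only knobs I am aware of having on my side (a
minimal sketch; the HDFS path is just a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds; the interval is the only
        // checkpoint-related setting I am tuning.
        env.enableCheckpointing(60_000L);

        // RocksDB keeps the working state locally and copies snapshots to the
        // configured filesystem; the path below is just a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // ... build the topology and call env.execute() ...
    }
}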

3. The pointer to pre-aggregating window functions was very helpful, as the
WindowFunction is then passed the pre-aggregated state. For windows, are the
Reduce and Fold functions called on each element even before the window is
triggered? I can see how that would work: the pre-computation is done per
element, but the actual output is emitted only when the window fires. Is that
only possible if there are no Evictors defined on the window? Also, how are
the elements fed to the Reduce/Fold function? Is it like MapReduce, where
even though you are given an Iterator, all the values for a key are not
actually buffered in memory? That ties back to how RocksDB is used to store
large window state before the window is triggered. If my elements are
accumulating in a window (feeding a ReduceFunction), do they spill to disk
(RocksDB?) once a threshold size is reached?
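
To make this concrete, here is the kind of pre-aggregating window I have in
mind, as a minimal sketch; the (key, count) input and the six-hour window are
hypothetical:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PreAggregatingWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical (key, count) input; a real job would read from a
        // source such as Kafka.
        DataStream<Tuple2<String, Long>> input = env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("b", 1L), Tuple2.of("a", 1L));

        input.keyBy(0)
             .timeWindow(Time.hours(6))
             // My understanding: the ReduceFunction is invoked as each
             // element arrives, so only one running value per key and window
             // is kept as state, and the result is emitted when the window
             // fires.
             .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                 @Override
                 public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                                    Tuple2<String, Long> b) {
                     return Tuple2.of(a.f0, a.f1 + b.f1);
                 }
             })
             .print();

        env.execute("Pre-aggregating window sketch");
    }
}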

Thanks,
Sameer



On Tue, Jul 26, 2016 at 7:29 AM, Ufuk Celebi <u...@apache.org> wrote:

> On Mon, Jul 25, 2016 at 8:50 PM, Sameer W <sam...@axiomine.com> wrote:
> > The question is, if using really long windows (in hours), and the state
> > of the window gets very large over time, would the size of the RocksDB
> > instance also grow? Would replication to HDFS start causing performance
> > bottlenecks? Also, would this require a constant cycle (at the checkpoint
> > interval?) of reading from RocksDB, adding more window elements, and
> > writing back to RocksDB?
>
> Yes. The size of the RocksDB instance is directly correlated with the
> number of K/V state pairs you store. You can remove state by calling
> `clear()` on the KvState instance.
>
> All state updates go directly to RocksDB and snapshots copy the DB
> files (semi-async mode, current default) or iterate-and-copy all K/V
> pairs (fully-async mode). No records are deleted automatically after
> snapshots.
>
> Snapshotting large RocksDB instances will cause some slowdown, but
> you can trade this cost off by adjusting the checkpointing interval.
> There are plans to do the snapshots in an incremental fashion in order
> to lower the costs for this, but there is no design doc available for
> it at this point.
>
> > Outside of the read costs, is there a risk in having very long windows
> > when you know you could collect a lot of elements in them? Is it safer
> > instead to perform aggregations on top of aggregations, or to use your
> > own remote store like HBase to persist the larger per-record state and
> > use windows only to store the keys in HBase? I mention HBase because its
> > column qualifiers allow elements to be added to the same key in multiple
> > ordered column qualifiers. Reads can also be throttled in batches of
> > column qualifiers, allowing for better memory consumption. Is this
> > approach used in practice?
>
> RocksDB works quite well for large stateful jobs. If possible for your
> use case, I would still recommend working with pre-aggregating window
> functions (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions
> )
> or pre-aggregating the data. The I/O costs will correlate with the
> state size, but there is "no risk" in the sense that it will still
> work as expected.
>
> What you describe with HBase could work, but I'm not aware of anyone
> doing this. Furthermore, depending on your use case, it can cause
> problems in failure scenarios, because you might need to keep HBase
> and Flink state in sync.
>
