Dear Aljoscha,

I'm a colleague of 时某人 and take part in designing the checkpoint mechanism of a stream processing system that is modeled on Flink's checkpoint mechanism. In my opinion, the hardest part of checkpointing a stream is backing up large windows. In many applications a window may contain hundreds of GB of data. We designed a mechanism that backs up windows incrementally: a complete window may be spread across several successive checkpointed states. Each checkpointed state keeps part of the window's data and records both the boundary of that partial window and the boundary of the complete window at that moment. When restoring state, the thread scans several successive checkpointed states and uses the boundaries of the partial windows and of the whole window to recover the whole window.
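A minimal Java sketch of the incremental scheme described above, under assumptions of my own: boundaries are expressed as element sequence numbers, window elements are plain strings, and all class and method names (`IncrementalWindowCheckpoint`, `WindowBackup`, `PartialWindowState`, `evictBefore`, `restore`) are hypothetical. It is not Flink code and not the actual implementation, only an illustration of slices plus recorded boundaries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IncrementalWindowCheckpoint {

    /** One checkpointed state: a slice of the window plus the boundaries known at that moment. */
    public static final class PartialWindowState {
        public final long checkpointId;
        public final long partialStart; // first sequence number in this slice (inclusive)
        public final long partialEnd;   // end of this slice (exclusive)
        public final long windowStart;  // start of the complete window (inclusive)
        public final long windowEnd;    // end of the complete window (exclusive)
        public final List<String> elements;

        PartialWindowState(long checkpointId, long partialStart, long partialEnd,
                           long windowStart, long windowEnd, List<String> elements) {
            this.checkpointId = checkpointId;
            this.partialStart = partialStart;
            this.partialEnd = partialEnd;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.elements = elements;
        }
    }

    /** Hypothetical writer side: buffers only the elements that arrived since the last checkpoint. */
    public static final class WindowBackup {
        private final List<String> pending = new ArrayList<>();
        private long nextSeq = 0;     // sequence number the next element will get
        private long sliceStart = 0;  // first sequence number not yet checkpointed
        private long windowStart = 0; // current start of the complete window

        public void add(String element) {
            pending.add(element);
            nextSeq++;
        }

        /** Slides the window forward: elements with a sequence number below newStart leave it. */
        public void evictBefore(long newStart) {
            windowStart = Math.max(windowStart, newStart);
        }

        /** Writes only the new slice, together with both boundaries. */
        public PartialWindowState checkpoint(long checkpointId) {
            PartialWindowState state = new PartialWindowState(
                    checkpointId, sliceStart, nextSeq, windowStart, nextSeq, new ArrayList<>(pending));
            pending.clear();
            sliceStart = nextSeq;
            return state;
        }
    }

    /** Rebuilds the whole window by scanning successive states, oldest first. */
    public static List<String> restore(List<PartialWindowState> checkpoints) {
        List<String> window = new ArrayList<>();
        if (checkpoints.isEmpty()) {
            return window;
        }
        // The latest state carries the boundary of the complete window.
        PartialWindowState latest = checkpoints.get(checkpoints.size() - 1);
        for (PartialWindowState state : checkpoints) {
            if (state.partialEnd <= latest.windowStart) {
                continue; // this slice lies entirely before the window
            }
            for (int i = 0; i < state.elements.size(); i++) {
                long seq = state.partialStart + i;
                if (seq >= latest.windowStart && seq < latest.windowEnd) {
                    window.add(state.elements.get(i));
                }
            }
        }
        return window;
    }

    public static void main(String[] args) {
        WindowBackup backup = new WindowBackup();
        backup.add("a"); backup.add("b"); backup.add("c");
        PartialWindowState first = backup.checkpoint(1L);
        backup.add("d"); backup.add("e");
        backup.evictBefore(2L); // "a" and "b" have left the window
        PartialWindowState second = backup.checkpoint(2L);
        System.out.println(restore(Arrays.asList(first, second))); // prints [c, d, e]
    }
}
```

Each checkpoint writes only the elements that arrived since the previous one, so the cost of a checkpoint grows with the new data rather than with the size of the window. The price is paid at restore time, which has to read every state that still overlaps the window.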
I think this could be a candidate approach for backing up windows. What's more, I think window checkpoints should support user-defined state through the Checkpointed interface. In many applications, users compute important state cumulatively from the history of the stream. Once lost, such state cannot be recovered with the window checkpoint mechanism of current Flink (version 1.1.3).

Syinchwun Leo

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: 28 November 2016 18:58
To: dev@flink.apache.org
Subject: Re: Window's Checkpoint problem

Hi,

this is indeed a bug (though I would see it more as a feature since I think using the Checkpointed interface there can indeed be problematic, as Till pointed out). The problem is that the Scala Wrapper functions have to implement all kinds of interfaces so that they can forward to the wrapped function. Or we would have to have a wrapper function for each combination of interfaces that a user function can implement. In the long run, our use of interfaces for user functions does not seem to scale well in the Scala API.

Cheers,
Aljoscha

On Mon, 28 Nov 2016 at 10:49 Till Rohrmann <trohrm...@apache.org> wrote:

> Hi 时某人,
>
> I think you've found an inconsistency in Flink's windowing API (but
> it's the same in the Java API). Handling operator state in the context
> of windows is a little bit delicate because you could have multiple
> windows in flight, though. I've pulled Aljoscha in this thread who is
> more familiar with the windowing API and can give you probably a better
> explanation.
>
> I think either we allow it or we check that a window function does not
> implement the Checkpointed interface and if it does, then notify the
> user about it. Furthermore, I think we should document these subtle
> behaviour differences better.
>
> Cheers,
> Till
>
> On Sat, Nov 26, 2016 at 4:47 AM, 时某人 <shijinkui...@163.com> wrote:
>
> > Is there some State backend and checkpoint design architecture document?
> >
> > ChengXiang Li gave a talk about checkpointing at the Hangzhou Flink Meetup:
> > https://github.com/pusuo/streaming-resource/blob/master/flink-meetup-hz-20161105/Flink%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E6%B7%B1%E5%85%A5%E5%88%86%E6%9E%90_%E6%9D%8E%E5%91%88%E7%A5%A5.pdf
> >
> > Thanks
> >
> > At 2016-11-26 10:30:52, "liuxinchun" <liuxinc...@huawei.com> wrote:
> >
> > Hi all:
> >
> > I am following Flink and have run into a problem with a user-defined
> > window function and checkpointing. My code looks like this:
> >
> > class WindowStatistics extends WindowFunction[Event, Int, Tuple, TimeWindow]
> >     with Checkpointed[Option[Int]] {
> >
> >   private var count = 0
> >
> >   override def apply(key: Tuple, window: TimeWindow, input: Iterable[Event],
> >       out: Collector[Int]): Unit = {
> >     count = XXXX
> >     XXXXXXXX
> >     out.collect(count)
> >   }
> >
> >   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Int] = {
> >     Some(count)
> >   }
> >
> >   override def restoreState(state: Option[Int]): Unit = {
> >     state match {
> >       case Some(c) => count = c
> >       case None => count = 0
> >     }
> >   }
> > }
> >
> > and
> >
> > env.enableCheckpointing(5000)
> > env.setStateBackend(new RocksDBStateBackend("file:///data/"))
> >
> > When a checkpoint is taken, only the data in the window (panes) is
> > checkpointed; the user-defined state (count) is not included in the
> > checkpoint. When debugging, I found that at line 123 of
> > AbstractUdfStreamOperator.java the condition in
> >
> > if (userFunction instanceof Checkpointed) {
> >   XXXXXX
> > }
> >
> > is false (for other operators, such as map and filter, it is true), and
> > userFunction is actually a ScalaWindowFunctionWrapper object.
> >
> > So, my question is: is it a bug? If not, what is the design philosophy
> > of window's checkpoint?
> > In many scenarios, users may want to checkpoint their own state, but
> > the design does not seem to support this. Or is my way of using window
> > checkpoints wrong?
> >
> > Thank you!
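
The wrapper problem discussed in the thread can be reproduced in a few lines of plain Java. This is only a sketch: `WindowFunction` and `Checkpointed` here are simplified stand-ins rather than Flink's real interfaces, and `CountingFunction`, `PlainWrapper`, `ForwardingWrapper` and `operatorSeesState` are hypothetical names. It shows why an `instanceof Checkpointed` check on a wrapper fails unless the wrapper itself implements the interface and forwards to the wrapped function.

```java
public class WrapperCheckpointDemo {

    /** Simplified stand-in for a window function (not Flink's interface). */
    public interface WindowFunction {
        int apply(int input);
    }

    /** Simplified stand-in for the Checkpointed interface (not Flink's signature). */
    public interface Checkpointed<T> {
        T snapshotState();
        void restoreState(T state);
    }

    /** User function that keeps accumulated state and wants it checkpointed. */
    public static final class CountingFunction implements WindowFunction, Checkpointed<Integer> {
        private int count = 0;

        public int apply(int input) {
            count += input;
            return count;
        }

        public Integer snapshotState() {
            return count;
        }

        public void restoreState(Integer state) {
            count = state;
        }
    }

    /** Wrapper that forwards apply() only; it hides the user function's Checkpointed interface. */
    public static final class PlainWrapper implements WindowFunction {
        private final WindowFunction wrapped;

        public PlainWrapper(WindowFunction wrapped) {
            this.wrapped = wrapped;
        }

        public int apply(int input) {
            return wrapped.apply(input);
        }
    }

    /** Wrapper that also implements Checkpointed and forwards when the wrapped function supports it. */
    public static final class ForwardingWrapper implements WindowFunction, Checkpointed<Object> {
        private final WindowFunction wrapped;

        public ForwardingWrapper(WindowFunction wrapped) {
            this.wrapped = wrapped;
        }

        public int apply(int input) {
            return wrapped.apply(input);
        }

        public Object snapshotState() {
            return wrapped instanceof Checkpointed ? ((Checkpointed<?>) wrapped).snapshotState() : null;
        }

        @SuppressWarnings("unchecked")
        public void restoreState(Object state) {
            if (wrapped instanceof Checkpointed) {
                ((Checkpointed<Object>) wrapped).restoreState(state);
            }
        }
    }

    /** Mirrors the kind of check the operator performs on its user function. */
    public static boolean operatorSeesState(WindowFunction userFunction) {
        return userFunction instanceof Checkpointed;
    }

    public static void main(String[] args) {
        CountingFunction f = new CountingFunction();
        System.out.println(operatorSeesState(new PlainWrapper(f)));      // prints false
        System.out.println(operatorSeesState(new ForwardingWrapper(f))); // prints true
    }
}
```

A forwarding wrapper like this has to be repeated for every interface a user function might implement, which is the scaling concern raised in the reply above.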