Syinchwun,

Can you share more technical details about this approach, such as design docs or papers, or is it confidential? It sounds interesting, but the details make a difference (e.g. the bookkeeping for partial window boundaries).
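To make the question concrete, here is a rough sketch of how the scheme quoted below could be read: every checkpoint stores only the slice of the window that arrived since the previous checkpoint, together with the slice boundary and the whole-window boundary, and restore stitches the slices back together. All names (Boundary, PartialSnapshot, WindowBuffer, restore) are hypothetical and come neither from Flink nor from your system. The sketch also assumes elements arrive in timestamp order, which a real design could not rely on.

```scala
object IncrementalWindowSketch {
  // Half-open event-time interval [start, end).
  final case class Boundary(start: Long, end: Long)

  // What one checkpoint stores for one window (hypothetical layout).
  final case class PartialSnapshot[T](
      checkpointId: Long,
      window: Boundary,            // boundary of the whole window at snapshot time
      part: Boundary,              // boundary of the slice stored in this checkpoint
      elements: Vector[(Long, T)]) // (timestamp, value) pairs inside `part`

  final class WindowBuffer[T](val window: Boundary) {
    private var elements = Vector.empty[(Long, T)]
    private var persistedUpTo = window.start // everything before this is already backed up

    // Assumption: in-order arrival, so nothing lands in an already persisted slice.
    def add(timestamp: Long, value: T): Unit = {
      require(timestamp >= persistedUpTo && timestamp < window.end,
        s"timestamp $timestamp outside [$persistedUpTo, ${window.end})")
      elements = elements :+ ((timestamp, value))
    }

    // Back up only the slice [persistedUpTo, upTo) and advance the boundary.
    def snapshot(checkpointId: Long, upTo: Long): PartialSnapshot[T] = {
      val end = math.min(math.max(upTo, persistedUpTo), window.end)
      val slice = elements.filter { case (ts, _) => ts >= persistedUpTo && ts < end }
      val snap = PartialSnapshot(checkpointId, window, Boundary(persistedUpTo, end), slice)
      persistedUpTo = end
      snap
    }
  }

  // Scan successive snapshots and rebuild the window, checking that the
  // recorded boundaries line up without gaps or overlaps.
  def restore[T](snapshots: Seq[PartialSnapshot[T]]): Either[String, Vector[(Long, T)]] = {
    val ordered = snapshots.sortBy(_.checkpointId)
    val contiguous = ordered.zip(ordered.drop(1)).forall {
      case (a, b) => a.window == b.window && a.part.end == b.part.start
    }
    if (ordered.isEmpty) Left("no snapshots")
    else if (ordered.head.part.start != ordered.head.window.start)
      Left("first slice does not start at the window start")
    else if (!contiguous) Left("gap or overlap between partial windows")
    else Right(ordered.flatMap(_.elements).toVector)
  }
}
```

Even in this toy form, restore fails as soon as one slice in the chain is missing. That is the kind of bookkeeping detail I am curious about: how long old checkpoints must be retained, and what happens with late or out-of-order elements.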
Paris

> On 28 Nov 2016, at 13:01, liuxinchun <liuxinc...@huawei.com> wrote:
>
> Dear Aljoscha:
>
> I'm a colleague of 时某人 and took part in designing the checkpoint
> mechanism of a certain stream processing system, which draws on Flink's
> checkpoint mechanism. In my opinion, the hardest part of checkpointing a
> stream is backing up big windows. In many applications, a window may
> contain hundreds of GB of data. We designed a mechanism that backs up
> windows incrementally. That is, a complete window may be spread across
> several successive checkpointed states (every checkpointed state keeps
> partial window data and records the boundary of the partial window and the
> boundary of the complete window at that moment). When restoring the state,
> the thread needs to scan several successive checkpointed states in order
> to recover the whole window according to the boundaries of the partial
> windows and of the whole window.
>
> I think this could be a candidate way of backing up windows. What's more,
> I think window checkpointing should support user-defined state via the
> Checkpointed interface. In many applications, users compute important
> state cumulatively from the history of the stream. Once lost, this state
> cannot be recovered with the window checkpoint mechanism of current Flink
> (version 1.1.3).
>
> Syinchwun Leo
>
> -----Original Message-----
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: 28 November 2016 18:58
> To: dev@flink.apache.org
> Subject: Re: Window's Checkpoint problem
>
> Hi,
> this is indeed a bug (though I would see it more as a feature since I think
> using the Checkpointed interface there can indeed be problematic, as Till
> pointed out). The problem is that the Scala wrapper functions have to
> implement all kinds of interfaces so that they can forward to the wrapped
> function. Or we would have to have a wrapper function for each combination
> of interfaces that a user function can implement.
>
> In the long run, our use of interfaces for user functions does not seem to
> scale well in the Scala API.
>
> Cheers,
> Aljoscha
>
> On Mon, 28 Nov 2016 at 10:49 Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi 时某人,
>>
>> I think you've found an inconsistency in Flink's windowing API (but
>> it's the same in the Java API). Handling operator state in the context
>> of windows is a little bit delicate, though, because you could have
>> multiple windows in flight. I've pulled Aljoscha into this thread; he is
>> more familiar with the windowing API and can probably give you a better
>> explanation.
>>
>> I think either we allow it, or we check that a window function does not
>> implement the Checkpointed interface and, if it does, notify the
>> user about it. Furthermore, I think we should document these subtle
>> behaviour differences better.
>>
>> Cheers,
>> Till
>>
>> On Sat, Nov 26, 2016 at 4:47 AM, 时某人 <shijinkui...@163.com> wrote:
>>
>>> Is there a design or architecture document for the state backend and
>>> checkpointing?
>>>
>>> ChengXiang Li gave a talk about checkpointing at the Hangzhou Flink Meetup:
>>> https://github.com/pusuo/streaming-resource/blob/master/flink-meetup-hz-20161105/Flink%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E6%B7%B1%E5%85%A5%E5%88%86%E6%9E%90_%E6%9D%8E%E5%91%88%E7%A5%A5.pdf
>>>
>>> Thanks
>>>
>>> At 2016-11-26 10:30:52, "liuxinchun" <liuxinc...@huawei.com> wrote:
>>>
>>> Hi all:
>>>
>>> I have been following Flink and have run into a problem with
>>> user-defined windows and checkpointing.
>>> My code looks like this:
>>>
>>> class WindowStatistics extends WindowFunction[Event, Int, Tuple, TimeWindow]
>>>     with Checkpointed[Option[Int]] {
>>>
>>>   private var count = 0
>>>
>>>   override def apply(key: Tuple, window: TimeWindow, input: Iterable[Event],
>>>       out: Collector[Int]): Unit = {
>>>     count = XXXX
>>>     XXXXXXXX
>>>     out.collect(count)
>>>   }
>>>
>>>   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long):
>>>       Option[Int] = {
>>>     Some(count)
>>>   }
>>>
>>>   override def restoreState(state: Option[Int]): Unit = {
>>>     state match {
>>>       case Some(c) => count = c
>>>       case None => count = 0
>>>     }
>>>   }
>>> }
>>>
>>> and
>>>
>>> env.enableCheckpointing(5000)
>>> env.setStateBackend(new RocksDBStateBackend("file:///data/"))
>>>
>>> When a checkpoint is taken, only the data in the window (panes) is
>>> checkpointed; the user-defined state (count) is not included in the
>>> checkpoint. When debugging, I found that at line 123 of
>>> AbstractUdfStreamOperator.java the condition in
>>>
>>> if (userFunction instanceof Checkpointed) {
>>>   XXXXXX
>>> }
>>>
>>> is false (for other operators, like map and filter, it is true), and
>>> userFunction is actually a ScalaWindowFunctionWrapper object.
>>>
>>> So my question is: is this a bug? If not, what is the design
>>> philosophy behind window checkpointing? In many scenarios users may want
>>> to checkpoint their own state, but the design does not seem to support
>>> that. Or is my way of using checkpointing with windows wrong?
>>>
>>> Thank you!
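
The behaviour discussed in this thread can be reproduced without Flink. The sketch below is a minimal, self-contained illustration, assuming simplified stand-in traits: WindowFn, Snapshotting, PlainWrapper, ForwardingWrapper and operatorSnapshot are hypothetical names, not Flink classes. It shows why a type check on a wrapper object fails even though the wrapped user function implements the checkpointing interface, and one possible shape of a wrapper that forwards the call. It does not claim to show how Flink itself handles this.

```scala
object WrapperSketch {
  // Hypothetical stand-ins for a window function and a checkpointing interface.
  trait WindowFn { def apply(input: Iterable[Int]): Int }
  trait Snapshotting { def snapshotState(): Option[Int] }

  // User function with its own accumulated state, as in the original question.
  class CountingFn extends WindowFn with Snapshotting {
    private var count = 0
    def apply(input: Iterable[Int]): Int = { count += input.size; count }
    def snapshotState(): Option[Int] = Some(count)
  }

  // Wrapper that forwards only the window call: the Snapshotting trait of
  // the inner function is invisible to any type check on the wrapper.
  class PlainWrapper(inner: WindowFn) extends WindowFn {
    def apply(input: Iterable[Int]): Int = inner.apply(input)
  }

  // Wrapper that also implements Snapshotting and delegates when possible.
  class ForwardingWrapper(inner: WindowFn) extends WindowFn with Snapshotting {
    def apply(input: Iterable[Int]): Int = inner.apply(input)
    def snapshotState(): Option[Int] = inner match {
      case s: Snapshotting => s.snapshotState()
      case _               => None
    }
  }

  // Mimics the operator-side check `userFunction instanceof Checkpointed`.
  def operatorSnapshot(fn: WindowFn): Option[Int] = fn match {
    case s: Snapshotting => s.snapshotState()
    case _               => None
  }
}
```

With a CountingFn that has seen three elements, operatorSnapshot returns the count for the bare function and for ForwardingWrapper, but None for PlainWrapper. The forwarding variant has to be repeated for every interface a user function might implement, which is the scaling concern raised above.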