Syinchwun,

Can you share more technical details about this approach, such as design 
docs or papers, or is it confidential? 
It sounds interesting, but the details make a difference (e.g. the 
bookkeeping for partial boundaries).

Paris

> On 28 Nov 2016, at 13:01, liuxinchun <liuxinc...@huawei.com> wrote:
> 
> Dear Aljoscha:
> 
> I am a colleague of 时某人, and I took part in designing the checkpoint 
> mechanism of a stream processing system that is modeled on Flink's 
> checkpoint mechanism. In my opinion, the hardest part of checkpointing a 
> stream is backing up large windows. In many applications, a window may 
> contain hundreds of GB of data. We designed a mechanism that backs up 
> windows incrementally. That is, a complete window may be spread across 
> several successive checkpointed states (each checkpointed state keeps part 
> of the window's data and records the boundary of that partial window as 
> well as the boundary of the complete window at that moment). When restoring 
> state, the thread scans several successive checkpointed states and 
> reassembles the whole window from the boundaries of the partial windows 
> and of the whole window.
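
To make the bookkeeping question concrete, here is how I read the scheme described above, as a minimal Java sketch. Everything in it is an assumption on my part for illustration: the names `IncrementalWindowSketch` and `PartialWindow`, the use of element offsets as boundaries, and the rule that each checkpoint stores only the elements added since the previous one. None of these are Flink types, and this is not the actual design.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of incremental window backup; not Flink code.
final class IncrementalWindowSketch {

    /** What one checkpoint stores for a window: a slice plus both boundaries. */
    static final class PartialWindow {
        final long windowStart;      // whole-window boundary at checkpoint time (inclusive)
        final long windowEnd;        // whole-window boundary at checkpoint time (exclusive)
        final long partStart;        // offset of the first element stored in this checkpoint
        final List<String> elements; // elements at partStart, partStart + 1, ...

        PartialWindow(long windowStart, long windowEnd, long partStart, List<String> elements) {
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.partStart = partStart;
            this.elements = elements;
        }

        long partEnd() {
            return partStart + elements.size();
        }
    }

    /** Rebuilds the window as of the newest checkpoint; input is ordered oldest first. */
    static List<String> restore(List<PartialWindow> checkpoints) {
        List<String> window = new ArrayList<>();
        if (checkpoints.isEmpty()) {
            return window;
        }
        PartialWindow latest = checkpoints.get(checkpoints.size() - 1);
        long next = latest.windowStart; // next offset still missing from the result
        for (PartialWindow part : checkpoints) {
            if (part.partEnd() <= next) {
                continue; // slice was evicted from the window or is already covered
            }
            if (part.partStart > next) {
                throw new IllegalStateException("gap before offset " + part.partStart);
            }
            long to = Math.min(part.partEnd(), latest.windowEnd);
            for (long off = next; off < to; off++) {
                window.add(part.elements.get((int) (off - part.partStart)));
            }
            next = Math.max(next, to);
        }
        if (next < latest.windowEnd) {
            throw new IllegalStateException("window incomplete at offset " + next);
        }
        return window;
    }

    public static void main(String[] args) {
        List<PartialWindow> checkpoints = Arrays.asList(
            new PartialWindow(0, 3, 0, Arrays.asList("a", "b", "c")),
            new PartialWindow(1, 5, 3, Arrays.asList("d", "e")),
            new PartialWindow(2, 6, 5, Arrays.asList("f")));
        System.out.println(restore(checkpoints)); // prints [c, d, e, f]
    }
}
```

In this reading, restore cost grows with the number of checkpoints that have to be scanned, and an old checkpoint cannot be discarded while any of its elements are still inside the window. Those are the kinds of details I am curious about.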
> 
> I think this could be a candidate approach for backing up windows. What's 
> more, I think window checkpoints should support user-defined state via the 
> Checkpointed interface. In many applications, users compute important state 
> cumulatively from the history of the stream. Once lost, this state cannot be 
> recovered with the window checkpoint mechanism in current Flink (version 
> 1.1.3).
> 
> Syinchwun Leo
> 
> -----Original Message-----
> From: Aljoscha Krettek [mailto:aljos...@apache.org] 
> Sent: 28 November 2016 18:58
> To: dev@flink.apache.org
> Subject: Re: Window's Checkpoint problem
> 
> Hi,
> this is indeed a bug (though I would see it more as a feature since I think 
> using the Checkpointed interface there can indeed be problematic, as Till 
> pointed out). The problem is that the Scala Wrapper functions have to 
> implement all kinds of interfaces so that they can forward to the wrapped 
> function. Or we would have to have a wrapper function for each combination of 
> interfaces that a user function can implement.
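
The wrapper problem can be illustrated in isolation with a small Java sketch. `WindowFunction` and `Checkpointed` here are simplified stand-ins, not Flink's real interfaces, and `PlainWrapper`, `ForwardingWrapper`, and `operatorWouldSnapshot` are hypothetical names. The only thing taken from the thread is the `instanceof Checkpointed` check.

```java
// Simplified stand-ins for illustration; these are not Flink's real interfaces.
final class WrapperForwardingSketch {

    interface WindowFunction {
        int apply(int input);
    }

    interface Checkpointed {
        Integer snapshotState();
        void restoreState(Integer state);
    }

    /** A user function that keeps its own state next to the window logic. */
    static final class CountingFunction implements WindowFunction, Checkpointed {
        private int count;

        @Override public int apply(int input) { count += input; return count; }
        @Override public Integer snapshotState() { return count; }
        @Override public void restoreState(Integer state) { count = (state == null) ? 0 : state; }
    }

    /** Wrapper that implements only WindowFunction, so the state interface is hidden. */
    static final class PlainWrapper implements WindowFunction {
        private final WindowFunction wrapped;

        PlainWrapper(WindowFunction wrapped) { this.wrapped = wrapped; }

        @Override public int apply(int input) { return wrapped.apply(input); }
    }

    /** Wrapper that also implements Checkpointed and forwards to the wrapped function. */
    static final class ForwardingWrapper implements WindowFunction, Checkpointed {
        private final WindowFunction wrapped;

        ForwardingWrapper(WindowFunction wrapped) { this.wrapped = wrapped; }

        @Override public int apply(int input) { return wrapped.apply(input); }

        @Override public Integer snapshotState() {
            // Forward only if the wrapped function actually carries state.
            return (wrapped instanceof Checkpointed) ? ((Checkpointed) wrapped).snapshotState() : null;
        }

        @Override public void restoreState(Integer state) {
            if (wrapped instanceof Checkpointed) {
                ((Checkpointed) wrapped).restoreState(state);
            }
        }
    }

    /** Mirrors the operator-side check quoted in this thread. */
    static boolean operatorWouldSnapshot(Object userFunction) {
        return userFunction instanceof Checkpointed;
    }

    public static void main(String[] args) {
        CountingFunction fn = new CountingFunction();
        fn.apply(3);
        System.out.println(operatorWouldSnapshot(new PlainWrapper(fn)));      // prints false
        System.out.println(operatorWouldSnapshot(new ForwardingWrapper(fn))); // prints true
        System.out.println(new ForwardingWrapper(fn).snapshotState());        // prints 3
    }
}
```

The forwarding variant makes the operator-side check pass, but the wrapper then claims to be Checkpointed even when the wrapped function is not, and every further interface needs the same treatment.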
> 
> In the long run, our use of interfaces for user functions does not seem to 
> scale well in the Scala API.
> 
> Cheers,
> Aljoscha
> 
> On Mon, 28 Nov 2016 at 10:49 Till Rohrmann <trohrm...@apache.org> wrote:
> 
>> Hi 时某人,
>> 
>> I think you've found an inconsistency in Flink's windowing API (but 
>> it's the same in the Java API). Handling operator state in the context 
>> of windows is a little bit delicate because you could have multiple 
>> windows in flight, though. I've pulled Aljoscha into this thread; he is 
>> more familiar with the windowing API and can probably give you a better 
>> explanation.
>> 
>> I think either we allow it or we check that a window function does not 
>> implement the Checkpointed interface and if it does, then notify the 
>> user about it. Furthermore, I think we should document these subtle 
>> behaviour differences better.
>> 
>> Cheers,
>> Till
>> 
>> On Sat, Nov 26, 2016 at 4:47 AM, 时某人 <shijinkui...@163.com> wrote:
>> 
>>> Is there a design or architecture document for the state backends and checkpointing?
>>> 
>>> 
>>> ChengXiang Li gave a talk about checkpointing at the Hangzhou Flink Meetup:
>>> https://github.com/pusuo/streaming-resource/blob/master/flink-meetup-hz-20161105/Flink%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E6%B7%B1%E5%85%A5%E5%88%86%E6%9E%90_%E6%9D%8E%E5%91%88%E7%A5%A5.pdf
>>> 
>>> 
>>> Thanks
>>> 
>>> At 2016-11-26 10:30:52, "liuxinchun" <liuxinc...@huawei.com> wrote:
>>> 
>>> 
>>> Hi all:
>>> 
>>> 
>>> 
>>> I have been following Flink and ran into a problem with checkpointing in a
>>> user-defined window function. My code looks like this:
>>> 
>>> 
>>> 
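>>> import org.apache.flink.api.java.tuple.Tuple
>>> import org.apache.flink.streaming.api.checkpoint.Checkpointed
>>> import org.apache.flink.streaming.api.scala.function.WindowFunction
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>> import org.apache.flink.util.Collector
>>> 
>>> class WindowStatistics extends WindowFunction[Event, Int, Tuple, TimeWindow]
>>>     with Checkpointed[Option[Int]] {
>>> 
>>>   // User-defined state that should be part of every checkpoint.
>>>   private var count = 0
>>> 
>>>   override def apply(key: Tuple, window: TimeWindow, input: Iterable[Event],
>>>       out: Collector[Int]): Unit = {
>>>     count = XXXX
>>>     XXXXXXXX
>>>     out.collect(count)
>>>   }
>>> 
>>>   // Called when a checkpoint is taken: hand the counter to Flink.
>>>   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long):
>>>       Option[Int] = {
>>>     Some(count)
>>>   }
>>> 
>>>   // Called on recovery: restore the counter, defaulting to 0.
>>>   override def restoreState(state: Option[Int]): Unit = {
>>>     state match {
>>>       case Some(c) => count = c
>>>       case None => count = 0
>>>     }
>>>   }
>>> }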
>>> 
>>> and
>>> 
>>> env.enableCheckpointing(5000)
>>> 
>>> env.setStateBackend(new RocksDBStateBackend("file:///data/"))
>>> 
>>> When a checkpoint is taken, only the data in the window (the panes) is 
>>> checkpointed; the user-defined state (count) is not included in the 
>>> checkpoint. While debugging, I found that in
>>> 
>>> line 123 of AbstractUdfStreamOperator.java
>>> 
>>> if (userFunction instanceof Checkpointed) {
>>> 
>>>     XXXXXX
>>> 
>>> }
>>> 
>>> evaluates to false (for other operators, such as map and filter, it is 
>>> true), and userFunction is actually a ScalaWindowFunctionWrapper object.
>>> 
>>> So my question is: is this a bug? If not, what is the design 
>>> philosophy behind window checkpointing? In many scenarios, users want to 
>>> checkpoint their own state, but the design does not seem to support this. 
>>> Or am I using window checkpointing incorrectly?
>>> 
>>> Thank you!
>> 
