Dear Aljoscha:

I'm a colleague of 时某人 and took part in designing the checkpoint mechanism
of a certain streaming processing system, which draws on Flink's checkpoint
mechanism. In my opinion, the most difficult part of checkpointing a stream is
backing up large windows. In many applications, a window may hold hundreds of
GB of data. We designed an incremental backup mechanism for windows: a complete
window may be spread across several successive checkpointed states. Each
checkpointed state keeps part of the window's data and records both the
boundary of that partial window and the boundary of the complete window at
that moment. When restoring state, the thread scans several successive
checkpointed states and rebuilds the whole window according to the boundaries
of the partial windows and of the whole window.
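A minimal Scala sketch of the restore step described above, under assumptions of my own: each increment carries (timestamp, value) pairs, boundaries are timestamps, and the names `Delta` and `IncrementalWindowBackup` are hypothetical (they are neither Flink API nor the actual implementation of our design). Restoring starts from the newest increment's window boundary and walks back only as far as the increment that covers the window start.

```scala
// Hypothetical model of the incremental window backup idea; not Flink API.
// One increment per checkpoint: only the elements added since the previous checkpoint.
final case class Delta[A](
    checkpointId: Long,
    partialStart: Long,         // start of this increment's range (inclusive)
    partialEnd: Long,           // end of this increment's range (exclusive)
    windowStart: Long,          // whole-window boundary at checkpoint time (inclusive)
    windowEnd: Long,            // whole-window boundary at checkpoint time (exclusive)
    elements: Vector[(Long, A)] // (timestamp, value) pairs in [partialStart, partialEnd)
)

object IncrementalWindowBackup {
  // Rebuilds the whole window from successive increments.
  def restore[A](deltas: Seq[Delta[A]]): Vector[(Long, A)] = {
    require(deltas.nonEmpty, "no checkpointed increments")
    val newestFirst = deltas.sortBy(_.checkpointId).reverse
    val latest = newestFirst.head
    // Increments that start after the window start are all needed ...
    val (newer, older) = newestFirst.span(_.partialStart > latest.windowStart)
    // ... plus the first one that reaches back to (or past) the window start.
    require(older.nonEmpty, "increments do not reach back to the window start")
    val covering = (newer :+ older.head).reverse // oldest first
    covering
      .flatMap(_.elements)
      .filter { case (ts, _) => ts >= latest.windowStart && ts < latest.windowEnd }
      .toVector
  }
}
```

Increments older than the one covering the window start are never read, which is what allows them to be discarded once the window has moved past them.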

I think this could be a candidate approach for backing up windows. Moreover, I
think window checkpoints should support user-defined state through the
Checkpointed interface. In many applications, users compute important state by
accumulating over the history of the stream. Once lost, such state cannot be
recovered with the window checkpoint mechanism of the current Flink version
(1.1.3).

Syinchwun Leo

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: November 28, 2016 18:58
To: dev@flink.apache.org
Subject: Re: Window's Checkpoint problem

Hi,
this is indeed a bug (though I would see it more as a feature since I think 
using the Checkpointed interface there can indeed be problematic, as Till 
pointed out). The problem is that the Scala Wrapper functions have to implement 
all kinds of interfaces so that they can forward to the wrapped function. Or we 
would have to have a wrapper function for each combination of interfaces that a 
user function can implement.

In the long run, our use of interfaces for user functions does not seem to 
scale well in the Scala API.
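A self-contained sketch of the forwarding problem, using simplified stand-ins rather than Flink's real classes (`UserFunction`, `WindowFn`, `Checkpointed`, `PlainWrapper`, `CheckpointedWrapper`, `Operator.isCheckpointed` and `Counting` are all hypothetical). A wrapper that only forwards `apply` hides the `Checkpointed` interface from an `instanceof`-style check, as happens with the ScalaWindowFunctionWrapper.

```scala
// Simplified, hypothetical stand-ins for Flink's interfaces (not the real signatures).
trait UserFunction
trait Checkpointed[S] {
  def snapshotState(checkpointId: Long, timestamp: Long): S
  def restoreState(state: S): Unit
}
trait WindowFn[IN, OUT] extends UserFunction {
  def apply(input: Iterable[IN]): OUT
}

// Forwards only apply(), so the wrapped function's Checkpointed interface is hidden.
class PlainWrapper[IN, OUT](val inner: WindowFn[IN, OUT]) extends WindowFn[IN, OUT] {
  def apply(input: Iterable[IN]): OUT = inner(input)
}

// An extra wrapper class for this interface combination: it also forwards Checkpointed.
class CheckpointedWrapper[IN, OUT, S](fn: WindowFn[IN, OUT] with Checkpointed[S])
    extends PlainWrapper[IN, OUT](fn) with Checkpointed[S] {
  def snapshotState(checkpointId: Long, timestamp: Long): S = fn.snapshotState(checkpointId, timestamp)
  def restoreState(state: S): Unit = fn.restoreState(state)
}

object Operator {
  // Modeled on the `userFunction instanceof Checkpointed` test in the operator.
  def isCheckpointed(f: UserFunction): Boolean = f.isInstanceOf[Checkpointed[_]]
}

// Example user function that keeps an accumulated count.
class Counting extends WindowFn[Int, Int] with Checkpointed[Int] {
  private var count = 0
  def apply(input: Iterable[Int]): Int = { count += input.size; count }
  def snapshotState(checkpointId: Long, timestamp: Long): Int = count
  def restoreState(state: Int): Unit = count = state
}
```

With k optional interfaces, forwarding every combination this way takes 2^k wrapper classes, which is the scaling problem described above.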

Cheers,
Aljoscha

On Mon, 28 Nov 2016 at 10:49 Till Rohrmann <trohrm...@apache.org> wrote:

> Hi 时某人,
>
> I think you've found an inconsistency in Flink's windowing API (but 
> it's the same in the Java API). Handling operator state in the context 
> of windows is a little bit delicate because you could have multiple 
> windows in flight, though. I've pulled Aljoscha, who is more familiar with
> the windowing API, into this thread; he can probably give you a better
> explanation.
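A plain-Scala illustration (no Flink dependency; `SharedFieldFn` and `PerKeyFn` are hypothetical names) of why a single field is delicate when several windows are in flight: one function instance fires for every key and window it serves, so a field like `count` mixes their values, and a snapshot of it belongs to no particular window. Scoping the value per key is shown only as one possible remedy.

```scala
import scala.collection.mutable

// Hypothetical illustration: one function instance serves every key and window of a subtask.
final class SharedFieldFn {
  private var count = 0 // a single field shared by all keys and windows
  def fire(key: String, elements: Seq[Int]): Int = { count += elements.size; count }
  def snapshot: Int = count
}

// The same logic with the value scoped per key (illustrative only).
final class PerKeyFn {
  private val counts = mutable.Map.empty[String, Int]
  def fire(key: String, elements: Seq[Int]): Int = {
    val updated = counts.getOrElse(key, 0) + elements.size
    counts.update(key, updated)
    updated
  }
  def snapshot: Map[String, Int] = counts.toMap
}
```

After firing a window of two elements for key "a" and one element for key "b", the shared field holds 3, which is neither key's count.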
>
> I think either we allow it or we check that a window function does not 
> implement the Checkpointed interface and if it does, then notify the 
> user about it. Furthermore, I think we should document these subtle 
> behaviour differences better.
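A minimal sketch of the second option, with hypothetical stand-ins (`Checkpointed`, `Wrapper`, `WindowFunctionValidation`) instead of Flink types: the check first looks through any wrapper, since otherwise it would miss exactly the Scala case reported in this thread, and then reports the problem instead of silently dropping the state.

```scala
// Hypothetical stand-ins for Flink types; not the real API.
trait Checkpointed[T]
trait Wrapper { def wrapped: AnyRef }

object WindowFunctionValidation {
  // Looks through (possibly nested) wrappers to reach the user's own function.
  @annotation.tailrec
  def unwrap(f: AnyRef): AnyRef = f match {
    case w: Wrapper => unwrap(w.wrapped)
    case other      => other
  }

  // Returns an error message instead of silently ignoring the user's Checkpointed state.
  def validate(f: AnyRef): Either[String, AnyRef] = {
    val user = unwrap(f)
    if (user.isInstanceOf[Checkpointed[_]])
      Left(s"${user.getClass.getName} implements Checkpointed, but its state is not " +
        "checkpointed when it is used as a window function")
    else Right(f)
  }
}
```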
>
> Cheers,
> Till
>
> On Sat, Nov 26, 2016 at 4:47 AM, 时某人 <shijinkui...@163.com> wrote:
>
> > Is there a design document describing the state backend and checkpoint
> > architecture?
> >
> > ChengXiang Li gave a talk about checkpoints at the Hangzhou Flink Meetup:
> > https://github.com/pusuo/streaming-resource/blob/master/flink-meetup-hz-20161105/Flink%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E6%B7%B1%E5%85%A5%E5%88%86%E6%9E%90_%E6%9D%8E%E5%91%88%E7%A5%A5.pdf
> >
> >
> > Thanks
> >
> > At 2016-11-26 10:30:52, "liuxinchun" <liuxinc...@huawei.com> wrote:
> >
> > Hi all,
> >
> > I have been following Flink and ran into a problem with checkpointing a
> > user-defined window function. My code looks like this:
> >
> > import org.apache.flink.api.java.tuple.Tuple
> > import org.apache.flink.streaming.api.checkpoint.Checkpointed
> > import org.apache.flink.streaming.api.scala.function.WindowFunction
> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > import org.apache.flink.util.Collector
> >
> > class WindowStatistics extends WindowFunction[Event, Int, Tuple, TimeWindow]
> >     with Checkpointed[Option[Int]] {
> >
> >   private var count = 0
> >
> >   override def apply(key: Tuple, window: TimeWindow, input: Iterable[Event],
> >       out: Collector[Int]): Unit = {
> >     count = XXXX
> >     XXXXXXXX
> >     out.collect(count)
> >   }
> >
> >   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Int] =
> >     Some(count)
> >
> >   override def restoreState(state: Option[Int]): Unit = state match {
> >     case Some(c) => count = c
> >     case None    => count = 0
> >   }
> > }
> >
> > and
> >
> > env.enableCheckpointing(5000)
> > env.setStateBackend(new RocksDBStateBackend("file:///data/"))
> >
> > When a checkpoint is taken, only the data in the window (the panes) is
> > checkpointed; the user-defined state (count) is not included. While
> > debugging, I found that at line 123 of AbstractUdfStreamOperator.java
> >
> > if (userFunction instanceof Checkpointed) {
> >     XXXXXX
> > }
> >
> > evaluates to false (for other operators, such as map and filter, it is
> > true), and userFunction is actually a ScalaWindowFunctionWrapper object.
> >
> > So my question is: is this a bug? If not, what is the design philosophy
> > behind window checkpoints? In many scenarios, users may want to checkpoint
> > their own state, but the design does not seem to support this. Or am I
> > using window checkpoints incorrectly?
> >
> >  Thank you!
>
