Hi all:

I have been looking into Flink and have run into a problem with a user-defined
window function and checkpointing. My code looks like this:

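import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class WindowStatistics extends WindowFunction[Event, Int, Tuple, TimeWindow]
    with Checkpointed[Option[Int]] {

  // User-defined state that should be part of every checkpoint
  private var count = 0

  override def apply(key: Tuple, window: TimeWindow, input: Iterator[Event],
                     out: Collector[Int]): Unit = {
    count = XXXX
    XXXXXXXX
    out.collect(count)
  }

  // Called when a checkpoint is taken
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long):
      Option[Int] = {
    Some(count)
  }

  // Called on recovery with the last snapshotted value
  override def restoreState(state: Option[Int]): Unit = {
    state match {
      case Some(c) => count = c
      case None => count = 0
    }
  }
}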

and

env.enableCheckpointing(5000)
env.setStateBackend(new RocksDBStateBackend("file:///data/"))

When a checkpoint is taken, only the data in the window (the panes) is
included; my user-defined state (count) is not part of the checkpoint.
While debugging, I found that the following check at

line 123 of AbstractUdfStreamOperator.java
if (userFunction instanceof Checkpointed) {
     XXXXXX
}
evaluates to false for my window function (for other operators, such as map
and filter, it is true). The userFunction is actually a
ScalaWindowFunctionWrapper object.
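
The observation above can be illustrated with a minimal sketch in plain
Scala, without any Flink dependency. It assumes that the wrapper forwards
apply() but does not itself implement the checkpointing interface; that is
an assumption about ScalaWindowFunctionWrapper, not something confirmed from
its source. All names below (SimpleWindowFunction, SimpleCheckpointed,
CountingFunction, FunctionWrapper, WrapperDemo) are hypothetical stand-ins,
not Flink classes.

```scala
// Hypothetical stand-ins for the two interfaces; NOT Flink's real types.
trait SimpleWindowFunction { def apply(input: Seq[Int]): Int }
trait SimpleCheckpointed { def snapshotState(): Int }

// User function implementing both traits, similar in shape to WindowStatistics.
class CountingFunction extends SimpleWindowFunction with SimpleCheckpointed {
  private var count = 0
  override def apply(input: Seq[Int]): Int = { count += input.size; count }
  override def snapshotState(): Int = count
}

// Wrapper that forwards apply() but does not implement SimpleCheckpointed
// (assumed here to mirror what the debugger showed).
class FunctionWrapper(val inner: SimpleWindowFunction) extends SimpleWindowFunction {
  override def apply(input: Seq[Int]): Int = inner.apply(input)
}

object WrapperDemo {
  // Same shape as the instanceof check quoted from AbstractUdfStreamOperator.
  def isCheckpointed(f: SimpleWindowFunction): Boolean =
    f.isInstanceOf[SimpleCheckpointed]

  def main(args: Array[String]): Unit = {
    val user = new CountingFunction
    val wrapped = new FunctionWrapper(user)
    println(isCheckpointed(user))    // true: the user function has the trait
    println(isCheckpointed(wrapped)) // false: the wrapper hides it
  }
}
```

Under that assumption, the operator only ever sees the wrapper, so the
snapshotState and restoreState methods of the wrapped function would never
be called.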

So, my question is: is this a bug? If not, what is the design philosophy
behind window checkpointing? In many scenarios, users may want to checkpoint
their own state, but the current design does not seem to support that. Or am
I using checkpointing with windows incorrectly?

Thank you!

刘新春 00354552
Big Data Technology Development Department