Thanks Rohrmann. Your answer inspired me.

countWindow uses a CountTrigger by default, but I replaced it with my own trigger.

Parallelism is 1

.trigger(DeltaTrigger.of(50, deltaFunction, stateSerializer))


Through testing, I found that the data is generated much faster than the trigger fires.
countSize and slideSize are both 300000, and the DeltaTrigger threshold is 50,
so the size of the CountWindow is bigger than the trigger threshold.

Could this be the cause?
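
To make the suspicion concrete, here is a minimal, Flink-free sketch of how I picture the window state behaving. It is a simplified model, not Flink's implementation. It assumes that the delta trigger fires without purging the window and that the count evictor trims the buffer only when the trigger fires. The names WindowStateSketch, bufferedAfter and ramp are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model of one keyed global window with a count evictor and a delta trigger. */
public class WindowStateSketch {

    /** Builds n values 0, step, 2*step, ... as stand-in input data. */
    public static double[] ramp(int n, double step) {
        double[] values = new double[n];
        for (int i = 0; i < n; i++) {
            values[i] = i * step;
        }
        return values;
    }

    /** Feeds the values into the window and returns how many elements stay buffered. */
    public static int bufferedAfter(double[] values, double threshold, int maxCount) {
        Deque<Double> buffer = new ArrayDeque<>();
        Double lastFired = null; // models the single element kept as trigger state

        for (double v : values) {
            buffer.addLast(v); // every element is added to the window state
            if (lastFired == null) {
                lastFired = v; // the first element only initializes the trigger state
                continue;
            }
            if (v - lastFired > threshold) {
                lastFired = v;
                // Assumption: eviction happens only on fire and keeps the newest maxCount elements.
                while (buffer.size() > maxCount) {
                    buffer.removeFirst();
                }
                // The window function would run here; the buffer is kept, not purged.
            }
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        // Values drift by less than the threshold, so the trigger never fires.
        System.out.println(bufferedAfter(ramp(1000, 0.01), 50.0, 100));
        // Values cross the threshold regularly, so the buffer is trimmed on each fire.
        System.out.println(bufferedAfter(ramp(1000, 1.0), 50.0, 100));
    }
}
```

In this model, if the delta never exceeds the threshold, nothing is evicted and the buffer keeps growing. Even when the trigger does fire, up to maxCount elements (300000 per key in my job) stay in state between firings.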



Darling 
Andrew D.Lin



> Begin forwarded message:
> 
> From: Till Rohrmann <trohrm...@apache.org <mailto:trohrm...@apache.org>>
> Subject: Re: Why is the size of each checkpoint increasing?
> Date: July 29, 2019 at 8:58:44 PM GMT+8
> To: 陈Darling <chendonglin...@gmail.com <mailto:chendonglin...@gmail.com>>
> Cc: user@flink.apache.org <mailto:user@flink.apache.org>
> 
> I think the size of the checkpoint strongly depends on the data you are 
> feeding into this function. Depending on the actual values, it might be that 
> you never fire the window. Please verify what carData actually returns.
> 
> Cheers,
> Till
> 
> On Mon, Jul 29, 2019 at 11:09 AM 陈Darling <chendonglin...@gmail.com 
> <mailto:chendonglin...@gmail.com>> wrote:
> 
> Flink version is 1.8.1
> The example is adapted from TopSpeedWindowing
> DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
>         .assignTimestampsAndWatermarks(new CarTimestamp()).setParallelism(parallelism)
>         .keyBy(0)
>         .countWindow(countSize, slideSize)
>         .trigger(DeltaTrigger.of(triggerMeters,
>                 new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
>                     private static final long serialVersionUID = 1L;
> 
>                     @Override
>                     public double getDelta(
>                             Tuple4<Integer, Integer, Double, Long> oldDataPoint,
>                             Tuple4<Integer, Integer, Double, Long> newDataPoint) {
>                         return newDataPoint.f2 - oldDataPoint.f2;
>                     }
>                 }, carData.getType().createSerializer(env.getConfig())))
>         .maxBy(1).setParallelism(parallelism);
> 
> The size of each checkpoint grows from 100 KB to 100 MB.
> 
> Why is the size of each checkpoint increasing? 
> In DeltaTrigger.java, I found a clear method. In my understanding, the size of every 
> checkpoint should therefore be equal:
> @Override
> public void clear(W window, TriggerContext ctx) throws Exception {
>    ctx.getPartitionedState(stateDesc).clear();
> }
> 
> 
> Has anyone encountered a similar problem?
> 
> 
> 
> 
> 
> Darling 
> Andrew D.Lin
> 
> 
> 
