Hi Andrew,

The Flink documentation [1] says: "Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#window-assigners>)."

A count window is built on a global window, so it seems the state of a fired window is not cleared. You can verify this by checking which `TriggerResult` value your trigger returns for each element.
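To make this concrete, here is a small plain-Java model (not Flink code) of what I believe happens. It assumes that countWindow(size, slide) sits on a global window with a count evictor, and that DeltaTrigger returns FIRE rather than FIRE_AND_PURGE, so buffered elements are only dropped by the evictor when the trigger fires. The class name DeltaTriggerModel and the methods simulate and ramp are made up for illustration, and the numbers are arbitrary.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Stand-alone sketch of a global window with a delta trigger and a count evictor. */
public class DeltaTriggerModel {

    /** Only the two TriggerResult values this model needs. */
    enum Result { CONTINUE, FIRE }

    /** Builds n values 0, step, 2*step, ... as hypothetical input data. */
    public static double[] ramp(int n, double step) {
        double[] values = new double[n];
        for (int i = 0; i < n; i++) {
            values[i] = i * step;
        }
        return values;
    }

    /**
     * Feeds the values through the model.
     * Returns {number of fires, number of elements still buffered}.
     */
    public static int[] simulate(double[] values, double threshold, int maxCount) {
        Deque<Double> buffer = new ArrayDeque<>(); // window contents kept in state
        Double last = null;                        // trigger state: element at the last fire
        int fires = 0;

        for (double v : values) {
            buffer.addLast(v);

            // Delta trigger: fire only when the value moved more than the threshold.
            Result result;
            if (last == null) {
                last = v;
                result = Result.CONTINUE;
            } else if (v - last > threshold) {
                last = v;
                result = Result.FIRE;
            } else {
                result = Result.CONTINUE;
            }

            // Count evictor: runs only on a fire and keeps the newest maxCount elements.
            if (result == Result.FIRE) {
                fires++;
                while (buffer.size() > maxCount) {
                    buffer.removeFirst();
                }
            }
        }
        return new int[] {fires, buffer.size()};
    }

    public static void main(String[] args) {
        int[] idle = simulate(ramp(1000, 0.0), 50.0, 100);  // delta never exceeds 50
        int[] busy = simulate(ramp(1000, 60.0), 50.0, 100); // delta exceeds 50 every step
        System.out.println("idle: fires=" + idle[0] + " buffered=" + idle[1]);
        System.out.println("busy: fires=" + busy[0] + " buffered=" + busy[1]);
    }
}
```

If the trigger never fires, the buffer in this model keeps growing with the input, which would match a checkpoint that gets larger every time. Even when it does fire, up to maxCount elements stay buffered, and in your job that limit is 300000 per key.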
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#window-lifecycle

Best,
Congxian

On Tue, Jul 30, 2019 at 7:20 PM, 陈Darling <chendonglin...@gmail.com> wrote:

> Thanks Rohrmann. Your answer inspired me.
>
> CountWindow uses CountTrigger by default, but I set the trigger again.
>
> Parallelism is 1.
>
> .trigger(DeltaTrigger.of(50, deltaFunction, stateSerializer))
>
> Through testing, I found that the data is generated much faster than the
> trigger fires. countSize and slideSize are 300000, and the DeltaTrigger
> threshold is 50.
> The size of the CountWindow is bigger than the trigger size.
>
> Could this be the cause?
>
> Darling
> Andrew D.Lin
>
> Begin forwarded message:
>
> From: Till Rohrmann <trohrm...@apache.org>
> Subject: Re: Why is the size of each checkpoint increasing?
> Date: July 29, 2019 at 8:58:44 PM GMT+8
> To: 陈Darling <chendonglin...@gmail.com>
> Cc: user@flink.apache.org
>
> I think the size of the checkpoint strongly depends on the data you are
> feeding into this function. Depending on the actual values, it might be
> that you never fire the window. Please verify what carData actually returns.
>
> Cheers,
> Till
>
> On Mon, Jul 29, 2019 at 11:09 AM 陈Darling <chendonglin...@gmail.com> wrote:
>
>> Flink version is 1.81.
>> The example is adapted from TopSpeedWindowing:
>>
>> DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
>>         .assignTimestampsAndWatermarks(new CarTimestamp()).setParallelism(parallelism)
>>         .keyBy(0)
>>         .countWindow(countSize, slideSize)
>>         .trigger(DeltaTrigger.of(triggerMeters,
>>                 new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
>>                     private static final long serialVersionUID = 1L;
>>
>>                     @Override
>>                     public double getDelta(
>>                             Tuple4<Integer, Integer, Double, Long> oldDataPoint,
>>                             Tuple4<Integer, Integer, Double, Long> newDataPoint) {
>>                         return newDataPoint.f2 - oldDataPoint.f2;
>>                     }
>>                 }, carData.getType().createSerializer(env.getConfig())))
>>         .maxBy(1).setParallelism(parallelism);
>>
>> The size of each checkpoint grows from 100k to 100m.
>>
>> Why is the size of each checkpoint increasing?
>>
>> In DeltaTrigger.java I found a clear method. As I understand it, every
>> checkpoint should therefore be the same size:
>>
>> @Override
>> public void clear(W window, TriggerContext ctx) throws Exception {
>>     ctx.getPartitionedState(stateDesc).clear();
>> }
>>
>> Has anyone encountered a similar problem?
>>
>> Darling
>> Andrew D.Lin