Hi  Andrew
>From Flink doc[1], there is "Flink guarantees removal only for time-based
windows and not for other types, *e.g.* global windows (see Window Assigners
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#window-assigners>).
",
Seems the state of the fired window wouldn't be cleared for a count window,
you can verify this to see each result of your trigger is what value of
`TriggerResult`.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#window-lifecycle
Best,
Congxian


陈Darling <chendonglin...@gmail.com> 于2019年7月30日周二 下午7:20写道:

> Thanks Rohrmann. Your answer inspired me.
>
> CountWindow  defaults to using CountTrigger, but I set the trigger again.
>
> Parallelism is 1
>
> .trigger(DeltaTrigger.of(50,deltaFunction,stateSerializer)
>
>
> Through testing,I found that the data is generated much faster than tigger
> and countSize,slideSize is 300000,
> DeltaTrigger threhold is 50.
> The size  of CountWindow is bigger than trigger size.
>
> will it be caused by this reason?
>
>
>
> Darling
> Andrew D.Lin
>
>
>
> 下面是被转发的邮件:
>
> *发件人: *Till Rohrmann <trohrm...@apache.org>
> *主题: **回复: Why is the size of each checkpoint increasing?*
> *日期: *2019年7月29日 GMT+8 下午8:58:44
> *收件人: *陈Darling <chendonglin...@gmail.com>
> *抄送: *user@flink.apache.org
>
> I think the size of the checkpoint strongly depends on the data you are
> feeding into this function. Depending on the actual values, it might be
> that you never fire the window. Please verify what carData actually returns.
>
> Cheers,
> Till
>
> On Mon, Jul 29, 2019 at 11:09 AM 陈Darling <chendonglin...@gmail.com>
> wrote:
>
>>
>> Flink version is 1.81
>> The eaxmple is adapted according to TopSpeedWindowing
>>
>> DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
>>         .assignTimestampsAndWatermarks(new 
>> CarTimestamp()).setParallelism(parallelism)
>>         .keyBy(0)
>>         .countWindow(countSize, slideSize)
>>         .trigger(DeltaTrigger.of(triggerMeters,
>>                 new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
>>                     private static final long serialVersionUID = 1L;
>>
>>                     @Override
>>                     public double getDelta(
>>                             Tuple4<Integer, Integer, Double, Long> 
>> oldDataPoint,
>>                             Tuple4<Integer, Integer, Double, Long> 
>> newDataPoint) {
>>                         return newDataPoint.f2 - oldDataPoint.f2;
>>                     }
>>                 }, carData.getType().createSerializer(env.getConfig())))
>>         .maxBy(1).setParallelism(parallelism);
>>
>>
>> The size of each checkpoint will increase from 100k to 100m.
>>
>> Why is the size of each checkpoint increasing?
>>
>> In DeltaTrigger.java,I find clear method.In my understand, the size of every 
>> checkpoint should be equal
>>
>> @Override
>> public void clear(W window, TriggerContext ctx) throws Exception {
>>    ctx.getPartitionedState(stateDesc).clear();
>> }
>>
>>
>>
>> Has anyone encountered a similar problem?
>>
>>
>>
>>
>>
>> Darling
>> Andrew D.Lin
>>
>>
>>
>>
>

Reply via email to