The Flink version is 1.8.1.
The example below is adapted from the TopSpeedWindowing example:
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
        .assignTimestampsAndWatermarks(new CarTimestamp()).setParallelism(parallelism)
        .keyBy(0)
        .countWindow(countSize, slideSize)
        .trigger(DeltaTrigger.of(triggerMeters,
                new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public double getDelta(
                            Tuple4<Integer, Integer, Double, Long> oldDataPoint,
                            Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                        return newDataPoint.f2 - oldDataPoint.f2;
                    }
                }, carData.getType().createSerializer(env.getConfig())))
        .maxBy(1).setParallelism(parallelism);
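
For completeness, the imports and the surrounding setup essentially follow the TopSpeedWindowing example (sketched here with that example's defaults; CarSource and CarTimestamp are the classes from that example):

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// checkpointing is enabled on env (interval not shown here)

// Tuple4 fields: f0 = carId, f1 = speed, f2 = distance, f3 = timestamp
DataStream<Tuple4<Integer, Integer, Double, Long>> carData =
        env.addSource(CarSource.create(2)); // CarSource is the data generator from TopSpeedWindowing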

Over time, the size of each checkpoint grows from about 100 KB to 100 MB.

Why is the size of each checkpoint increasing?
In DeltaTrigger.java I found the clear() method below. As I understand it, the trigger state is cleared when a window is purged, so the size of every checkpoint should stay roughly constant:
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(stateDesc).clear();
}
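
For reference, the state that clear() removes is written in onElement; the sketch below is simplified from my reading of the Flink 1.8 DeltaTrigger source, keeping only the relevant parts:

// Simplified from org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {

    private final DeltaFunction<T> deltaFunction;
    private final double threshold;
    // one ValueState<T> per key and per window, holding the last element seen
    private final ValueStateDescriptor<T> stateDesc;

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
        if (lastElementState.value() == null) {
            // first element for this key/window: remember it, do not fire
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        }
        if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            // delta exceeded the threshold: remember the new element and fire
            lastElementState.update(element);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    // constructor, of(), onEventTime(), onProcessingTime() omitted; clear() is quoted above
}

So each key/window combination should hold at most one Tuple4 in this state, and clear() should remove it when the window is purged, which is why I expected the checkpoint size to stay flat.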


Has anyone encountered a similar problem?





Darling 
Andrew D.Lin


