Flink version is 1.8.1. The example is adapted from TopSpeedWindowing:

    DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
            .assignTimestampsAndWatermarks(new CarTimestamp()).setParallelism(parallelism)
            .keyBy(0)
            .countWindow(countSize, slideSize)
            .trigger(DeltaTrigger.of(triggerMeters,
                    new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public double getDelta(
                                Tuple4<Integer, Integer, Double, Long> oldDataPoint,
                                Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                            return newDataPoint.f2 - oldDataPoint.f2;
                        }
                    }, carData.getType().createSerializer(env.getConfig())))
            .maxBy(1).setParallelism(parallelism);

The size of each checkpoint grows from 100 KB to 100 MB. Why does the checkpoint size keep increasing?

In DeltaTrigger.java I found the clear method. As I understand it, every checkpoint should be about the same size:

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

Has anyone encountered a similar problem?

Darling Andrew D.Lin