I think the size of the checkpoint strongly depends on the data you are feeding into this function. Depending on the actual values, it might be that the window trigger never fires. Please verify what carData actually returns.
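As a rough standalone illustration (not Flink's actual DeltaTrigger implementation), the firing condition in the job below reduces to comparing the delta of the speed field against the threshold. If the incoming values never differ by more than triggerMeters, the trigger never fires and per-window state keeps accumulating. The threshold value 50.0 here is an assumption for demonstration only:

```java
// Hypothetical sketch of the DeltaTrigger firing rule: fire only when
// getDelta(old, new) exceeds the configured threshold.
public class DeltaTriggerSketch {

    // Same shape as the DeltaFunction in the job below: difference of f2.
    static double getDelta(double oldSpeed, double newSpeed) {
        return newSpeed - oldSpeed;
    }

    // DeltaTrigger fires when the delta strictly exceeds the threshold.
    static boolean fires(double oldSpeed, double newSpeed, double threshold) {
        return getDelta(oldSpeed, newSpeed) > threshold;
    }

    public static void main(String[] args) {
        double triggerMeters = 50.0; // assumed threshold
        // If values only creep up slowly, the delta stays below the
        // threshold, nothing fires, and window state is never cleared.
        System.out.println(fires(100.0, 120.0, triggerMeters)); // false
        System.out.println(fires(100.0, 160.0, triggerMeters)); // true
    }
}
```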
Cheers,
Till

On Mon, Jul 29, 2019 at 11:09 AM 陈Darling <chendonglin...@gmail.com> wrote:
>
> Flink version is 1.8.1.
> The example is adapted from TopSpeedWindowing:
>
> DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
>         .assignTimestampsAndWatermarks(new CarTimestamp()).setParallelism(parallelism)
>         .keyBy(0)
>         .countWindow(countSize, slideSize)
>         .trigger(DeltaTrigger.of(triggerMeters,
>                 new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
>                     private static final long serialVersionUID = 1L;
>
>                     @Override
>                     public double getDelta(
>                             Tuple4<Integer, Integer, Double, Long> oldDataPoint,
>                             Tuple4<Integer, Integer, Double, Long> newDataPoint) {
>                         return newDataPoint.f2 - oldDataPoint.f2;
>                     }
>                 }, carData.getType().createSerializer(env.getConfig())))
>         .maxBy(1).setParallelism(parallelism);
>
> The size of each checkpoint grows from 100 KB to 100 MB.
>
> Why does the size of each checkpoint keep increasing?
>
> In DeltaTrigger.java I found the clear() method. In my understanding, the size of every checkpoint should stay roughly equal:
>
> @Override
> public void clear(W window, TriggerContext ctx) throws Exception {
>     ctx.getPartitionedState(stateDesc).clear();
> }
>
> Has anyone encountered a similar problem?
>
> Darling
> Andrew D.Lin