Hi Yun! Thanks for the quick reply.
One of the interval joins has a large lowerBound, but the table it joins against is only ~500 rows. I also have my own operator that only outputs the first value:

    import com.google.protobuf.GeneratedMessageV3;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class OnlyFirstUser<T extends GeneratedMessageV3> extends RichFlatMapFunction<T, T> {

        // Keyed state (requires a keyBy upstream): one flag per key.
        private transient ValueState<Boolean> alreadyOutputted;

        @Override
        public void flatMap(T value, Collector<T> out) throws Exception {
            // Emit only the first element seen for the current key.
            if (!alreadyOutputted.value()) {
                alreadyOutputted.update(true);
                out.collect(value);
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Boolean> descriptor =
                    new ValueStateDescriptor<>(
                            "alreadyOutputted", // the state name
                            TypeInformation.of(new TypeHint<Boolean>() {}), // type information
                            false); // default value of the state, if nothing was set
            alreadyOutputted = getRuntimeContext().getState(descriptor);
        }
    }

All of my inputs use this watermark strategy:

    .assignTimestampsAndWatermarks(
            WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                    .withIdleness(Duration.ofMinutes(1)));

In the Flink UI, early in the job run, I see "Low Watermarks" on each node, and they increase. After some checkpoint failures, low watermarks stop appearing in the UI: <https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>

Thanks Yun!

On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Dan,
>
> Have you used an upperBound or lowerBound that is too large?
>
> If not, could you also check the watermark strategy?
> The interval join operator depends on event-time
> timers for cleanup, and event-time timers are
> triggered by watermarks.
>
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:* Dan Hill <quietgol...@gmail.com>
> *Send Date:* Mon Mar 8 14:59:48 2021
> *Recipients:* user <user@flink.apache.org>
> *Subject:* Gradually increasing checkpoint size
>
>> Hi!
>>
>> I'm running a backfill Flink stream job over older data. It has multiple
>> interval joins. I noticed my checkpoint is steadily growing in size. I'd
>> expect my checkpoints to stabilize and not grow.
>>
>> Is there a setting to prune useless data from the checkpoint? My top
>> guess is that my checkpoint has a bunch of useless state in it.
>>
>> - Dan
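A minimal sketch of the cleanup mechanism Yun describes, written as a plain-Java toy model with no Flink dependency. It assumes a simplified picture. Each join input uses the same formula as `forBoundedOutOfOrderness(Duration.ofSeconds(1))` (watermark = max timestamp seen - out-of-orderness - 1 ms). The two-input join operator's watermark is the minimum of its inputs' watermarks. Each buffered left row is dropped once that watermark reaches its cleanup time. `IntervalJoinCleanupModel` and `retentionMillis` are hypothetical names, not Flink APIs. The real interval join operator derives cleanup times from the join bounds, buffers both sides, and handles idleness and late rows, all of which this model leaves out.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (plain Java, no Flink dependency) of why interval-join state only
 * shrinks when the operator's event-time watermark advances.
 * Simplified: this is not Flink's IntervalJoinOperator.
 */
public class IntervalJoinCleanupModel {

    /** Same formula as a bounded-out-of-orderness watermark generator. */
    static final class BoundedOutOfOrderness {
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        BoundedOutOfOrderness(long outOfOrdernessMillis) {
            this.outOfOrdernessMillis = outOfOrdernessMillis;
            // Chosen so that the initial watermark is exactly Long.MIN_VALUE.
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long watermark() {
            return maxTimestamp - outOfOrdernessMillis - 1;
        }
    }

    private final BoundedOutOfOrderness left;
    private final BoundedOutOfOrderness right;
    private final long retentionMillis; // hypothetical: how long a left row stays joinable
    private final List<Long> leftCleanupTimes = new ArrayList<>();

    IntervalJoinCleanupModel(long outOfOrdernessMillis, long retentionMillis) {
        this.left = new BoundedOutOfOrderness(outOfOrdernessMillis);
        this.right = new BoundedOutOfOrderness(outOfOrdernessMillis);
        this.retentionMillis = retentionMillis;
    }

    /** A two-input operator's event time is the minimum of its inputs' watermarks. */
    long operatorWatermark() {
        return Math.min(left.watermark(), right.watermark());
    }

    void onLeft(long timestamp) {
        left.onEvent(timestamp);
        // Stands in for registering an event-time cleanup timer for this row.
        leftCleanupTimes.add(timestamp + retentionMillis);
        fireTimers();
    }

    void onRight(long timestamp) {
        right.onEvent(timestamp);
        fireTimers();
    }

    /** Event-time timers fire once the watermark reaches their timestamp. */
    private void fireTimers() {
        long watermark = operatorWatermark();
        leftCleanupTimes.removeIf(cleanupTime -> cleanupTime <= watermark);
    }

    int bufferedLeftRows() {
        return leftCleanupTimes.size();
    }

    public static void main(String[] args) {
        // Both inputs keep advancing: old left rows get cleaned up.
        IntervalJoinCleanupModel healthy = new IntervalJoinCleanupModel(1_000, 5_000);
        // Right input sends one event at t=0 and then goes quiet.
        IntervalJoinCleanupModel stalled = new IntervalJoinCleanupModel(1_000, 5_000);
        stalled.onRight(0);
        for (long t = 0; t < 60_000; t += 1_000) {
            healthy.onLeft(t);
            healthy.onRight(t);
            stalled.onLeft(t);
        }
        System.out.println("healthy buffered rows: " + healthy.bufferedLeftRows());
        System.out.println("stalled buffered rows: " + stalled.bufferedLeftRows());
    }
}
```

Running `main` prints `healthy buffered rows: 7` and `stalled buffered rows: 60`. In the stalled case, the quiet input pins the minimum watermark at -1001, so no cleanup timer ever fires and buffered rows keep accumulating. In a real job, that would show up as growing checkpoints. `withIdleness(...)` is meant to keep a quiet input from holding back that minimum. This is one reason to check whether the watermarks are still advancing.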