Hi Dan,

Regarding the original checkpoint size problem, could you also have a check 
which tasks' state are increasing from the checkpoint UI ? For example, the 
attached operator has a `alreadyOutputed` value state, which seems to keep
increasing if there are always new keys ?

Best,
Yun



 ------------------Original Mail ------------------
Sender:Dan Hill <quietgol...@gmail.com>
Send Date:Tue Mar 9 00:59:24 2021
Recipients:Yun Gao <yungao...@aliyun.com>
CC:user <user@flink.apache.org>
Subject:Re: Gradually increasing checkpoint size

Hi Yun!

Thanks for the quick reply.

One of the lowerBounds is large but the table being joined with is ~500 rows.  
I also have my own operator that only outputs the first value.

public class OnlyFirstUser<T extends GeneratedMessageV3> extends 
RichFlatMapFunction<T, T> {

 private transient ValueState<Boolean> alreadyOutputted;

 @Override
 public void flatMap(T value, Collector<T> out) throws Exception {
 if (!alreadyOutputted.value()) {
 alreadyOutputted.update(true);
 out.collect(value);
 }
 }

 @Override
 public void open(Configuration config) {
 ValueStateDescriptor<Boolean> descriptor =
 new ValueStateDescriptor<>(
 "alreadyOutputted", // the state name
 TypeInformation.of(new TypeHint<Boolean>() {}), // type information
 false); // default value of the state, if nothing was set
 alreadyOutputted = getRuntimeContext().getState(descriptor);
 }
}

All of my inputs have this watermark strategy.  In the Flink UI, early in the 
job run, I see "Low Watermarks" on each node and they increase.  After some 
checkpoint failures, low watermarks stop appearing in the UI.


.assignTimestampsAndWatermarks(
 
WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));


Thanks Yun!


On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi Dan,

Have you use a too large upperBound or lowerBound?

If not, could you also check the watermark strategy ?
The interval join operator depends on the event-time
timer for cleanup, and the event-time timer would be
triggered via watermark. 

Best,
Yun



 ------------------Original Mail ------------------
Sender:Dan Hill <quietgol...@gmail.com>
Send Date:Mon Mar 8 14:59:48 2021
Recipients:user <user@flink.apache.org>
Subject:Gradually increasing checkpoint size

Hi!

I'm running a backfill Flink stream job over older data.  It has multiple 
interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd 
expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint?  My top guess is 
that my checkpoint has a bunch of useless state in it.

- Dan

Reply via email to