Hi Yun!
Thanks for the quick reply.
One of the lowerBounds is large, but the table being joined with has only ~500
rows. I also have my own operator that only outputs the first value per key.
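To make the cleanup timers from Yun's reply concrete: an interval join (in the DataStream API, `left.keyBy(...).intervalJoin(right.keyBy(...)).between(lowerBound, upperBound)`) matches when `left.ts + lowerBound <= right.ts <= left.ts + upperBound`. A buffered element can therefore only be dropped once the watermark passes the last timestamp at which it could still match. The sketch below is a hypothetical, stdlib-only model of that arithmetic, based on a reading of Flink's IntervalJoinOperator rather than its actual code. The class name and the bounds in `main` are made up. It shows that a very negative lowerBound keeps each right-side element in state for roughly `-lowerBound` of event time, and only if the watermark actually advances.

```java
/**
 * Hypothetical model (not Flink code) of when an interval join may drop a
 * buffered element. Join condition: left.ts + lowerBound <= right.ts <= left.ts + upperBound.
 */
public class IntervalJoinCleanup {

    /** Event time after which a left element can no longer match any right element. */
    static long leftCleanupTime(long ts, long upperBound) {
        return upperBound > 0 ? ts + upperBound : ts;
    }

    /** Event time after which a right element can no longer match any left element. */
    static long rightCleanupTime(long ts, long lowerBound) {
        return lowerBound < 0 ? ts - lowerBound : ts;
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        long day = 24 * hour;
        // Hypothetical bounds: 30-day lookback, 1-hour lookahead.
        long lowerBound = -30 * day;
        long upperBound = hour;
        long ts = 0L;
        System.out.println(leftCleanupTime(ts, upperBound));  // 3600000
        System.out.println(rightCleanupTime(ts, lowerBound)); // 2592000000
    }
}
```

In this model, cleanup is keyed off event time, so if the watermark stalls, neither side's buffer is ever released.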
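import com.google.protobuf.GeneratedMessageV3;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Emits only the first record seen for each key; must be applied after keyBy(). */
public class OnlyFirstUser<T extends GeneratedMessageV3> extends RichFlatMapFunction<T, T> {
    // Keyed state: true once the current key has emitted a record.
    private transient ValueState<Boolean> alreadyOutputted;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("alreadyOutputted", Types.BOOLEAN);
        alreadyOutputted = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(T value, Collector<T> out) throws Exception {
        // value() returns null until update() has been called for this key.
        if (!Boolean.TRUE.equals(alreadyOutputted.value())) {
            alreadyOutputted.update(true);
            out.collect(value);
        }
    }
}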
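One possible contributor, offered as an assumption rather than a diagnosis: the `alreadyOutputted` flag is keyed state that is never cleared, so every distinct key seen stays in state and in every checkpoint. A minimal sketch of attaching a time-to-live to that descriptor follows. It assumes the Flink 1.12-era API, where `StateTtlConfig.newBuilder` takes an `org.apache.flink.api.common.time.Time` (newer releases use `Duration`). The class name `OnlyFirstUserTtl` and the one-day TTL are hypothetical.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

/** Hypothetical helper: the "alreadyOutputted" descriptor with a TTL attached. */
public class OnlyFirstUserTtl {

    static ValueStateDescriptor<Boolean> alreadyOutputtedDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1)) // placeholder TTL
                // Restart the TTL clock whenever the flag is created or written.
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // Never hand back an entry that has expired but not yet been purged.
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("alreadyOutputted", Types.BOOLEAN);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```

It would be used in `open()` as `alreadyOutputted = getRuntimeContext().getState(OnlyFirstUserTtl.alreadyOutputtedDescriptor());`. Two trade-offs apply. State TTL in this Flink version is measured in processing time, not event time. Also, an expired key reads as null and can emit again, so the flag check has to treat null as "not yet emitted" and the TTL has to outlast the period in which duplicates matter.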
In the Flink UI, early in the job run, I see "Low Watermarks" on each node,
and they increase. After some checkpoint failures, the low watermarks stop
appearing in the UI (screenshot:
<https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>).
All of my inputs use this watermark strategy:
.assignTimestampsAndWatermarks(
        WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withIdleness(Duration.ofMinutes(1)));
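For reasoning about where event time might stall, here is a simplified, stdlib-only model of the strategy above. It is not Flink's implementation, and the class and method names are hypothetical. A bounded-out-of-orderness generator emits `maxTimestamp - outOfOrderness - 1`. An operator with several inputs advances only to the minimum watermark of its non-idle inputs and never moves backwards. While that minimum is held back (for example, by an input that has produced no watermark yet and is not marked idle), no event-time timers fire, including the interval join's cleanup timers.

```java
/** Hypothetical model of bounded-out-of-orderness watermarks combined across inputs. */
public class WatermarkModel {

    /** Sentinel for "no watermark yet". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Watermark of a bounded-out-of-orderness generator after seeing maxTimestamp. */
    static long boundedWatermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /**
     * Operator watermark: minimum over inputs not marked idle, never decreasing.
     * If every input is idle, the current watermark is held.
     */
    static long combinedWatermark(long current, long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        return anyActive ? Math.max(current, min) : current;
    }

    public static void main(String[] args) {
        // Input 0 has seen events up to t=10s; input 1 has produced nothing yet.
        long[] wms = {boundedWatermark(10_000L, 1_000L), NO_WATERMARK};
        // Both inputs active: the silent input holds event time back.
        System.out.println(combinedWatermark(NO_WATERMARK, wms, new boolean[] {false, false})); // -9223372036854775808
        // Input 1 marked idle (as withIdleness would do): event time advances.
        System.out.println(combinedWatermark(NO_WATERMARK, wms, new boolean[] {false, true}));  // 8999
    }
}
```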
Thanks Yun!
On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <[email protected]> wrote:
> Hi Dan,
>
> Are you using a very large upperBound or lowerBound?
>
> If not, could you also check the watermark strategy?
> The interval join operator relies on event-time
> timers to clean up its state, and those timers are
> only triggered as the watermark advances.
>
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:* Dan Hill <[email protected]>
> *Send Date:* Mon Mar 8 14:59:48 2021
> *Recipients:* user <[email protected]>
> *Subject:* Gradually increasing checkpoint size
>
>> Hi!
>>
>> I'm running a backfill Flink streaming job over older data. It has multiple
>> interval joins. I noticed that my checkpoints keep growing in size. I'd
>> expect them to stabilize rather than keep growing.
>>
>> Is there a setting to prune useless data from the checkpoint? My top
>> guess is that my checkpoint has a bunch of useless state in it.
>>
>> - Dan
>>
>