Hi Yun!

Thanks for the quick reply.

One of the lowerBounds is large, but the table it's joined with is only ~500
rows.  I also have my own operator that outputs only the first value per key.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import com.google.protobuf.GeneratedMessageV3;

public class OnlyFirstUser<T extends GeneratedMessageV3>
        extends RichFlatMapFunction<T, T> {

    // Per-key flag; the input stream is keyed, so this state is scoped to the key.
    private transient ValueState<Boolean> alreadyOutputted;

    @Override
    public void flatMap(T value, Collector<T> out) throws Exception {
        if (!alreadyOutputted.value()) {
            alreadyOutputted.update(true);
            out.collect(value);
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>(
                        "alreadyOutputted", // the state name
                        TypeInformation.of(new TypeHint<Boolean>() {}), // type information
                        false); // default value of the state, if nothing was set
        alreadyOutputted = getRuntimeContext().getState(descriptor);
    }
}
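In case it helps make the intent concrete, the operator boils down to "pass through only the first element seen for each key." Here's a minimal plain-Java sketch of that semantics (no Flink dependencies; the class and method names are just for illustration):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FirstPerKeySketch {
    // Tracks which keys have already produced output, playing the role of
    // the per-key ValueState<Boolean> in the Flink operator above.
    private final Set<String> seen = new HashSet<>();

    // Returns true (i.e. "collect the value") only for the first
    // occurrence of each key; Set.add returns false on duplicates.
    public boolean offer(String key) {
        return seen.add(key);
    }

    public static void main(String[] args) {
        FirstPerKeySketch sketch = new FirstPerKeySketch();
        List<String> out = new ArrayList<>();
        for (String key : new String[] {"a", "b", "a", "a", "b"}) {
            if (sketch.offer(key)) {
                out.add(key);
            }
        }
        System.out.println(out); // [a, b]
    }
}
```

The difference, of course, is that in the real operator the "seen" flag lives in Flink keyed state, so it is checkpointed and sharded by key rather than held in one in-memory set.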

All of my inputs use this watermark strategy.  In the Flink UI, early in
the job run, I see "Low Watermarks" on each node, and they increase.  After
some checkpoint failures, low watermarks stop appearing in the UI
<https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>.


.assignTimestampsAndWatermarks(
        WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withIdleness(Duration.ofMinutes(1)));
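For reference, my mental model of how that strategy advances watermarks is the sketch below (a rough approximation, not Flink's actual implementation): the watermark trails the maximum event timestamp seen so far by the out-of-orderness bound, so if event timestamps stop advancing, the watermark stalls, and the event-time timers you mentioned (including interval-join cleanup) never fire.

```java
// Rough sketch of bounded-out-of-orderness watermark generation.
// The watermark only moves forward when the max seen timestamp moves
// forward; late events do not move it backwards.
public class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    public BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low, offset so currentWatermark() never underflows.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    public void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(1000);
        wm.onEvent(5_000);
        wm.onEvent(4_200); // late event: watermark does not regress
        System.out.println(wm.currentWatermark()); // 3999
    }
}
```

With `withIdleness`, an idle input stops holding back the downstream minimum, but the watermark of the active inputs still only advances with new data, which is why I'm watching the "Low Watermarks" values in the UI.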



Thanks Yun!


On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Dan,
>
> Have you used a too-large upperBound or lowerBound?
>
> If not, could you also check the watermark strategy ?
> The interval join operator depends on the event-time
> timer for cleanup, and the event-time timer would be
> triggered via watermark.
>
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Dan Hill <quietgol...@gmail.com>
> *Send Date:*Mon Mar 8 14:59:48 2021
> *Recipients:*user <user@flink.apache.org>
> *Subject:*Gradually increasing checkpoint size
>
>> Hi!
>>
>> I'm running a backfill Flink stream job over older data.  It has multiple
>> interval joins.  I noticed my checkpoint is regularly gaining in size.  I'd
>> expect my checkpoints to stabilize and not grow.
>>
>> Is there a setting to prune useless data from the checkpoint?  My top
>> guess is that my checkpoint has a bunch of useless state in it.
>>
>> - Dan
>>
>
