Hey Dan,

I think the logic should be correct. Mind that in processElement we are
using *relative*Upper/LowerBound, which are the global bounds, negated and
swapped for the right side:

relativeUpperBound = upperBound for left and -lowerBound for right
relativeLowerBound = lowerBound for left and -upperBound for right

Therefore the cleanup logic in onEventTime effectively uses the same logic.
If I understand it correctly, this trick was introduced to deduplicate the
method.

There might be a bug somewhere, but I don't think it's where you pointed.
I'd suggest first investigating the progress of watermarks.

Best,

Dawid

On 09/03/2021 08:36, Dan Hill wrote:
> Hi Yun!
>
> That advice was useful. The state for that operator is very small
> (31kb). Most of the checkpoint size is in a couple of simple
> DataStream.intervalJoin operators. The time intervals are fairly short.
>
> I'm going to try running the code with some small configuration
> changes. One thing I did notice is that I set a positive value for
> the relativeUpperBound. I'm not sure if I found a bug in
> IntervalJoinOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
>
> The logic in IntervalJoinOperator.onEventTime needs an exact timestamp
> for clean up. It has some logic around cleaning up the right side
> that uses timerTimestamp + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
>
> However, processElement doesn't use the same logic when creating a
> timer (I only see + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
>
> Maybe I'm misreading the code. It feels like a bug.
>
>
> On Mon, Mar 8, 2021 at 10:29 PM Yun Gao <yungao...@aliyun.com> wrote:
>
>     Hi Dan,
>
>     Regarding the original checkpoint size problem, could you also
>     check in the checkpoint UI which tasks' state is increasing?
>     For example, the attached operator has an `alreadyOutputted`
>     value state, which seems to keep increasing if there are always
>     new keys?
>
>     Best,
>     Yun
>
>
>         ------------------Original Mail ------------------
>         *Sender:* Dan Hill <quietgol...@gmail.com>
>         *Send Date:* Tue Mar 9 00:59:24 2021
>         *Recipients:* Yun Gao <yungao...@aliyun.com>
>         *CC:* user <user@flink.apache.org>
>         *Subject:* Re: Gradually increasing checkpoint size
>
>         Hi Yun!
>
>         Thanks for the quick reply.
>
>         One of the lowerBounds is large but the table being joined
>         with is ~500 rows. I also have my own operator that only
>         outputs the first value.
>
>         import com.google.protobuf.GeneratedMessageV3;
>         import org.apache.flink.api.common.functions.RichFlatMapFunction;
>         import org.apache.flink.api.common.state.ValueState;
>         import org.apache.flink.api.common.state.ValueStateDescriptor;
>         import org.apache.flink.api.common.typeinfo.TypeHint;
>         import org.apache.flink.api.common.typeinfo.TypeInformation;
>         import org.apache.flink.configuration.Configuration;
>         import org.apache.flink.util.Collector;
>
>         public class OnlyFirstUser<T extends GeneratedMessageV3>
>                 extends RichFlatMapFunction<T, T> {
>
>             private transient ValueState<Boolean> alreadyOutputted;
>
>             @Override
>             public void flatMap(T value, Collector<T> out) throws Exception {
>                 if (!alreadyOutputted.value()) {
>                     alreadyOutputted.update(true);
>                     out.collect(value);
>                 }
>             }
>
>             @Override
>             public void open(Configuration config) {
>                 ValueStateDescriptor<Boolean> descriptor =
>                         new ValueStateDescriptor<>(
>                                 "alreadyOutputted", // the state name
>                                 TypeInformation.of(new TypeHint<Boolean>() {}), // type information
>                                 false); // default value of the state, if nothing was set
>                 alreadyOutputted = getRuntimeContext().getState(descriptor);
>             }
>         }
>
>         All of my inputs have this watermark strategy. In the
>         Flink UI, early in the job run, I see "Low Watermarks" on
>         each node and they increase. After some checkpoint
>         failures, low watermarks stop appearing in the UI
>         <https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>.
>
>         .assignTimestampsAndWatermarks(
>                 WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>                         .withIdleness(Duration.ofMinutes(1)));
>
>         Thanks Yun!
>
>
>         On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <yungao...@aliyun.com> wrote:
>
>             Hi Dan,
>
>             Have you used too large an upperBound or lowerBound?
>
>             If not, could you also check the watermark strategy?
>             The interval join operator depends on the event-time
>             timer for cleanup, and the event-time timer is
>             triggered by the watermark.
>
>             Best,
>             Yun
>
>
>                 ------------------Original Mail ------------------
>                 *Sender:* Dan Hill <quietgol...@gmail.com>
>                 *Send Date:* Mon Mar 8 14:59:48 2021
>                 *Recipients:* user <user@flink.apache.org>
>                 *Subject:* Gradually increasing checkpoint size
>
>                 Hi!
>
>                 I'm running a backfill Flink stream job over
>                 older data. It has multiple interval joins.
>                 I noticed my checkpoint is regularly gaining
>                 in size. I'd expect my checkpoints to
>                 stabilize and not grow.
>
>                 Is there a setting to prune useless data from
>                 the checkpoint? My top guess is that my
>                 checkpoint has a bunch of useless state in it.
>
>                 - Dan
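
The relative-bound argument in Dawid's reply at the top of this thread can be checked with a small stand-alone model. This is a minimal sketch, not the Flink source: the class and method names are hypothetical, and the two formulas are assumptions based on how the thread describes timer registration in processElement and cleanup in onEventTime. It only shows that, under those assumptions, the timer registered for a buffered element maps back to that element's timestamp on both sides.

```java
// Hypothetical stand-alone model of the bound arithmetic discussed in the thread.
// It does not depend on Flink and is not the IntervalJoinOperator implementation.
class RelativeBoundsSketch {

    // Assumed registration rule: the timer fires at ts + relativeUpperBound
    // when that bound is positive, otherwise at ts itself.
    static long cleanupTime(long ts, long relativeUpperBound) {
        return relativeUpperBound > 0L ? ts + relativeUpperBound : ts;
    }

    // Left side: relativeUpperBound = upperBound.
    static long leftCleanupTime(long ts, long lowerBound, long upperBound) {
        return cleanupTime(ts, upperBound);
    }

    // Right side: relativeUpperBound = -lowerBound.
    static long rightCleanupTime(long ts, long lowerBound, long upperBound) {
        return cleanupTime(ts, -lowerBound);
    }

    // Assumed cleanup rule for the left buffer: undo the upperBound shift.
    static long leftBufferKey(long timerTimestamp, long upperBound) {
        return upperBound <= 0L ? timerTimestamp : timerTimestamp - upperBound;
    }

    // Assumed cleanup rule for the right buffer: timerTimestamp + lowerBound,
    // which undoes the "- lowerBound" shift applied at registration.
    static long rightBufferKey(long timerTimestamp, long lowerBound) {
        return lowerBound <= 0L ? timerTimestamp + lowerBound : timerTimestamp;
    }

    public static void main(String[] args) {
        long lowerBound = -500L; // example values only
        long upperBound = 200L;
        long ts = 1_000L;

        long leftTimer = leftCleanupTime(ts, lowerBound, upperBound);
        long rightTimer = rightCleanupTime(ts, lowerBound, upperBound);

        System.out.println("left timer  = " + leftTimer
                + ", cleans key " + leftBufferKey(leftTimer, upperBound));
        System.out.println("right timer = " + rightTimer
                + ", cleans key " + rightBufferKey(rightTimer, lowerBound));
    }
}
```

If both printed keys equal the element timestamp, registration and cleanup agree. In that case the growing state would more likely come from timers never firing, which is why watermark progress is the first thing to check.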