Hey Dan,

I think the logic is correct. Note that processElement works with
*relative* upper/lower bounds, which for the right side are the global
bounds negated and swapped:

relativeUpperBound = upperBound for left and -lowerBound for right

relativeLowerBound = lowerBound for left and -upperBound for right

Therefore the cleanup in onEventTime effectively applies the same logic
as the timer registration. If I understand it correctly, this trick was
introduced so that a single processElement can serve both sides.
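
To make the sign flip concrete, here is a small standalone sketch of the
arithmetic as I read it. It is not the Flink source: the class and method
names (RelativeBoundsSketch, relativeBounds, cleanupTimer,
bufferKeyToRemove) are made up for illustration, and the conditions are my
paraphrase of what processElement and onEventTime do, so please check them
against the operator itself.

```java
// Illustrative only: a simplified model of the bound arithmetic,
// not the actual IntervalJoinOperator code.
final class RelativeBoundsSketch {

    // Returns {relativeLowerBound, relativeUpperBound} for one side.
    // Left side: the global bounds as-is. Right side: negated and swapped.
    public static long[] relativeBounds(long lowerBound, long upperBound, boolean isLeft) {
        return isLeft
                ? new long[] {lowerBound, upperBound}
                : new long[] {-upperBound, -lowerBound};
    }

    // Timer registered when an element is buffered (assumed rule):
    // keep the element until no later element of the other side can match it.
    public static long cleanupTimer(long elementTimestamp, long relativeUpperBound) {
        return relativeUpperBound > 0L
                ? elementTimestamp + relativeUpperBound
                : elementTimestamp;
    }

    // Buffer key removed when the timer fires, expressed in *global* bounds
    // (assumed rule). It has to undo the offset added by cleanupTimer.
    public static long bufferKeyToRemove(
            long timerTimestamp, long lowerBound, long upperBound, boolean isLeft) {
        if (isLeft) {
            return upperBound <= 0L ? timerTimestamp : timerTimestamp - upperBound;
        }
        return lowerBound <= 0L ? timerTimestamp + lowerBound : timerTimestamp;
    }

    public static void main(String[] args) {
        long lowerBound = -2000L; // example values, not taken from the job
        long upperBound = 5000L;
        long elementTimestamp = 10_000L;

        for (boolean isLeft : new boolean[] {true, false}) {
            long[] relative = relativeBounds(lowerBound, upperBound, isLeft);
            long timer = cleanupTimer(elementTimestamp, relative[1]);
            long key = bufferKeyToRemove(timer, lowerBound, upperBound, isLeft);
            System.out.println(
                    (isLeft ? "left" : "right")
                            + ": timer=" + timer
                            + ", removed key=" + key);
        }
    }
}
```

In both cases the removed key comes back to the timestamp under which the
element was buffered, which is why a "+ lowerBound" in the right-side
cleanup matches a "+ relativeUpperBound" at registration time.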

There might be a bug somewhere, but I don't think it's where you
pointed. I'd suggest first checking whether the watermarks are
progressing.

Best,

Dawid

On 09/03/2021 08:36, Dan Hill wrote:
> Hi Yun!
>
> That advice was useful.  The state for that operator is very small
> (31kb).  Most of the checkpoint size is in a couple simple
> DataStream.intervalJoin operators.  The time intervals are fairly short.
>
> I'm going to try running the code with some small configuration
> changes.  One thing I did notice is that I set a positive value for
> the relativeUpperBound.  I'm not sure if I found a bug in
> IntervalJoinOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
>  
> The logic in IntervalJoinOperator.onEventTime needs an exact timestamp
> for clean up.  It has some logic around cleaning up the right side
> that uses timerTimestamp + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
>  
> However, processElement doesn’t use the same logic when creating a
> timer (I only see + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
>  
> Maybe I'm misreading the code.  It feels like a bug.
>
>
> On Mon, Mar 8, 2021 at 10:29 PM Yun Gao <yungao...@aliyun.com
> <mailto:yungao...@aliyun.com>> wrote:
>
>     Hi Dan,
>
>     Regarding the original checkpoint size problem, could you also
>     check in the checkpoint UI which tasks' state is increasing? For
>     example, the attached operator has an `alreadyOutputted` value
>     state, which seems to keep growing if new keys keep arriving.
>
>     Best,
>     Yun
>
>
>         ------------------Original Mail ------------------
>         *Sender:*Dan Hill <quietgol...@gmail.com
>         <mailto:quietgol...@gmail.com>>
>         *Send Date:*Tue Mar 9 00:59:24 2021
>         *Recipients:*Yun Gao <yungao...@aliyun.com
>         <mailto:yungao...@aliyun.com>>
>         *CC:*user <user@flink.apache.org <mailto:user@flink.apache.org>>
>         *Subject:*Re: Gradually increasing checkpoint size
>
>             Hi Yun!
>
>             Thanks for the quick reply.
>
>             One of the lowerBounds is large but the table being joined
>             with is ~500 rows.  I also have my own operator that only
>             outputs the first value.
>
>             public class OnlyFirstUser<T extends GeneratedMessageV3>
>                     extends RichFlatMapFunction<T, T> {
>
>                 private transient ValueState<Boolean> alreadyOutputted;
>
>                 @Override
>                 public void flatMap(T value, Collector<T> out) throws Exception {
>                     if (!alreadyOutputted.value()) {
>                         alreadyOutputted.update(true);
>                         out.collect(value);
>                     }
>                 }
>
>                 @Override
>                 public void open(Configuration config) {
>                     ValueStateDescriptor<Boolean> descriptor =
>                             new ValueStateDescriptor<>(
>                                     "alreadyOutputted", // the state name
>                                     TypeInformation.of(new TypeHint<Boolean>() {}), // type information
>                                     false); // default value of the state, if nothing was set
>                     alreadyOutputted = getRuntimeContext().getState(descriptor);
>                 }
>             }
>
>
>             All of my inputs have this watermark strategy.  In the
>             Flink UI, early in the job run, I see "Low Watermarks" on
>             each node and they increase.  After some checkpoint
>             failures, low watermarks stop appearing in the UI
>             <https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>.
>
>
>             .assignTimestampsAndWatermarks(
>                 WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>                     .withIdleness(Duration.ofMinutes(1)));
>
>
>
>             Thanks Yun!
>
>
>             On Mon, Mar 8, 2021 at 7:27 AM Yun Gao
>             <yungao...@aliyun.com <mailto:yungao...@aliyun.com>> wrote:
>
>                 Hi Dan,
>
>                 Are you using a very large upperBound or lowerBound?
>
>                 If not, could you also check the watermark strategy?
>                 The interval join operator depends on event-time
>                 timers for cleanup, and event-time timers are only
>                 triggered when the watermark advances.
>
>                 Best,
>                 Yun
>
>
>                     ------------------Original Mail ------------------
>                     *Sender:*Dan Hill <quietgol...@gmail.com
>                     <mailto:quietgol...@gmail.com>>
>                     *Send Date:*Mon Mar 8 14:59:48 2021
>                     *Recipients:*user <user@flink.apache.org
>                     <mailto:user@flink.apache.org>>
>                     *Subject:*Gradually increasing checkpoint size
>
>                         Hi!
>
>                         I'm running a backfill Flink stream job over
>                         older data.  It has multiple interval joins. 
>                         I noticed my checkpoint is regularly gaining
>                         in size.  I'd expect my checkpoints to
>                         stabilize and not grow.
>
>                         Is there a setting to prune useless data from
>                         the checkpoint?  My top guess is that my
>                         checkpoint has a bunch of useless state in it.
>
>                         - Dan
>
