Hi Flink users,

Last week I sent an email about some very delayed outputs as a result of
running LEFT or FULL interval joins in Flink SQL. I noticed that in a left
join, when a record arrives from the left source but there is no matching
record from the right source, the watermark for both sides needs to advance
much further than I expected before the result is emitted. Looking more
closely at the code, I noticed a formula for cleanUpTime that didn't make
sense to me (cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval
+ allowedLateness + 1). I don't understand why there needs to be a
minCleanUpInterval at all. Unnecessarily delaying outputs can hurt
downstream jobs, since they now have to deal with a stream that could be
very out of order. More details are in the original email below. I wanted to raise
this issue again this week to see if anyone has any context on interval
joins and can explain cleanUpTime and minCleanUpInterval.
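
To make the arithmetic concrete, here is a minimal Java sketch of the two
formulas as quoted from TimeIntervalJoin. The class and method names are
made up for illustration and are not Flink APIs. The one-hour relative
sizes and the zero allowedLateness are the values from the example query
in the original email, and I am assuming timestamps are in milliseconds.

```java
import java.time.Duration;

// Illustrative only: mirrors the formulas quoted in this thread,
// not the actual TimeIntervalJoin implementation.
final class CleanUpTimeSketch {

    // minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
    static long minCleanUpInterval(long leftRelativeSize, long rightRelativeSize) {
        return (leftRelativeSize + rightRelativeSize) / 2;
    }

    // cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval
    //               + allowedLateness + 1
    static long cleanUpTime(
            long rowTime,
            long leftRelativeSize,
            long rightRelativeSize,
            long allowedLateness) {
        return rowTime
                + leftRelativeSize
                + minCleanUpInterval(leftRelativeSize, rightRelativeSize)
                + allowedLateness
                + 1;
    }

    public static void main(String[] args) {
        // Assumed values taken from the example query: +/- 1 hour, no lateness.
        long hour = Duration.ofHours(1).toMillis();
        long rowTime = 0L;

        long expected = rowTime + hour; // where I expected the unmatched row
        long actual = cleanUpTime(rowTime, hour, hour, 0L);

        System.out.println("expected delay (ms): " + (expected - rowTime));
        System.out.println("cleanUpTime delay (ms): " + (actual - rowTime));
    }
}
```

With these inputs the sketch gives a delay of 2 hours plus 1 ms, versus
the 1 hour I expected, which matches what I observed.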

Thanks,
Charles

On Tue, Mar 14, 2023 at 1:44 PM Charles Tan <ctangu...@gmail.com> wrote:

> Hi everyone,
>
> I have been playing around with Flink SQL’s interval joins and noticed
> that some outputs from unmatched LEFT or FULL joins are arriving much later
> than I expected. Take the following query for example:
>
>   SELECT *
>   FROM orders o
>   LEFT JOIN shipments s
>     ON (o.orderID = s.orderID)
>    AND o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR
>                      AND s.rowtime + INTERVAL '1' HOUR
>
> I expect any unmatched records from orders to be output once the watermark
> for both orders and shipments advances 1+ hours past the order record’s
> rowtime. However, I’m noticing that the watermarks actually need to advance
> by 2+ hours. When I looked into this further, I found these formulas in the
> TimeIntervalJoin class [1]:
>   minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
>   long cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval
>       + allowedLateness + 1;
>
> In this case, the cleanUpTime equals rowtime + 1h + 1h + 0 + 1, which
> matches the 2+ hours I was observing. From the commit history and
> documentation, I could not understand why the cleanUpTime is calculated
> this way. Why does the minCleanUpInterval exist, and why is its value the
> average of the left and right relative sizes? I found a similar JIRA
> issue opened a few years ago, FLINK-18996 [2]. The delay can negatively
> affect the performance of downstream jobs that ingest this output,
> because delaying the results of the interval join can leave the output
> stream very out of order. A downstream Flink job, for example, would have
> to increase its allowed lateness, or bounded out-of-orderness, to
> accommodate this delay.
>
> Thanks,
> Charles
>
> [1]
> https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java#L366
> [2] https://issues.apache.org/jira/browse/FLINK-18996
>
