Hi Flink users,

Last week I sent an email about some very delayed outputs when running LEFT or FULL interval joins in Flink SQL. In a left join, a record can arrive from the left source with no matching record from the right source. When that happens, the watermarks on both sides have to advance much further than I expected before the unmatched result is emitted. Looking more closely at the code, I found a formula for cleanUpTime that didn't make sense to me:

cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1

I don't understand why a minCleanUpInterval is needed at all. Delaying outputs unnecessarily can also hurt downstream jobs, because they then have to deal with a stream that may be very out of order. More details are in the original email, quoted below. I wanted to raise this issue again this week to see whether anyone has context on interval joins and can explain cleanUpTime and minCleanUpInterval.
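To make the arithmetic concrete, here is a minimal Java sketch of the two formulas as quoted from TimeIntervalJoin. It is a re-typed approximation for illustration, not the Flink source. The class and method names are hypothetical. expectedEmitTime in particular is an assumption: it is the same formula with minCleanUpInterval removed, which is the bound I had expected. Times are in milliseconds. Following the arithmetic in my original email, both relative sizes are taken as one hour.

```java
import java.time.Duration;

/**
 * Hypothetical sketch of the cleanUpTime arithmetic quoted from TimeIntervalJoin.
 * Names are illustrative only and are not Flink APIs. All times are in milliseconds.
 */
public class CleanUpTimeSketch {

    // As quoted: minCleanUpInterval is the average of the two relative sizes.
    static long minCleanUpInterval(long leftRelativeSize, long rightRelativeSize) {
        return (leftRelativeSize + rightRelativeSize) / 2;
    }

    // As quoted: rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1.
    static long cleanUpTime(long rowTime, long leftRelativeSize,
                            long rightRelativeSize, long allowedLateness) {
        return rowTime + leftRelativeSize
                + minCleanUpInterval(leftRelativeSize, rightRelativeSize)
                + allowedLateness + 1;
    }

    // Assumed "expected" bound: the same formula without minCleanUpInterval.
    // This is not Flink code, only the behavior the email argues for.
    static long expectedEmitTime(long rowTime, long leftRelativeSize, long allowedLateness) {
        return rowTime + leftRelativeSize + allowedLateness + 1;
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        long rowTime = 0L;
        // Assumption: a +/- 1 hour interval gives relative sizes of 1 hour on both sides.
        long actual = cleanUpTime(rowTime, hour, hour, 0L);
        long expected = expectedEmitTime(rowTime, hour, 0L);
        System.out.println("cleanUpTime      = " + actual);              // 7200001
        System.out.println("expectedEmitTime = " + expected);            // 3600001
        System.out.println("extra delay (ms) = " + (actual - expected));  // 3600000
    }
}
```

With symmetric bounds, the extra delay is exactly minCleanUpInterval, which here is one hour.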
Thanks,
Charles

On Tue, Mar 14, 2023 at 1:44 PM Charles Tan <ctangu...@gmail.com> wrote:
> Hi everyone,
>
> I have been playing around with Flink SQL's interval joins and noticed
> that some outputs from unmatched LEFT or FULL joins arrive much later
> than I expected. Take the following query, for example:
>
> SELECT * FROM orders o LEFT JOIN shipments s
> ON (o.orderID = s.orderID) AND o.rowtime BETWEEN
>   s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR
>
> I expect any unmatched records from orders to be emitted once the
> watermarks for both orders and shipments advance more than 1 hour past
> the order record's rowtime. However, I'm noticing that the watermarks
> actually need to advance by more than 2 hours. When I looked into this
> further, I found these formulas in the TimeIntervalJoin class [1]:
>
>   minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2
>   long cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval +
>       allowedLateness + 1;
>
> In this case, cleanUpTime equals rowtime + 1h + 1h + 0 + 1, which
> matches the 2+ hours I was observing. From the commit history and
> documentation, I could not understand why cleanUpTime is calculated
> this way. Why does minCleanUpInterval exist, and why is its value the
> average of the left and right relative sizes? I found a similar JIRA
> issue opened a few years ago, FLINK-18996 [2].
>
> This issue can negatively affect downstream jobs that consume this
> output, because delaying the outputs of the interval join can leave the
> output stream very out of order. A downstream Flink job, for example,
> would have to increase its allowed lateness, or bounded
> out-of-orderness, to accommodate this delay.
>
> Thanks,
> Charles
>
> [1] https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java#L366
> [2] https://issues.apache.org/jira/browse/FLINK-18996
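The following simplified model may help explain why both watermarks matter for an unmatched order. It assumes two things. First, the join operator's event-time clock is the minimum of its two input watermarks, as in Flink's two-input operators. Second, an event-time timer fires once that clock reaches the timer's timestamp. Under those assumptions, an unmatched order with rowtime 0 and the one-hour bounds from the query above is emitted only when both watermarks reach 2h + 1 ms. The class and method names are hypothetical, and the model ignores the rest of TimeIntervalJoin's state handling.

```java
/**
 * Hypothetical model of when an unmatched left row can be emitted by a
 * two-input join. Assumes the operator clock is min(left, right) watermark
 * and that a timer fires when that clock is >= the timer timestamp.
 * Names are illustrative, not Flink APIs. Times are in milliseconds.
 */
public class CombinedWatermarkSketch {

    // The operator's event-time clock: the slower of the two inputs.
    static long combinedWatermark(long ordersWatermark, long shipmentsWatermark) {
        return Math.min(ordersWatermark, shipmentsWatermark);
    }

    // True once the cleanup timer for the unmatched row would have fired.
    static boolean unmatchedRowEmitted(long cleanUpTime, long ordersWatermark,
                                       long shipmentsWatermark) {
        return combinedWatermark(ordersWatermark, shipmentsWatermark) >= cleanUpTime;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long cleanUpTime = 2 * hour + 1; // rowtime 0 with +/- 1 hour bounds

        // Orders watermark far ahead, shipments just past 1 hour: not emitted.
        System.out.println(unmatchedRowEmitted(cleanUpTime, 10 * hour, hour + 1));
        // Both watermarks at exactly 2 hours, 1 ms short of cleanUpTime: not emitted.
        System.out.println(unmatchedRowEmitted(cleanUpTime, 2 * hour, 2 * hour));
        // Both watermarks at cleanUpTime: emitted.
        System.out.println(unmatchedRowEmitted(cleanUpTime, 2 * hour + 1, 2 * hour + 1));
    }
}
```

In this model the three calls print false, false and true. Advancing only the orders watermark never releases the row, which matches the observation that the watermarks of both sides must move past the cleanup time.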