Hello! Thank you for looking into these issues! I'm happy that you identified the root cause for OuterJoinTest and SqlSyntaxTest and are working on the fix.

Regarding IntervalJoinTest, I think I understand your point. Thank you for explaining that. However, this can be confusing to a user. Maybe let's use another example that illustrates it better (I haven't written the test yet, but I might try to do this tomorrow): we have event A and event B. Event B can happen following event A, but no later than 120 s after event A was created. So one of the join conditions would be eventB.created between eventA.created and eventA.created + interval 120 seconds. Now I want to count, for each event B and for each 1-minute time window, how many events of type A preceded this event B.
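A rough sketch of the query I have in mind (untested; the rate sources and the keyA/keyB join key are just placeholders for the real inputs and business key):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object IntervalJoinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("IntervalJoinExample")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Illustrative sources; stand-ins for the real input streams.
    val eventsA = spark.readStream.format("rate").load()
      .select($"value".as("keyA"), $"timestamp".as("createdA"))
      .withWatermark("createdA", "10 seconds")

    val eventsB = spark.readStream.format("rate").load()
      .select($"value".as("keyB"), $"timestamp".as("createdB"))
      .withWatermark("createdB", "10 seconds")

    // Event B must follow event A, but no later than 120 s after A was created.
    val joined = eventsA.join(eventsB, expr(
      """
      keyA = keyB AND
      createdB BETWEEN createdA AND createdA + INTERVAL 120 SECONDS
      """))

    // Count, for each event B and each 1-minute window, how many A events preceded it.
    val counts = joined
      .groupBy(window($"createdB", "1 minute"), $"keyB")
      .count()

    counts.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

Note that the aggregation windows on createdB, i.e. on the later of the two events, which is the whole point of the example.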
The thing is, when I do the interval join between et-A and et-B and then group by et-B, I get an extra 2-minute delay compared to what I would expect. For example, to output a result for the time window ending at 12:10, the latest event time needs to cross 12:12, even though at 12:10 I already know that there won't be any additional events of type A for event B (because event A must happen before event B).

I did some investigation in the past, and it was due to the interval join "operator" not knowing what the next operation will be. If I were to group by et-A, for example, then this 2-minute delay would be needed. But not if I group by et-B.
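To make the timing concrete, using the numbers above (join interval = 120 s, window = 1 min, grouping on et-B):

- expected: the window [12:09, 12:10) is emitted once the watermark reaches 12:10, since event A always precedes event B, so no further A can match any B in that window;
- observed: the window is emitted only once the watermark reaches 12:10 + 120 s = 12:12.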
It's been a while since I last worked on this problem, so I need to refresh my memory. I'll try to respond tomorrow with more context, if that's needed.

Regards,
Andrzej

On Tue, Oct 1, 2024 at 05:40 Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

> I figured out the issue which breaks the second test in SqlSyntaxTest. This is also a correctness issue, unfortunately.
>
> Issue and the fix for OuterJoinTest: https://issues.apache.org/jira/browse/SPARK-49829
> Issue and the fix for SqlSyntaxTest: https://issues.apache.org/jira/browse/SPARK-49836
>
> Thanks again for reporting. I wish I hadn't missed this in Feb...
>
> On Mon, Sep 30, 2024 at 7:13 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>
>> I just quickly looked into SqlSyntaxTest - the first broken test looks to be fixed via SPARK-46062 <https://issues.apache.org/jira/browse/SPARK-46062>, which was released in Spark 3.5.1. The second broken test is a valid issue, and I don't yet know why it is happening. I'll file a JIRA ticket and let me (or folks in my team) try to look into it. I'd be happy if there is a volunteer to look into this issue.
>>
>> On Sun, Sep 29, 2024 at 10:15 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>
>>> Sorry, I totally missed this email. It was forgotten for 6 months, but I'm happy that we have smart users reporting such complex edge-case issues!
>>>
>>> I haven't had time to validate all of them, but OuterJoinTest is indeed a valid correctness issue. Thanks for reporting it to us! I figured out the root cause and have a fix now. I will submit it soon.
>>>
>>> I also quickly looked into IntervalJoinTest, but that one looks to be due to how SS works.
>>>
>>> In the second time interval join, you may expect that the lower bound of et1 = et3 - 5 mins, and that the WM for et3 isn't delayed by the first time interval join, hence the lower bound of et1 should be min(WM for et2 - 3 mins, WM for et3 - 5 mins).
>>>
>>> But in SS, we have simplified the watermark model - the input watermark is calculated per "operator" level. (Also, we still calculate the global watermark among all watermark definitions and apply the same value to every watermark definition.) So, in the second time interval join, the WM for et3 is also considered delayed by the first time interval join, since the input watermark is the "min" of all output watermarks from upstream, even though et3 does not participate in the first time interval join. That said, the lower bound of et1 = et3 - 5 mins ~ et3, which is: lower bound of et1 = (WM - 3 mins) - 5 mins ~ (WM - 3 mins) = WM - 8 mins ~ WM - 3 mins. That's why moving the watermark to window.end + 5 mins does not produce the output and fails the test.
>>>
>>> Please let me know if this does not make sense to you, and we can discuss more.
>>>
>>> I haven't had time to look into SqlSyntaxTest - we don't have enough tests on the interop between DataFrame <-> SQL for streaming queries, so we might have a non-trivial number of unknowns. I (or folks in my team) will take a look sooner rather than later.
>>>
>>> Thanks again for the valuable report!
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Tue, Mar 12, 2024 at 8:24 AM Andrzej Zera <andrzejz...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Do you think there is any chance for this issue to get resolved? Should I create another bug report? As mentioned in my message, there is one open already: https://issues.apache.org/jira/browse/SPARK-45637 but it covers only one of the problems.
>>>>
>>>> Andrzej
>>>>
>>>> On Tue, Feb 27, 2024 at 09:58 Andrzej Zera <andrzejz...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, I tested all of them on Spark 3.5.
>>>>>
>>>>> Regards,
>>>>> Andrzej
>>>>>
>>>>> On Mon, Feb 26, 2024 at 23:24 Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> These are all on Spark 3.5, correct?
>>>>>>
>>>>>> Mich Talebzadeh,
>>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>>> London
>>>>>> United Kingdom
>>>>>>
>>>>>> view my LinkedIn profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>>>
>>>>>> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>> I've been using Structured Streaming in production for almost a year already, and I want to share the bugs I found in this time. I created a test for each of the issues and put them all here: https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>>>>>>
>>>>>>> I split the issues into three groups: outer joins on event time, interval joins and Spark SQL.
>>>>>>>
>>>>>>> Issues related to outer joins:
>>>>>>>
>>>>>>> - When joining three or more input streams on event time, if two or more streams don't contain an event for a join key (which is event time), no row will be output even if other streams contain an event for this join key. Tests that check for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86 and https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>>>>>>> - When joining a stream aggregated in Spark from raw events with a stream of already-aggregated events (aggregation done outside of Spark), no row will be output if that second stream doesn't contain a corresponding event. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
>>>>>>> - When joining two aggregated streams (both aggregated in Spark), no result is produced. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341. I've already reported this one here: https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been handled yet.
>>>>>>>
>>>>>>> Issues related to interval joins:
>>>>>>>
>>>>>>> - When joining three streams (A, B, C) using an interval join on event time, such that B.eventTime is conditioned on A.eventTime and C.eventTime is also conditioned on A.eventTime, and then doing a window aggregation based on A's event time, the result is output only after the watermark crosses window end + interval(A, B) + interval(A, C). However, I'd expect results to be output sooner, i.e. when the watermark crosses window end + MAX(interval(A, B), interval(A, C)). If event B can happen up to 3 minutes after event A and event C up to 5 minutes after A, there is no point in suspending the output for 8 minutes (3+5) after the end of the window if we know that no more events can be matched after 5 minutes from the window end (assuming the window end is based on A's event time). Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
>>>>>>>
>>>>>>> SQL issues:
>>>>>>>
>>>>>>> - A WITH clause (in contrast to a subquery) seems to create a static DataFrame that can't be used in streaming joins. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
>>>>>>> - Two subqueries, each aggregating data using the window() function, break the output schema. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
>>>>>>>
>>>>>>> I'm a beginner with Scala (I'm using Structured Streaming with PySpark), so I won't be able to provide fixes. But I hope the test cases I provided can be of some help.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Andrzej