I just quickly looked into SqlSyntaxTest - the first broken test looks to
be fixed via SPARK-46062 <https://issues.apache.org/jira/browse/SPARK-46062>
which was released in Spark 3.5.1. The second broken test is a valid issue
and I'm yet to know why this is happening. I'll file a JIRA ticket and let
me (or folks in my team) try to look into it. I'd be happy if there is a
volunteer looking into this issue.

On Sun, Sep 29, 2024 at 10:15 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> Sorry I totally missed this email. This is forgotten for 6 months but I'm
> happy that we have smart users reporting such complex edge-case issues!
>
> I haven't had time to validate all of them but OuterJoinTest is a valid
> correctness issue indeed. Thanks for reporting to us! I figured out the
> root cause and have a fix now. I will submit a fix soon.
>
> I also quickly looked into IntervalJoinTest but it looks like due to how
> SS works.
>
> In the second time interval join, you may expect that lower bound of et1 =
> et3 - 5mins, and WM for et3 isn't delayed by the first time interval join,
> hence lower bound of et1 should be min(WM for et2 - 3mins, WM for et3 -
> 5mins).
>
> But in SS, we have simplified the watermark model - input watermark is
> calculated per "operator" level. (Also we still calculate global watermark
> among watermark definition"s" and apply the same value to all
> watermark definition"s.). So, in the second time interval join, WM for et3
> is also considered as delayed by the first time interval join as input
> watermark is "min" of all output watermarks from upstream, though it's not
> participated in the first time interval join. That said, lower bound of et1
> = et3 - 5 mins ~ et3, which is, lower bound of et1 = (wm - 3 mins) - 5 mins
> ~ (wm - 3 mins) = wm - 8 mins ~ wm - 3 mins. That's why moving the
> watermark to window.end + 5 mins does not produce the output and fails the
> test.
>
> Please let me know if this does not make sense to you and we can discuss
> more.
>
> I haven't had time to look into SqlSyntaxTest - we don't have enough tests
> on interop between DataFrame <-> SQL for streaming query, so we might have
> a non-trivial number of unknowns. I (or folks in my team) will take a look
> sooner than later.
>
> Thanks again for the valuable report!
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
>
> On Tue, Mar 12, 2024 at 8:24 AM Andrzej Zera <andrzejz...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Do you think there is any chance for this issue to get resolved? Should I
>> create another bug report? As mentioned in my message, there is one open
>> already: https://issues.apache.org/jira/browse/SPARK-45637 but it covers
>> only one of the problems.
>>
>> Andrzej
>>
>> wt., 27 lut 2024 o 09:58 Andrzej Zera <andrzejz...@gmail.com> napisał(a):
>>
>>> Hi,
>>>
>>> Yes, I tested all of them on spark 3.5.
>>>
>>> Regards,
>>> Andrzej
>>>
>>>
>>> pon., 26 lut 2024 o 23:24 Mich Talebzadeh <mich.talebza...@gmail.com>
>>> napisał(a):
>>>
>>>> Hi,
>>>>
>>>> These are all on spark 3.5, correct?
>>>>
>>>> Mich Talebzadeh,
>>>> Dad | Technologist | Solutions Architect | Engineer
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my
>>>> knowledge but of course cannot be guaranteed . It is essential to note
>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>> expert opinions (Werner
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>
>>>>
>>>> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera <andrzejz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> I've been using Structured Streaming in production for almost a year
>>>>> already and I want to share the bugs I found in this time. I created a 
>>>>> test
>>>>> for each of the issues and put them all here:
>>>>> https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>>>>
>>>>> I split the issues into three groups: outer joins on event time,
>>>>> interval joins and Spark SQL.
>>>>>
>>>>> Issues related to outer joins:
>>>>>
>>>>>    - When joining three or more input streams on event time, if two
>>>>>    or more streams don't contain an event for a join key (which is event
>>>>>    time), no row will be output even if other streams contain an event for
>>>>>    this join key. Tests that check for this:
>>>>>    
>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
>>>>>    and
>>>>>    
>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>>>>>    - When joining aggregated stream with raw events with a stream
>>>>>    with already aggregated events (aggregation made outside of Spark), 
>>>>> then no
>>>>>    row will be output if that second stream don't contain a corresponding
>>>>>    event. Test that checks for this:
>>>>>    
>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
>>>>>    - When joining two aggregated streams (aggregated in Spark), no
>>>>>    result is produced. Test that checks for this:
>>>>>    
>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
>>>>>    I've already reported this one here:
>>>>>    https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't
>>>>>    been handled yet.
>>>>>
>>>>> Issues related to interval joins:
>>>>>
>>>>>    - When joining three streams (A, B, C) using interval join on
>>>>>    event time, in the way that B.eventTime is conditioned on A.eventTime 
>>>>> and
>>>>>    C.eventTime is also conditioned on A.eventTime, and then doing window
>>>>>    aggregation based on A's event time, the result is output only after
>>>>>    watermark crosses the window end + interval(A, B) + interval (A, C).
>>>>>    However, I'd expect results to be output faster, i.e. when the 
>>>>> watermark
>>>>>    crosses window end + MAX(interval(A, B) + interval (A, C)). If our 
>>>>> case is
>>>>>    that event B can happen 3 minutes after event A and event C can happen 
>>>>> 5
>>>>>    minutes after A, there is no point to suspend reporting output for 8
>>>>>    minutes (3+5) after the end of the window if we know that no more 
>>>>> event can
>>>>>    be matched after 5 min from the window end (assuming window end is 
>>>>> based on
>>>>>    A's event time). Test that checks for this:
>>>>>    
>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
>>>>>
>>>>> SQL issues:
>>>>>
>>>>>    - WITH clause (in contrast to subquery) seems to create a static
>>>>>    DataFrame that can't be used in streaming joins. Test that checks for 
>>>>> this:
>>>>>    
>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
>>>>>    - Two subqueries, each aggregating data using window() functio,
>>>>>    breaks the output schema. Test that checks for this:
>>>>>    
>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
>>>>>
>>>>> I'm a beginner with Scala (I'm using Structured Streaming with
>>>>> PySpark) so won't be able to provide fixes. But I hope the test cases I
>>>>> provided can be of some help.
>>>>>
>>>>> Regards,
>>>>> Andrzej
>>>>>
>>>>

Reply via email to