Hello!

Thank you for looking into these issues! I'm happy that you identified the
root cause for OuterJoinTest and SqlSyntaxTest and are working on the fix.

Regarding IntervalJoinTest, I think I understand your point. Thank you for
explaining that. However, this behavior can be confusing to a user. Maybe
let's use another example that illustrates it better (I haven't written the
test yet but I might try to do that tomorrow):

We have event A and event B. Event B can happen following event A, but no
later than 120s after event A was created. So, one of the join conditions
would be eventB.created BETWEEN eventA.created AND eventA.created +
INTERVAL 120 seconds. Now I want to count, for each event B and for each
1-min time window, how many events of type A preceded this event B. The
thing is, when I do the interval join between et-A and et-B and then group
by et-B, I get an extra 2-min delay compared to what I would expect. So, for
example, to output a result for the time window ending at 12:10, the latest
event time needs to cross 12:12, whereas at 12:10 I already know that there
won't be any additional events of type A for event B (because event A must
happen before event B).
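
To make the intended semantics concrete, here's a plain-Python simulation of
the join condition and the per-B count (the timestamps are made up, and in
the real pipeline this is a Structured Streaming interval join written in
PySpark, not a Python loop):

```python
from datetime import datetime, timedelta

# Illustrative events: two A's, one B, all within the same 1-min window.
events_a = [datetime(2024, 1, 1, 12, 8, 30), datetime(2024, 1, 1, 12, 9, 10)]
events_b = [datetime(2024, 1, 1, 12, 9, 50)]

MAX_GAP = timedelta(seconds=120)  # B no later than 120s after A

def preceding_as(b):
    # eventB.created BETWEEN eventA.created
    #                    AND eventA.created + INTERVAL 120 seconds
    return [a for a in events_a if a <= b <= a + MAX_GAP]

# For each event B, count how many A's preceded it, keyed by B's 1-min window.
for b in events_b:
    window_start = b.replace(second=0, microsecond=0)
    print(window_start.time(), len(preceding_as(b)))  # → 12:09:00 2
```

Note that since A must precede B (a <= b), once the watermark crosses 12:10
no further A can match any B in the [12:09, 12:10) window, so in principle
that window could be finalized right then.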

I did some investigation in the past and it was due to the interval join
"operator" not knowing what the next operation will be. If I were to group
by et-A, for example, then this 2-min delay would be needed. But it isn't
needed if I group by et-B.
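
A toy model of how I understand the watermark propagation, which shows where
the extra 2 minutes come from (this is my simplification for reasoning about
it, not actual Spark internals; the exact delay bookkeeping is an assumption
on my part):

```python
from datetime import datetime, timedelta

# The join condition allows B up to 120s after A, so (in my understanding)
# the interval join operator holds back its output watermark by that amount.
JOIN_DELAY = timedelta(seconds=120)

def join_output_watermark(input_wm: datetime) -> datetime:
    return input_wm - JOIN_DELAY

# The downstream window aggregation finalizes a window only once its input
# watermark (= the join's output watermark) crosses the window end.
window_end = datetime(2024, 1, 1, 12, 10)
input_wm = window_end + JOIN_DELAY  # smallest input WM that closes the window

assert join_output_watermark(input_wm) == window_end
print(input_wm.time())  # the 12:10 window is emitted only at 12:12:00
```

If grouping is by et-B, this 120s hold-back is unnecessary, because no A can
follow its matching B; the operator just doesn't know that.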

It's been a while since I last worked on this problem so I need to refresh
my memory. I'll try to respond tomorrow with more context, if that's needed.

Regards,
Andrzej

On Tue, Oct 1, 2024 at 05:40 Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> I figured out the issue which breaks the second test in SqlSyntaxTest.
> This is also a correctness issue, unfortunately.
>
> Issue and the fix for OuterJoinTest:
> https://issues.apache.org/jira/browse/SPARK-49829
> Issue and the fix for SqlSyntaxTest:
> https://issues.apache.org/jira/browse/SPARK-49836
>
> Thanks again for reporting. I wish I hadn't missed this in Feb...
>
>
> On Mon, Sep 30, 2024 at 7:13 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
> wrote:
>
>> I just quickly looked into SqlSyntaxTest - the first broken test looks to
>> be fixed via SPARK-46062
>> <https://issues.apache.org/jira/browse/SPARK-46062>, which was released
>> in Spark 3.5.1. The second broken test is a valid issue, and I don't yet
>> know why it is happening. I'll file a JIRA ticket and I (or folks on my
>> team) will try to look into it. I'd be happy if there is a volunteer to
>> look into this issue.
>>
>> On Sun, Sep 29, 2024 at 10:15 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Sorry, I totally missed this email. It was forgotten for 6 months, but
>>> I'm happy that we have smart users reporting such complex edge-case issues!
>>>
>>> I haven't had time to validate all of them but OuterJoinTest is a valid
>>> correctness issue indeed. Thanks for reporting to us! I figured out the
>>> root cause and have a fix now. I will submit a fix soon.
>>>
>>> I also quickly looked into IntervalJoinTest, but it looks like it's due
>>> to how SS works.
>>>
>>> In the second time interval join, you may expect that the lower bound of
>>> et1 = et3 - 5 mins, and that the WM for et3 isn't delayed by the first
>>> time interval join, hence the lower bound of et1 should be min(WM for et2
>>> - 3 mins, WM for et3 - 5 mins).
>>>
>>> But in SS, we have simplified the watermark model - the input watermark is
>>> calculated at the "operator" level. (Also, we still calculate a global
>>> watermark among all watermark definitions and apply the same value to
>>> every watermark definition.) So, in the second time interval join, the WM
>>> for et3 is also considered as delayed by the first time interval join,
>>> since the input watermark is the "min" of all output watermarks from
>>> upstream, even though et3 does not participate in the first time interval
>>> join. As a result, the lower bound of et1 = et3 - 5 mins ~ et3, that is,
>>> lower bound of et1 = (wm - 3 mins) - 5 mins ~ (wm - 3 mins) = wm - 8 mins
>>> ~ wm - 3 mins. That's why moving the watermark to window.end + 5 mins
>>> does not produce the output and fails the test.
>>>
>>> Please let me know if this does not make sense to you and we can discuss
>>> more.
>>>
>>> I haven't had time to look into SqlSyntaxTest - we don't have enough
>>> tests on interop between DataFrame <-> SQL for streaming queries, so we
>>> might have a non-trivial number of unknowns. I (or folks on my team) will
>>> take a look sooner rather than later.
>>>
>>> Thanks again for the valuable report!
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>>
>>>
>>> On Tue, Mar 12, 2024 at 8:24 AM Andrzej Zera <andrzejz...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Do you think there is any chance for this issue to get resolved? Should
>>>> I create another bug report? As mentioned in my message, there is one open
>>>> already: https://issues.apache.org/jira/browse/SPARK-45637 but it
>>>> covers only one of the problems.
>>>>
>>>> Andrzej
>>>>
>>>> On Tue, Feb 27, 2024 at 09:58 Andrzej Zera <andrzejz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, I tested all of them on Spark 3.5.
>>>>>
>>>>> Regards,
>>>>> Andrzej
>>>>>
>>>>>
>>>>> On Mon, Feb 26, 2024 at 23:24 Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> These are all on Spark 3.5, correct?
>>>>>>
>>>>>> Mich Talebzadeh,
>>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>>> London
>>>>>> United Kingdom
>>>>>>
>>>>>>
>>>>>> View my LinkedIn profile:
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>>>> that, as with any advice, "one test result is worth one-thousand
>>>>>> expert opinions" (Wernher von Braun
>>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>>>>
>>>>>> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera <andrzejz...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>> I've been using Structured Streaming in production for almost a year
>>>>>>> now and I want to share the bugs I found during this time. I created
>>>>>>> a test for each of the issues and put them all here:
>>>>>>> https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>>>>>>
>>>>>>> I split the issues into three groups: outer joins on event time,
>>>>>>> interval joins and Spark SQL.
>>>>>>>
>>>>>>> Issues related to outer joins:
>>>>>>>
>>>>>>>    - When joining three or more input streams on event time, if two
>>>>>>>    or more streams don't contain an event for a join key (which is
>>>>>>>    event time), no row will be output even if other streams contain
>>>>>>>    an event for this join key. Tests that check for this:
>>>>>>>    
>>>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
>>>>>>>    and
>>>>>>>    
>>>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>>>>>>>    - When joining a stream aggregated from raw events with a stream
>>>>>>>    of already-aggregated events (aggregation done outside of Spark),
>>>>>>>    no row will be output if that second stream doesn't contain a
>>>>>>>    corresponding event. Test that checks for this:
>>>>>>>    
>>>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
>>>>>>>    - When joining two aggregated streams (aggregated in Spark), no
>>>>>>>    result is produced. Test that checks for this:
>>>>>>>    
>>>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
>>>>>>>    I've already reported this one here:
>>>>>>>    https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't
>>>>>>>    been handled yet.
>>>>>>>
>>>>>>> Issues related to interval joins:
>>>>>>>
>>>>>>>    - When joining three streams (A, B, C) using interval joins on
>>>>>>>    event time, in such a way that B.eventTime is conditioned on
>>>>>>>    A.eventTime and C.eventTime is also conditioned on A.eventTime,
>>>>>>>    and then doing window aggregation based on A's event time, the
>>>>>>>    result is output only after the watermark crosses the window end +
>>>>>>>    interval(A, B) + interval(A, C). However, I'd expect results to be
>>>>>>>    output faster, i.e. when the watermark crosses window end +
>>>>>>>    MAX(interval(A, B), interval(A, C)). If our case is that event B
>>>>>>>    can happen 3 minutes after event A and event C can happen 5
>>>>>>>    minutes after A, there is no point in suspending output for 8
>>>>>>>    minutes (3+5) after the end of the window if we know that no more
>>>>>>>    events can be matched after 5 min from the window end (assuming
>>>>>>>    window end is based on A's event time). Test that checks for this:
>>>>>>>    
>>>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
>>>>>>>
>>>>>>> SQL issues:
>>>>>>>
>>>>>>>    - WITH clause (in contrast to subquery) seems to create a static
>>>>>>>    DataFrame that can't be used in streaming joins. Test that checks 
>>>>>>> for this:
>>>>>>>    
>>>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
>>>>>>>    - Two subqueries, each aggregating data using the window()
>>>>>>>    function, break the output schema. Test that checks for this:
>>>>>>>    
>>>>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
>>>>>>>
>>>>>>> I'm a beginner with Scala (I use Structured Streaming with PySpark)
>>>>>>> so I won't be able to provide fixes. But I hope the test cases I
>>>>>>> provided can be of some help.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Andrzej
>>>>>>>
>>>>>>
