I wrote a demo example for a time-windowed join, which you can pick up at [1].

[1] https://gist.github.com/docete/8e78ff8b5d0df69f60dda547780101f1
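
In the spirit of the demo, the same-day restriction from the thread below could be expressed roughly like this (an untested sketch; the table and column names are taken from Theo's mail, and using FLOOR(ts TO DAY) instead of TUMBLE_START — which is only defined for GROUP BY windows — is my assumption):

```scala
// Untested sketch: keep the BETWEEN bounds so the planner can recognize a
// time-windowed join, and express "same calendar day" via FLOOR(ts TO DAY).
val result = tEnv.sqlQuery(
  """
    |SELECT s1.id, s2.id
    |  FROM STREAM1 AS s1
    |  JOIN STREAM2 AS s2
    |    ON s1.`user` = s2.userName
    |       AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR
    |                     AND s2.ts + INTERVAL '24' HOUR
    |       AND FLOOR(s1.ts TO DAY) = FLOOR(s2.ts TO DAY)
    |""".stripMargin)
```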

*Best Regards,*
*Zhenghua Gao*


On Tue, Aug 13, 2019 at 4:13 PM Zhenghua Gao <doc...@gmail.com> wrote:

> You can check the plan after optimization to verify whether it's a regular
> join or a time-bounded join (the latter should contain a WindowJoin node).
> The most direct way is to set a breakpoint in the optimization phase [1][2].
> And you can use your test data to create an ITCase for debugging [3].
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala#L148
> [2]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/StreamOptimizer.scala#L68
> [3]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala
>
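> Besides the breakpoint, a quick way to inspect the plan from user code is
> the explain API (a sketch; it assumes a StreamTableEnvironment called tEnv
> and a Table called result, which are my names, not from the thread):
>
> ```scala
> // Prints the abstract syntax tree, the optimized logical plan and the
> // physical execution plan. A time-bounded join should show up as a
> // window-join node rather than a regular join.
> println(tEnv.explain(result))
> ```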
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Aug 12, 2019 at 10:49 PM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi there,
>>
>> Currently, I'm trying to write a SQL query that executes a time-windowed/
>> bounded JOIN on two data streams.
>>
>> Suppose I have stream1 with attributes id, ts, user and stream2 with
>> attributes id, ts, userName. I want to receive the natural JOIN of both
>> streams, restricted to events of the same day.
>>
>> In Oracle (with the ts column stored as a NUMBER instead of a TIMESTAMP,
>> for historical reasons), I do the following:
>>
>> SELECT *
>>   FROM STREAM1
>>   JOIN STREAM2
>>     ON STREAM1."user" = STREAM2."userName"
>>        AND TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM1."ts")
>>          = TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM2."ts");
>>
>> which yields 294 rows with my test data (14 elements from stream1 match
>> 21 elements in stream2 on the one day of test data). Now I want to query
>> the same in Flink. So I registered both streams as tables and properly
>> registered the event time (by specifying ts.rowtime as a table column).
>>
>> My goal is to produce a time-windowed JOIN so that, if both streams
>> advance their watermark far enough, an element is written out into an
>> append only stream.
>>
>> First try (to conform to the time-bounded JOIN conditions):
>>
>> SELECT s1.id, s2.id
>>   FROM STREAM1 AS s1
>>   JOIN STREAM2 AS s2
>>     ON s1.`user` = s2.userName
>>        AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
>>        AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' HOUR
>>        AND TUMBLE_START(s1.ts, INTERVAL '1' DAY) = TUMBLE_START(s2.ts, INTERVAL '1' DAY) -- Reduce to matches on the same day.
>>
>> This yielded the exception "Rowtime attributes must not be in the
>> input rows of a regular join. As a workaround you can cast the time
>> attributes of input tables to TIMESTAMP before." So I'm still in the
>> area of regular joins, not time-windowed JOINs, even though I specified
>> explicit BETWEEN bounds for both input streams!
>>
>> Then I found [1], which is essentially my query but without the last
>> condition (reducing to matches on the same day). I tried that one as
>> well, just to have a starting point, but the error is the same.
>> I then reduced the condition to just one time bound:
>>
>> SELECT s1.id, s2.id
>>   FROM STREAM1 AS s1
>>   JOIN STREAM2 AS s2
>>     ON s1.`user` = s2.userName
>>        AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
>>
>> which runs as a query but doesn't produce any results, most likely
>> because Flink still treats this as a regular join instead of a
>> time-windowed JOIN and doesn't emit any results. (FYI: after executing
>> the query, I convert the Table back to a stream via tEnv.toAppendStream,
>> and I use Flink 1.8.0 for the tests.)
>>
>> My questions are now:
>> 1. How do I see if Flink treats my table result as a regular JOIN result
>> or a time-bounded JOIN?
>> 2. What is the proper way to formulate my initial query, finding all
>> matching events within the same tumbling window?
>>
>> Best regards
>> Theo Diefenthal
>>
>> [1]
>> https://de.slideshare.net/FlinkForward/flink-forward-berlin-2018-xingcan-cui-stream-join-in-flink-from-discrete-to-continuous-115374183
>> Slide 18
>>
>
