I wrote a demo example of a time-windowed join which you can pick up [1].

[1] https://gist.github.com/docete/8e78ff8b5d0df69f60dda547780101f1
*Best Regards,*
*Zhenghua Gao*


On Tue, Aug 13, 2019 at 4:13 PM Zhenghua Gao <doc...@gmail.com> wrote:

> You can check the plan after optimization to verify whether it's a regular join or a time-bounded join (the latter should contain a WindowJoin). The most direct way is to set a breakpoint in the optimization phase [1][2].
> You can also use your TestData to create an ITCase for debugging [3].
>
> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala#L148
> [2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/StreamOptimizer.scala#L68
> [3] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Aug 12, 2019 at 10:49 PM Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:
>
>> Hi there,
>>
>> I'm currently trying to write a SQL query that executes a time-windowed (bounded) JOIN on two data streams.
>>
>> Suppose I have stream1 with attributes id, ts, user and stream2 with attributes id, ts, userName. I want the JOIN of both streams on user = userName, restricted to events of the same day.
>>
>> In Oracle (with ts stored as a number of epoch milliseconds instead of a TIMESTAMP, for historical reasons), I do the following:
>>
>> SELECT *
>> FROM STREAM1
>> JOIN STREAM2 ON STREAM1."user" = STREAM2."userName"
>>   AND TRUNC(TO_DATE('19700101', 'YYYYMMDD') + ( 1 / 24 / 60 / 60 / 1000 ) * STREAM1."ts")
>>     = TRUNC(TO_DATE('19700101', 'YYYYMMDD') + ( 1 / 24 / 60 / 60 / 1000 ) * STREAM2."ts");
>>
>> which yields 294 rows with my test data (14 elements from stream1 match 21 elements in stream2 on the single day covered by the test data). Now I want to run the same query in Flink.
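To make the Oracle condition concrete: `TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * ts` turns epoch milliseconds into a DATE, and `TRUNC` cuts it back to midnight, so two rows join exactly when their timestamps fall on the same epoch-aligned (UTC) day. The Java sketch below is a hypothetical, in-memory illustration of that semantics only, not Flink code; the class `SameDayJoin`, its `Event` type and both methods are made up for this example, it uses one `user` field for both streams (standing in for `user` and `userName`), and it assumes `ts` holds epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

public class SameDayJoin {
    static final long MILLIS_PER_DAY = 86_400_000L;

    // Hypothetical event type holding only the fields the query uses.
    static final class Event {
        final long id;
        final long ts;      // epoch milliseconds (assumed)
        final String user;  // user in stream1, userName in stream2

        Event(long id, long ts, String user) {
            this.id = id;
            this.ts = ts;
            this.user = user;
        }
    }

    // Identifies the same day as TRUNC(TO_DATE('19700101', 'YYYYMMDD') + ts / 86400000),
    // as a day number since 1970-01-01; floorDiv rounds down for negative ts.
    static long dayIndex(long tsMillis) {
        return Math.floorDiv(tsMillis, MILLIS_PER_DAY);
    }

    // Nested-loop evaluation of the Oracle query: same user and same day.
    // Returns the matching (stream1 id, stream2 id) pairs.
    static List<long[]> sameDayMatches(List<Event> stream1, List<Event> stream2) {
        List<long[]> result = new ArrayList<>();
        for (Event a : stream1) {
            for (Event b : stream2) {
                if (a.user.equals(b.user) && dayIndex(a.ts) == dayIndex(b.ts)) {
                    result.add(new long[] {a.id, b.id});
                }
            }
        }
        return result;
    }
}
```

If every one of the 14 stream1 elements matches every one of the 21 stream2 elements, the result has 14 × 21 = 294 pairs, which is consistent with the row count reported above.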
>>
>> So I registered both streams as tables and registered the event time properly (by specifying ts.rowtime as a table column).
>>
>> My goal is to produce a time-windowed JOIN so that, once both streams advance their watermarks far enough, an element is written out to an append-only stream.
>>
>> First try (to satisfy the conditions for a time-bounded JOIN):
>>
>> SELECT s1.id, s2.id
>> FROM STREAM1 AS s1
>> JOIN STREAM2 AS s2
>>   ON s1.`user` = s2.userName
>>   AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
>>   AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' HOUR
>>   AND TUMBLE_START(s1.ts, INTERVAL '1' DAY) = TUMBLE_START(s2.ts, INTERVAL '1' DAY) -- Reduce to matches on the same day.
>>
>> This resulted in the exception "Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.". So I'm still in the realm of regular joins, not time-windowed JOINs, even though I added explicit BETWEEN bounds for both input streams!
>>
>> Then I found [1], which is essentially my query without the last condition (restricting matches to the same day). I tried it as well, just to have a starting point, but the error is the same.
>> I then reduced the condition to just one time bound:
>>
>> SELECT s1.id, s2.id
>> FROM STREAM1 AS s1
>> JOIN STREAM2 AS s2
>>   ON s1.`user` = s2.userName
>>   AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
>>
>> which runs but doesn't produce any results, most likely because Flink still treats it as a regular join instead of a time-windowed JOIN and therefore doesn't emit anything. (FYI: after executing the query, I convert the Table back to a stream via tEnv.toAppendStream, and I use Flink 1.8.0 for tests.)
>>
>> My questions are now:
>> 1. How do I see whether Flink treats my table result as a regular JOIN or a time-bounded JOIN?
>> 2. What is the proper way to formulate my initial query, finding all matching events within the same tumbling window?
>>
>> Best regards
>> Theo Diefenthal
>>
>> [1] https://de.slideshare.net/FlinkForward/flink-forward-berlin-2018-xingcan-cui-stream-join-in-flink-from-discrete-to-continuous-115374183
>> Slide 18
>>
>
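The time conditions in the first attempt are not independent of each other. `s1.ts BETWEEN s2.ts - b AND s2.ts + b` is symmetric in s1 and s2, so the second BETWEEN adds nothing, and two timestamps on the same UTC day are always less than 24 hours apart, so combining the ±24 hour bound with a same-day filter selects exactly the same pairs as the same-day filter alone. The Java sketch below checks this reasoning on plain numbers; the class `TimeBoundPredicates` and its methods are hypothetical helpers, timestamps are assumed to be epoch milliseconds, and it says nothing about how Flink's planner classifies the join (checking the optimized plan for a WindowJoin answers that).

```java
public class TimeBoundPredicates {
    static final long MILLIS_PER_DAY = 86_400_000L;

    // SQL: ts1 BETWEEN ts2 - bound AND ts2 + bound (BETWEEN is inclusive on both ends).
    static boolean withinBound(long ts1, long ts2, long boundMillis) {
        return ts1 >= ts2 - boundMillis && ts1 <= ts2 + boundMillis;
    }

    // Same epoch-aligned (UTC) day, like the TRUNC(...) comparison in the Oracle query.
    static boolean sameDay(long ts1, long ts2) {
        return Math.floorDiv(ts1, MILLIS_PER_DAY) == Math.floorDiv(ts2, MILLIS_PER_DAY);
    }

    // Both BETWEEN conditions of the first attempt plus the intended same-day filter.
    static boolean firstAttemptTimeCondition(long ts1, long ts2) {
        long bound = MILLIS_PER_DAY; // INTERVAL '24' HOUR
        return withinBound(ts1, ts2, bound)
            && withinBound(ts2, ts1, bound)
            && sameDay(ts1, ts2);
    }

    public static void main(String[] args) {
        long[] samples = {0L, 1L, MILLIS_PER_DAY - 1, MILLIS_PER_DAY, MILLIS_PER_DAY + 1,
                          2 * MILLIS_PER_DAY, -1L};
        for (long a : samples) {
            for (long b : samples) {
                // The bound is symmetric, so the second BETWEEN is redundant.
                if (withinBound(a, b, MILLIS_PER_DAY) != withinBound(b, a, MILLIS_PER_DAY)) {
                    throw new AssertionError("asymmetric bound for " + a + ", " + b);
                }
                // Same day already implies |a - b| < 24 h, so the combined condition equals sameDay.
                if (firstAttemptTimeCondition(a, b) != sameDay(a, b)) {
                    throw new AssertionError("mismatch for " + a + ", " + b);
                }
            }
        }
        System.out.println("ok");
    }
}
```

In other words, the ±24 hour bound in the first attempt does not change which pairs match; only the same-day condition narrows the result.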