Hi there,

Currently, I'm trying to write a SQL query that executes a time-windowed (time-bounded) JOIN on two data streams.
Suppose I have stream1 with the attributes id, ts, user and stream2 with the attributes id, ts, userName. I want to receive the JOIN of both streams (on user = userName), restricted to events of the same day.

In Oracle (with the ts column as a NUMBER instead of a TIMESTAMP, for historical reasons), I do the following:

    SELECT *
    FROM STREAM1
    JOIN STREAM2
      ON STREAM1."user" = STREAM2."userName"
     AND TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM1."ts")
       = TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM2."ts");

which yields 294 rows with my test data (14 elements from stream1 match 21 elements in stream2 on the one day of test data).

Now I want to run the same query in Flink. So I registered both streams as tables and registered the event time properly (by specifying ts.rowtime as a table column). My goal is to produce a time-windowed JOIN so that, once both streams have advanced their watermarks far enough, an element is written out into an append-only stream.

First try (written to conform to the time-bounded JOIN conditions):

    SELECT s1.id, s2.id
    FROM STREAM1 AS s1
    JOIN STREAM2 AS s2
      ON s1.`user` = s2.userName
     AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
     AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' HOUR
     AND TUMBLE_START(s1.ts, INTERVAL '1' DAY) = TUMBLE_START(s2.ts, INTERVAL '1' DAY) -- Reduce to matches on the same day

This resulted in the exception "Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.". So I'm still in the area of regular joins, not time-windowed JOINs, even though I made the BETWEEN condition explicit for both input streams!

Then I found [1], which is exactly my query but without the last condition (reduce to matches on the same day). I tried that one as well, just to have a starting point, but the error is the same.
I then reduced the condition to just one time bound:

    SELECT s1.id, s2.id
    FROM STREAM1 AS s1
    JOIN STREAM2 AS s2
      ON s1.`user` = s2.userName
     AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR

which runs as a query but doesn't produce any results, most likely because Flink still treats it as a regular join instead of a time-windowed JOIN and doesn't emit any results. (FYI: after executing the query, I convert the Table back to a stream via tEnv.toAppendStream, and I use Flink 1.8.0 for my tests.)

My questions are now:

1. How do I see whether Flink treats my table result as a regular JOIN result or as a time-bounded JOIN?
2. What is the proper way to formulate my initial query, finding all matching events within the same tumbling window?

Best regards
Theo Diefenthal

[1] https://de.slideshare.net/FlinkForward/flink-forward-berlin-2018-xingcan-cui-stream-join-in-flink-from-discrete-to-continuous-115374183 (slide 18)
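
For reference, here is a minimal sketch of the "same day" semantics that the Oracle query expresses, written in plain Python and independent of Flink. It assumes ts is in epoch milliseconds and that days are cut at UTC midnight. The names day_bucket and same_day_join and the sample events are made up for illustration; they are not the actual test data, only shaped to reproduce the 14 x 21 = 294 count.

```python
MILLIS_PER_DAY = 24 * 60 * 60 * 1000


def day_bucket(ts_millis):
    """Whole UTC days since 1970-01-01 for an epoch-millisecond timestamp."""
    return ts_millis // MILLIS_PER_DAY


def same_day_join(stream1, stream2):
    """Return (id1, id2) pairs where user == userName and both ts fall on the same UTC day."""
    # Index stream2 by (userName, day) so each stream1 event needs one lookup.
    index = {}
    for event in stream2:
        key = (event["userName"], day_bucket(event["ts"]))
        index.setdefault(key, []).append(event["id"])

    pairs = []
    for event in stream1:
        key = (event["user"], day_bucket(event["ts"]))
        for id2 in index.get(key, []):
            pairs.append((event["id"], id2))
    return pairs


# Illustrative data only: 14 events in stream1 and 21 in stream2 for one user on one day,
# plus one stream2 event a millisecond before midnight on the previous day.
day_start = 18000 * MILLIS_PER_DAY
stream1 = [{"id": i, "ts": day_start + i * 1000, "user": "alice"} for i in range(14)]
stream2 = [{"id": j, "ts": day_start + 3_600_000 + j * 1000, "userName": "alice"} for j in range(21)]
stream2.append({"id": 99, "ts": day_start - 1, "userName": "alice"})

pairs = same_day_join(stream1, stream2)
print(len(pairs))  # 294
```

The event from the previous day is within 24 hours of every stream1 event, so a pure BETWEEN bound would match it; only the day-bucket equality excludes it. That is the part the TUMBLE_START condition in the first Flink query was meant to express.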