Hi there, 

I'm currently trying to write a SQL query that executes a time-windowed
(time-bounded) JOIN on two data streams.

Suppose I have stream1 with the attributes id, ts, user and stream2 with the
attributes id, ts, userName. I want to join both streams on user = userName,
restricted to pairs of events that occurred on the same day.

In Oracle (with ts stored as a NUMBER of epoch milliseconds instead of a
TIMESTAMP, for historical reasons), I do the following:

SELECT *
FROM STREAM1
JOIN STREAM2 ON STREAM1."user" = STREAM2."userName"
AND TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM1."ts")
  = TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1 / 24 / 60 / 60 / 1000) * STREAM2."ts");
This yields 294 rows with my test data (14 elements from stream1 match 21
elements from stream2 on the single day of test data). Now I want to run the
same query in Flink. I registered both streams as tables and registered event
time properly (by specifying ts.rowtime as a table column).
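The Oracle condition compares calendar days: (1 / 24 / 60 / 60 / 1000) * ts turns milliseconds into a fraction of a day, adding that to 1970-01-01 gives the event's date and time, and TRUNC cuts it back to midnight. A minimal Python model of that mapping, assuming ts holds milliseconds since 1970-01-01 00:00 with no time-zone shift (the helpers oracle_day and day_index are hypothetical names):

```python
from datetime import date, datetime, timedelta

MS_PER_DAY = 24 * 60 * 60 * 1000  # 86_400_000, the reciprocal of 1/24/60/60/1000

def oracle_day(ts_ms: int) -> date:
    """Model of TRUNC(TO_DATE('19700101', 'YYYYMMDD') + (1/24/60/60/1000) * ts).

    Adding a number to an Oracle DATE adds that many (fractional) days, and TRUNC
    drops the time of day. Oracle DATE's one-second resolution is ignored here.
    """
    return (datetime(1970, 1, 1) + timedelta(milliseconds=ts_ms)).date()

def day_index(ts_ms: int) -> int:
    """The same bucketing as an integer: whole days since 1970-01-01 (floor division)."""
    return ts_ms // MS_PER_DAY

print(oracle_day(1_556_668_800_000), day_index(1_556_668_800_000))  # 2019-05-01 18017
```

Two events fall on the same day exactly when their day_index values are equal, which is the property the Flink query has to reproduce.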

My goal is a time-windowed JOIN that writes each result row into an
append-only stream once both streams have advanced their watermarks far
enough.
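The append-only behaviour described above can be modelled in a few lines. This is a minimal sketch assuming the generic interval-join strategy, not Flink's actual operator: each side buffers its rows, a new row is matched against the other side's buffer and every match is emitted immediately and never retracted, and a buffered row is dropped once the opposite stream's watermark rules out any further match. The class IntervalJoin and its methods are hypothetical.

```python
from dataclasses import dataclass, field

@dataclass
class IntervalJoin:
    """Toy model (hypothetical, not Flink's operator): join rows with equal keys
    whose timestamps differ by at most `bound`."""
    bound: int                                  # max |left.ts - right.ts|, same unit as ts
    left: list = field(default_factory=list)    # buffered left rows (key, ts, payload)
    right: list = field(default_factory=list)   # buffered right rows (key, ts, payload)
    output: list = field(default_factory=list)  # append-only result: (left payload, right payload)

    def _match(self, lrow, rrow):
        return lrow[0] == rrow[0] and abs(lrow[1] - rrow[1]) <= self.bound

    def add_left(self, key, ts, payload):
        row = (key, ts, payload)
        # Emit matches against already-buffered right rows immediately; never retracted.
        self.output += [(payload, rrow[2]) for rrow in self.right if self._match(row, rrow)]
        self.left.append(row)

    def add_right(self, key, ts, payload):
        row = (key, ts, payload)
        self.output += [(lrow[2], payload) for lrow in self.left if self._match(lrow, row)]
        self.right.append(row)

    def advance_watermarks(self, left_wm, right_wm):
        # Assume no future row on a side has ts < that side's watermark.
        # A left row at t can only match right rows with ts <= t + bound, so once
        # right_wm > t + bound it can never match again and is evicted (and vice versa).
        self.left = [lrow for lrow in self.left if lrow[1] + self.bound >= right_wm]
        self.right = [rrow for rrow in self.right if rrow[1] + self.bound >= left_wm]

# Usage with a bound of 10 time units:
j = IntervalJoin(bound=10)
j.add_left("alice", 100, "s1-a")
j.add_right("alice", 105, "s2-a")  # |105 - 100| <= 10: emitted right away
j.add_right("bob", 105, "s2-b")    # different key: buffered, no output
j.advance_watermarks(left_wm=100, right_wm=120)  # left row (100 + 10 < 120) is evicted
print(j.output)  # [('s1-a', 's2-a')]
```

In this model watermarks never delay output; they only bound how long rows stay buffered, which is what keeps the result append-only with finite state.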

First try (to satisfy the time-bounded JOIN conditions):
SELECT s1.id, s2.id
FROM STREAM1 AS s1
JOIN STREAM2 AS s2
ON s1.`user` = s2.userName
AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
AND s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' HOUR
AND TUMBLE_START(s1.ts, INTERVAL '1' DAY) = TUMBLE_START(s2.ts, INTERVAL '1' DAY) -- Reduce to matches on the same day.
This yielded the exception "Rowtime attributes must not be in the input rows
of a regular join. As a workaround you can cast the time attributes of input
tables to TIMESTAMP before.". So I'm still in the territory of regular joins,
not time-windowed JOINs, even though I spelled out the BETWEEN bounds for both
input streams!
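Two properties of the conditions in the first query can be checked in isolation. With symmetric 24-hour bounds, the two BETWEEN clauses are equivalent (each says |s1.ts - s2.ts| <= 24 h), so the second one is redundant. And "same day" implies a difference of less than 24 hours but not the other way round, so the interval bounds alone would also admit pairs from adjacent days; the day comparison is an extra filter on top. A small Python check, assuming epoch-millisecond timestamps and epoch-aligned (UTC) day boundaries; cond_a, cond_b and same_day are hypothetical helper names:

```python
MS_PER_HOUR = 60 * 60 * 1000
MS_PER_DAY = 24 * MS_PER_HOUR
BOUND = 24 * MS_PER_HOUR  # INTERVAL '24' HOUR in milliseconds

def between(x, lo, hi):
    """SQL BETWEEN: inclusive on both ends."""
    return lo <= x <= hi

def cond_a(t1, t2):
    # s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR
    return between(t1, t2 - BOUND, t2 + BOUND)

def cond_b(t1, t2):
    # s2.ts BETWEEN s1.ts - INTERVAL '24' HOUR AND s1.ts + INTERVAL '24' HOUR
    return between(t2, t1 - BOUND, t1 + BOUND)

def same_day(t1, t2):
    # Same epoch-aligned 1-day window, the intent of the TUMBLE_START comparison.
    return t1 // MS_PER_DAY == t2 // MS_PER_DAY

# 23:00 on day 0 and 01:00 on day 1: only 2 h apart, yet on different days.
print(cond_a(23 * MS_PER_HOUR, 25 * MS_PER_HOUR), same_day(23 * MS_PER_HOUR, 25 * MS_PER_HOUR))  # True False
```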

Then I found [1], which is essentially my query without the last condition
(restricting matches to the same day). I tried that one as well, just to have
a starting point, but the error is the same.
I then reduced the condition to a single time bound:
SELECT s1.id, s2.id 
FROM STREAM1 AS s1 
JOIN STREAM2 AS s2 
ON s1.`user` = s2.userName 
AND s1.ts BETWEEN s2.ts - INTERVAL '24' HOUR AND s2.ts + INTERVAL '24' HOUR 
This runs as a query but doesn't produce any results, most likely because
Flink still treats it as a regular join instead of a time-windowed JOIN and
doesn't emit anything. (FYI, after executing the query I convert the Table
back to a stream via tEnv.toAppendStream, and I use Flink 1.8.0 for my tests.)

My questions are:
1. How can I tell whether Flink treats my query as a regular JOIN or as a
time-bounded JOIN?
2. What is the proper way to formulate my initial query, i.e. finding all
matching events within the same tumbling window?
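For question 2, the intended result (independent of how Flink plans it) is an equi-join on the pair (user, day window). A minimal Python sketch of that semantics, assuming epoch-millisecond timestamps and 1-day tumbling windows aligned to the epoch (UTC); window_start and same_window_join are hypothetical names, and the rows in the usage part are made up so that the count mirrors the 14 × 21 = 294 from the Oracle run:

```python
from collections import defaultdict

MS_PER_DAY = 24 * 60 * 60 * 1000

def window_start(ts_ms: int, size_ms: int = MS_PER_DAY) -> int:
    """Start of the epoch-aligned tumbling window containing ts_ms."""
    return ts_ms - ts_ms % size_ms

def same_window_join(stream1, stream2):
    """Join rows (id, ts, user) with rows (id, ts, userName) on user and window start."""
    # Hash the right side by (userName, window start) ...
    buckets = defaultdict(list)
    for id2, ts2, user_name in stream2:
        buckets[(user_name, window_start(ts2))].append(id2)
    # ... then probe it with every left row; each match yields (s1.id, s2.id).
    return [
        (id1, id2)
        for id1, ts1, user in stream1
        for id2 in buckets.get((user, window_start(ts1)), [])
    ]

# Hypothetical test data: one user, one day (2019-05-01 00:00 UTC), plus one next-day row.
day = 1_556_668_800_000
s1 = [(i, day + i * 1000, "theo") for i in range(14)]
s2 = [(100 + k, day + k * 60_000, "theo") for k in range(21)]
s2.append((999, day + MS_PER_DAY, "theo"))  # next day: must not match
print(len(same_window_join(s1, s2)))  # 294
```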

Best regards 
Theo Diefenthal 

[1] https://de.slideshare.net/FlinkForward/flink-forward-berlin-2018-xingcan-cui-stream-join-in-flink-from-discrete-to-continuous-115374183 (slide 18)
