Hello Benchao,

Thanks for clarifying! The issue was that I was sending very few records, so I could not see how the watermark was progressing. Today, after trying with a continuous stream, I was able to get the results.
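In case it helps anyone who finds this thread later: with WATERMARK FOR ts AS ts, the watermark only advances as newer records arrive, and the NULL-padded rows of the outer interval join are only emitted once the watermark passes the upper bound of the join window (av.ts + 30 minutes here, plus the state-cleanup delay Benchao mentioned). Below is a rough sketch of how the table could be declared if some out-of-orderness had to be tolerated; the 5-second bound and the idle-timeout value are illustrative choices of mine, not something prescribed in this thread:

CREATE TABLE abandoned_visits (
    key1 STRING
  , email STRING
  , ts TIMESTAMP(3)
  , abandoned_pids ARRAY<BIGINT>
  -- the watermark trails the largest ts seen so far by 5 seconds,
  -- so it still only moves forward when newer records arrive
  , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kafka',
  'topic' = 'abandoned-visits',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);

-- Illustrative: if only some Kafka partitions receive test records, marking
-- idle partitions keeps them from holding back the overall watermark
-- (exact SET syntax depends on the Flink version).
SET 'table.exec.source.idle-timeout' = '30 s';

Either way, the unmatched rows only appear after enough later events have arrived to push the watermark past the end of the join interval.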
On Fri, Mar 19, 2021 at 5:24 PM Benchao Li <libenc...@apache.org> wrote:

> Hi Aneesha,
>
> For the interval join, the operator will output the data padded with NULL
> when it confirms, based on the watermark, that no more matching data will
> arrive.
> And there is an optimization for reducing state access, which may add more
> time before these rows are emitted.
> For your case, it's almost 30 + 30 = 60 min. Did you wait that long to
> check the output?
>
> Aneesha Kaushal <aneesha.kaus...@reflektion.com> wrote on Thu, Mar 18, 2021 at 11:29 PM:
>
>> Hi,
>>
>> I am doing a simple POC using Flink SQL and I am facing some issues with
>> Interval Join.
>>
>> *Use Case*: I have two Kafka streams, and using a Flink SQL interval join
>> I want to remove rows from *stream 1* (abandoned_user_visits) that are
>> present in *stream 2* (orders) within some time interval.
>>
>> *Data:*
>> 1) *Abandoned user visits.* Sample data:
>> {"key1": "123", "email": "ema...@example.com", "abandoned_pids": [674378611, 1754171520], "ts": "2021-03-18 11:00:00.208"}
>> {"key1": "234", "email": "ema...@example.com", "abandoned_pids": [1942367711], "ts": "2021-03-18 11:45:00.208"}
>> {"key1": "123", "email": "ema...@example.com", "abandoned_pids": [1754171520], "ts": "2021-03-18 12:00:00.208"}
>> {"key1": "234", "email": "ema...@example.com", "abandoned_pids": [1942367711], "ts": "2021-03-18 12:45:00.208"}
>>
>> 2) *User order stream.* Sample data:
>> {"key1": "234", "email": "ema...@example.com", "pids": [1754171520], "ts": "2021-03-18 11:55:00.208"}
>> {"key1": "123", "email": "ema...@example.com", "pids": [674378611, 1754171520], "ts": "2021-03-18 12:10:00.208"}
>>
>> When I push the above records to Kafka and select from the VIEW below,
>> I get a result that is actually an *INNER* join (not an OUTER join).
>> I even tried posting just one record to stream (1) and no record to
>> stream (2), expecting that record to be emitted, but nothing was emitted.
>> What is interesting is that when I use processing time instead of event
>> time, I get the results as expected.
>>
>> *Tables and Views used:*
>>
>> CREATE TABLE abandoned_visits (
>>     key1 STRING
>>   , email STRING
>>   , ts TIMESTAMP(3)
>>   , abandoned_pids ARRAY<BIGINT>
>>   , WATERMARK FOR ts AS ts
>> )
>> WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'abandoned-visits',
>>   'properties.bootstrap.servers' = '...',
>>   'format' = 'json'
>> );
>>
>> CREATE TABLE orders (
>>     key1 STRING
>>   , email STRING
>>   , ts TIMESTAMP(3)
>>   , pids ARRAY<BIGINT>
>>   , WATERMARK FOR ts AS ts
>> )
>> WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'orders',
>>   'properties.bootstrap.servers' = '...',
>>   'format' = 'json'
>> );
>>
>> CREATE VIEW abandoned_visits_with_no_orders AS
>> SELECT
>>     av.key1
>>   , av.email
>>   , av.abandoned_pids
>>   , FLOOR(av.ts TO MINUTE) AS visit_timestamp
>>   , FLOOR(o.ts TO MINUTE) AS order_timestamp
>>   , o.email AS order_email
>> FROM abandoned_visits av
>> FULL OUTER JOIN orders o
>>   ON av.key1 = o.key1
>>   AND av.email = o.email
>>   AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30' MINUTE
>> -- WHERE
>> --   o.email IS NULL   -- commented this out so as to get something in the result
>> ;
>>
>> *Result:*
>> select * from abandoned_visits_with_no_orders;
>>
>> This gives a result the same as an inner join. It doesn't have rows with
>> NULL order data.
>> I would appreciate any help.
>>
>> Thanks,
>> Aneesha
>>
>
> --
>
> Best,
> Benchao Li
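For completeness, once the watermark advances as expected, the commented-out filter in the VIEW above is what turns the outer join into the anti-join I was after (visits with no matching order). Here is a sketch of the final query, assuming the same table definitions; using a LEFT OUTER JOIN instead of FULL OUTER JOIN is my own simplification, since only unmatched visits are kept:

CREATE VIEW abandoned_visits_with_no_orders AS
SELECT
    av.key1
  , av.email
  , av.abandoned_pids
  , FLOOR(av.ts TO MINUTE) AS visit_timestamp
FROM abandoned_visits av
LEFT OUTER JOIN orders o
  ON av.key1 = o.key1
  AND av.email = o.email
  AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30' MINUTE
WHERE
  o.email IS NULL   -- keep only visits with no order within +/- 30 minutes
;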