Glad to hear that you resolved the issue.

Aneesha Kaushal <aneesha.kaus...@reflektion.com> wrote on Fri, Mar 19, 2021 at 10:49 PM:
> Hello Benchao,
>
> Thanks for clarifying!
> The issue was that I sent very few records, so I could not test how the
> watermark was progressing. Today, after trying on a continuous stream, I
> was able to get the results.
>
>
> On Fri, Mar 19, 2021 at 5:24 PM Benchao Li <libenc...@apache.org> wrote:
>
>> Hi Aneesha,
>>
>> The interval join operator will only output a row padded with NULLs once
>> it confirms, based on the watermark, that no matching data can still
>> arrive. There is also an optimization for reducing state access, which
>> may further delay the output of these rows.
>> For your case, that is almost 30 + 30 = 60 minutes. Did you wait that
>> long before checking the output?
>>
>> Aneesha Kaushal <aneesha.kaus...@reflektion.com> wrote on Thu, Mar 18, 2021 at 11:29 PM:
>>
>>> Hi,
>>>
>>> I am doing a simple POC using Flink SQL, and I am facing some issues
>>> with the interval join.
>>>
>>> *Use case*: I have two Kafka streams, and using a Flink SQL interval
>>> join I want to remove rows from *stream 1* (abandoned_user_visits)
>>> that are present in *stream 2* (orders) within some time interval.
>>>
>>> *Data:*
>>> 1) *Abandoned user visits.* Sample data:
>>> {"key1": "123", "email": "ema...@example.com", "abandoned_pids": [674378611, 1754171520], "ts": "2021-03-18 11:00:00.208"}
>>> {"key1": "234", "email": "ema...@example.com", "abandoned_pids": [1942367711], "ts": "2021-03-18 11:45:00.208"}
>>> {"key1": "123", "email": "ema...@example.com", "abandoned_pids": [1754171520], "ts": "2021-03-18 12:00:00.208"}
>>> {"key1": "234", "email": "ema...@example.com", "abandoned_pids": [1942367711], "ts": "2021-03-18 12:45:00.208"}
>>>
>>> 2) *User order stream*
>>> {"key1": "234", "email": "ema...@example.com", "pids": [1754171520], "ts": "2021-03-18 11:55:00.208"}
>>> {"key1": "123", "email": "ema...@example.com", "pids": [674378611, 1754171520], "ts": "2021-03-18 12:10:00.208"}
>>>
>>> When I push the above records to Kafka and select from the view below,
>>> I get a result that is actually an *INNER* join (not an OUTER join).
>>> I even tried posting just one record to stream (1) and no record to
>>> stream (2), expecting that record to be emitted, but nothing was
>>> emitted. What was interesting is that when I use processing time
>>> instead of event time, I get the results as expected.
>>>
>>> *Tables and views used:*
>>> CREATE TABLE abandoned_visits (
>>>     key1 STRING
>>>   , email STRING
>>>   , ts TIMESTAMP(3)
>>>   , abandoned_pids ARRAY<BIGINT>
>>>   , WATERMARK FOR ts AS ts
>>> )
>>> WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'abandoned-visits',
>>>   'properties.bootstrap.servers' = '...',
>>>   'format' = 'json'
>>> );
>>>
>>> CREATE TABLE orders (
>>>     key1 STRING
>>>   , email STRING
>>>   , ts TIMESTAMP(3)
>>>   , pids ARRAY<BIGINT>
>>>   , WATERMARK FOR ts AS ts
>>> )
>>> WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'orders',
>>>   'properties.bootstrap.servers' = '...',
>>>   'format' = 'json'
>>> );
>>>
>>> CREATE VIEW abandoned_visits_with_no_orders AS
>>> SELECT
>>>     av.key1
>>>   , av.email
>>>   , av.abandoned_pids
>>>   , FLOOR(av.ts TO MINUTE) AS visit_timestamp
>>>   , FLOOR(o.ts TO MINUTE) AS order_timestamp
>>>   , o.email AS order_email
>>> FROM abandoned_visits av
>>> FULL OUTER JOIN orders o
>>>   ON av.key1 = o.key1
>>>   AND av.email = o.email
>>>   AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30' MINUTE
>>> -- WHERE
>>> --   o.email IS NULL  -- commented out so as to get something in the result
>>> ;
>>>
>>> *Result:*
>>> select * from abandoned_visits_with_no_orders;
>>>
>>> This gives the same result as an inner join. It doesn't have rows with
>>> NULL order data.
>>> I would appreciate any help.
>>>
>>> Thanks,
>>> Aneesha
>>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>

--
Best,
Benchao Li
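[Editor's note: for readers finding this thread in the archives, Benchao's point about the NULL-padded output waiting on the watermark can be sketched as a toy model in plain Python. This is NOT Flink's actual operator; it only illustrates the rule that a left-side row may be emitted with NULLs on the right only once the watermark passes the row's timestamp plus the join's upper bound (the 30-minute interval comes from the query above; all names here are illustrative).]

```python
from datetime import datetime, timedelta

# Toy model of an event-time interval OUTER join (not Flink's real operator):
# a left row can only be emitted with NULLs on the right once the watermark
# guarantees no matching right row can still arrive, i.e. once
# watermark > left.ts + UPPER_BOUND.

UPPER_BOUND = timedelta(minutes=30)  # from "av.ts + INTERVAL '30' MINUTE"

def classify(left_rows, right_rows, watermark):
    """Split left rows into (joined, null_padded, still_buffered)."""
    joined, null_padded, buffered = [], [], []
    for l in left_rows:
        matches = [r for r in right_rows
                   if l["key1"] == r["key1"]
                   and l["ts"] - UPPER_BOUND <= r["ts"] <= l["ts"] + UPPER_BOUND]
        if matches:
            joined.append((l, matches))
        elif watermark > l["ts"] + UPPER_BOUND:
            null_padded.append((l, None))  # safe to emit with a NULL right side
        else:
            buffered.append(l)             # must wait: a match may still arrive
    return joined, null_padded, buffered

ts = lambda s: datetime.fromisoformat(s)
visits = [{"key1": "123", "ts": ts("2021-03-18 11:00:00")}]
orders = []

# One record and no further input: the watermark sits at the row's own
# timestamp, so the row stays buffered and nothing is emitted.
_, nulls, buf = classify(visits, orders, ts("2021-03-18 11:00:00"))
print(len(nulls), len(buf))  # 0 1

# Later records push the watermark past ts + 30 min: the NULL row appears.
_, nulls, buf = classify(visits, orders, ts("2021-03-18 11:31:00"))
print(len(nulls), len(buf))  # 1 0
```

This is exactly why a continuous stream fixed the test: with event time, the watermark only advances as new records arrive, so a handful of records leaves unmatched rows buffered indefinitely, whereas processing time advances on its own.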