Glad to hear that you resolved the issue.

Aneesha Kaushal <aneesha.kaus...@reflektion.com> wrote on Fri, Mar 19, 2021 at 10:49 PM:

> Hello Benchao,
>
> Thanks for clarifying!
> The issue was that I sent very few records, so I could not observe how the
> watermark was progressing. Today, after trying on a continuous stream, I
> was able to get the results.
>
>
> On Fri, Mar 19, 2021 at 5:24 PM Benchao Li <libenc...@apache.org> wrote:
>
>> Hi Aneesha,
>>
>> The interval join operator only outputs rows padded with NULL once the
>> watermark confirms that no more matching data can arrive.
>> There is also an optimization for reducing state access, which may further
>> delay the output of these rows.
>> In your case, the delay is almost 30 + 30 = 60 minutes. Did you wait that
>> long to check the output?
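>>
>> As a rough sketch (assuming your 30-minute bounds): a visit row with
>> ts = T can only be emitted with NULL order fields once the watermark on
>> both inputs passes T + 30 min. Pushing a later record (this one is
>> hypothetical) into both topics should advance the watermark and flush the
>> unmatched visits, e.g.:
>>
>>   {"key1": "999", "email": "ema...@example.com", "pids": [], "ts": "2021-03-18 13:05:00.000"}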
>>
>> Aneesha Kaushal <aneesha.kaus...@reflektion.com> wrote on Thu, Mar 18, 2021 at 11:29 PM:
>>
>>> Hi,
>>>
>>> I am doing a simple POC using Flink SQL and I am facing some issues with
>>> Interval Join.
>>>
>>> *Use Case*: I have two Kafka streams, and using a Flink SQL interval join
>>> I want to remove rows from *stream 1* (abandoned_user_visits) that are
>>> present in *stream 2* (orders) within some time interval.
>>>
>>> *Data:*
>>> 1) *Abandoned user visits.* Sample data:
>>> {"key1": "123", "email": "ema...@example.com", "abandoned_pids":
>>> [674378611, 1754171520], "ts": "2021-03-18 11:00:00.208"}
>>> {"key1": "234", "email": "ema...@example.com", "abandoned_pids":
>>> [1942367711], "ts": "2021-03-18 11:45:00.208"}
>>> {"key1": "123", "email": "ema...@example.com", "abandoned_pids":
>>> [1754171520], "ts": "2021-03-18 12:00:00.208"}
>>> {"key1": "234", "email": "ema...@example.com", "abandoned_pids":
>>> [1942367711], "ts": "2021-03-18 12:45:00.208"}
>>>
>>> 2) *User order stream*
>>> {"key1": "234", "email": "ema...@example.com", "pids": [1754171520],
>>> "ts": "2021-03-18 11:55:00.208"}
>>> {"key1": "123", "email": "ema...@example.com", "pids": [674378611,
>>> 1754171520], "ts": "2021-03-18 12:10:00.208"}
>>>
>>> When I push the above records to Kafka and select from the VIEW below, I
>>> get a result that is effectively an *INNER* join (not an OUTER join).
>>> I even tried posting just one record to stream (1) and no record to
>>> stream (2), expecting that record to be emitted, but nothing was emitted.
>>> Interestingly, when I use processing time instead of event time, I get
>>> the results as expected.
>>>
>>> *Tables and Views used: *
>>> CREATE TABLE abandoned_visits (
>>>         key1 STRING
>>>       , email STRING
>>>       , ts TIMESTAMP(3)
>>>       , abandoned_pids ARRAY<BIGINT>
>>>       , WATERMARK FOR ts AS ts
>>> )
>>> WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'abandoned-visits',
>>>   'properties.bootstrap.servers' = '...',
>>>   'format' = 'json'
>>> );
>>>
>>> CREATE TABLE orders (
>>>         key1 STRING
>>>       , email STRING
>>>       , ts TIMESTAMP(3)
>>>       , pids ARRAY<BIGINT>
>>>       , WATERMARK FOR ts AS ts
>>> )
>>> WITH (
>>>     'connector' = 'kafka',
>>>     'topic'     = 'orders',
>>>     'properties.bootstrap.servers' = '...',
>>>     'format'    = 'json'
>>> );
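>>>
>>> (A side note on an assumption in the DDL above: WATERMARK FOR ts AS ts
>>> declares a zero-delay watermark, which tolerates no out-of-order events.
>>> If records can arrive late, a bounded watermark might be safer, e.g.:
>>>
>>>       WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>>> )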
>>>
>>> CREATE VIEW abandoned_visits_with_no_orders AS
>>>   SELECT
>>>       av.key1
>>>     , av.email
>>>     , av.abandoned_pids
>>>     , FLOOR(av.ts TO MINUTE)    AS visit_timestamp
>>>     , FLOOR(o.ts TO MINUTE)     AS order_timestamp
>>>     , o.email                   AS order_email
>>>   FROM abandoned_visits av
>>>   FULL OUTER JOIN orders o
>>>   ON  av.key1 = o.key1
>>>   AND av.email = o.email
>>>   AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL
>>> '30' MINUTE
>>> --  WHERE
>>> --    o.email IS NULL    -- commented out so as to get something in the result
>>> ;
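>>>
>>> For reference, the anti-join I am ultimately aiming for would look
>>> roughly like this (the view name visits_without_orders is made up; table
>>> names are the same as above). A LEFT OUTER interval join with an IS NULL
>>> filter should be sufficient, since only unmatched visits are needed:
>>>
>>> CREATE VIEW visits_without_orders AS
>>>   SELECT
>>>       av.key1
>>>     , av.email
>>>     , av.abandoned_pids
>>>     , FLOOR(av.ts TO MINUTE)    AS visit_timestamp
>>>   FROM abandoned_visits av
>>>   LEFT OUTER JOIN orders o
>>>   ON  av.key1 = o.key1
>>>   AND av.email = o.email
>>>   AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL
>>> '30' MINUTE
>>>   WHERE o.email IS NULL;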
>>>
>>> *Result: *
>>> select * from abandoned_visits_with_no_orders;
>>>
>>> This gives the same result as an inner join: there are no rows with NULL
>>> order data.
>>> I would appreciate any help.
>>>
>>> Thanks,
>>> Aneesha
>>>
>>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

-- 

Best,
Benchao Li