Hello Benchao,

Thanks for clarifying!
The issue was that I was sending very few records, so I could not see how
the watermark was progressing. Today, after trying with a continuous
stream, I was able to get the expected results.
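
For anyone hitting the same thing: as far as I understand, the join
operator's watermark is the minimum of the watermarks of its two inputs,
and with WATERMARK FOR ts AS ts it only advances when newer records arrive
on each topic. A minimal sketch of what unblocked it for me (the key1
value "999" and the timestamps below are made up purely for illustration)
was to keep pushing later records to both topics, e.g.

{"key1": "999", "email": "ema...@example.com", "abandoned_pids": [1], "ts": "2021-03-18 14:00:00.208"}
{"key1": "999", "email": "ema...@example.com", "pids": [1], "ts": "2021-03-18 14:00:00.208"}

Once the watermark moved past the join bound plus the extra delay Benchao
describes below, the unmatched visits showed up with NULL order columns.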


On Fri, Mar 19, 2021 at 5:24 PM Benchao Li <libenc...@apache.org> wrote:

> Hi Aneesha,
>
> The interval join operator only outputs the NULL-padded rows once it
> confirms that no more matching data can arrive, i.e. once the watermark
> has passed the join bound.
> There is also an optimization for reducing state access, which may add
> more time before these rows are actually emitted.
> In your case that is almost 30 + 30 = 60 min. Did you wait that long to
> check the output?
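> To make that concrete (just an illustration, using the ±30 minute bounds
> from your view): a visit row with ts = 11:00 can still be matched by
> orders up to 11:30, so its NULL-padded row can only be emitted once the
> watermark has passed 11:30, and the state-access optimization can delay
> that by roughly one more interval, hence about 60 min in total.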
>
> Aneesha Kaushal <aneesha.kaus...@reflektion.com> wrote on Thursday, March 18, 2021 at 11:29 PM:
>
>> Hi,
>>
>> I am doing a simple POC using Flink SQL and I am facing some issues with
>> Interval Join.
>>
>> *Use case*: I have two Kafka streams, and using a Flink SQL interval join I
>> want to remove rows from *stream 1* (abandoned_user_visits) that are
>> present in *stream 2* (orders) within some time interval.
>>
>> *Data:*
>> 1) *Abandoned user visits.* Sample data:
>> {"key1": "123", "email": "ema...@example.com", "abandoned_pids":
>> [674378611, 1754171520], "ts": "2021-03-18 11:00:00.208"}
>> {"key1": "234", "email": "ema...@example.com", "abandoned_pids":
>> [1942367711], "ts": "2021-03-18 11:45:00.208"}
>> {"key1": "123", "email": "ema...@example.com", "abandoned_pids":
>> [1754171520], "ts": "2021-03-18 12:00:00.208"}
>> {"key1": "234", "email": "ema...@example.com", "abandoned_pids":
>> [1942367711], "ts": "2021-03-18 12:45:00.208"}
>>
>> 2) *User order stream*
>> {"key1": "234", "email": "ema...@example.com", "pids": [1754171520],
>> "ts": "2021-03-18 11:55:00.208"}
>> {"key1": "123", "email": "ema...@example.com", "pids": [674378611,
>> 1754171520], "ts": "2021-03-18 12:10:00.208"}
>>
>> When I push the above records to Kafka and select from the VIEW below,
>> I get a result that is actually an *INNER* join (not an OUTER join).
>> I even tried posting just one record to stream (1) and no record to stream
>> (2), expecting that record to be emitted, but nothing was emitted.
>> Interestingly, when I use processing time instead of event time, I get the
>> results as expected.
>>
>> *Tables and Views used: *
>> CREATE TABLE abandoned_visits (
>>         key1 STRING
>>       , email STRING
>>       , ts TIMESTAMP(3)
>>       , abandoned_pids ARRAY<BIGINT>
>>       , WATERMARK FOR ts AS ts
>> )
>> WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'abandoned-visits',
>>   'properties.bootstrap.servers' = '...',
>>   'format' = 'json'
>> );
>>
>> CREATE TABLE orders (
>>         key1 STRING
>>       , email STRING
>>       , ts TIMESTAMP(3)
>>       , pids ARRAY<BIGINT>
>>       , WATERMARK FOR ts AS ts
>> )
>> WITH (
>>     'connector' = 'kafka',
>>     'topic'     = 'orders',
>>     'properties.bootstrap.servers' = '...',
>>     'format'    = 'json'
>> );
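>>
>> (Note: the watermark definition here is just the raw ts; a
>> bounded-out-of-orderness variant such as
>> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>> would also be possible, the 5 seconds being an arbitrary example delay.)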
>>
>> CREATE VIEW abandoned_visits_with_no_orders AS
>>   SELECT
>>       av.key1
>>     , av.email
>>     , av.abandoned_pids
>>     , FLOOR(av.ts TO MINUTE)    AS visit_timestamp
>>     , FLOOR(o.ts TO MINUTE)     AS order_timestamp
>>     , o.email                   AS order_email
>>   FROM abandoned_visits av
>>   FULL OUTER JOIN orders o
>>   ON  av.key1 = o.key1
>>   AND av.email = o.email
>>   AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30'
>> MINUTE
>> --  WHERE
>> --    o.email IS NULL    -- commented this out so as to get something in the result
>> ;
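>>
>> (The end goal, once the NULL-padded rows are emitted, is to filter them,
>> either by re-enabling the WHERE above or, equivalently, outside the view:
>> SELECT * FROM abandoned_visits_with_no_orders WHERE order_email IS NULL;)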
>>
>> *Result: *
>> select * from abandoned_visits_with_no_orders;
>>
>> This gives the same result as an inner join; it doesn't contain any rows
>> with NULL order data.
>> I would appreciate any help.
>>
>> Thanks,
>> Aneesha
>>
>>
>
> --
>
> Best,
> Benchao Li
>
