I wanted to add that when I used the following, the watermark was delayed
by 3 hours instead of the 2 hours I would have expected:

AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime

(a time window constraint between o and c, the 1st and 3rd tables)
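
Working through the arithmetic, the 3 hours is consistent with the two
window widths adding up. A back-of-the-envelope Python sketch of that
reasoning (my own model and naming, not anything from the Flink API):

```python
# Rough model (my naming, not Flink's API) of why the watermark lags by
# 3 hours: each interval join holds the watermark back by the width of
# its time window, and chained joins accumulate the delay.

def watermark_delay_hours(window_widths_hours):
    """Total watermark delay after chaining interval joins whose
    time-window widths (in hours) are applied in order."""
    return sum(window_widths_hours)

# Join 1: o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime
# Join 2: o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime
print(watermark_delay_hours([1, 2]))  # 3 -> matches the 3-hour lag we saw
```

If this model is right, the extra hour comes from the first join's 1-hour
window on top of the 2-hour bound, not from the 2-hour bound alone.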

Thanks,
Vinod

On Fri, Mar 13, 2020 at 3:56 PM Vinod Mehra <vme...@lyft.com> wrote:

> Thanks Timo for responding back! Answers below:
>
> > 1) Which planner are you using?
>
> We are using Flink 1.8 and using the default planner
> (org.apache.flink.table.calcite.FlinkPlannerImpl)
> from: org.apache.flink:flink-table-planner_2.11:1.8
>
> > 2) How do you create your watermarks?
>
> We are using periodic watermarking and have configured stream time
> characteristics as TimeCharacteristic.EventTime. The watermark assigner
> extracts the timestamp from time attributes from the event and keeps it 5
> seconds behind the maximum timestamp seen in order to allow for stale
> events.
>
> > 3) Did you unit test with only parallelism of 1 or higher?
>
> I tried both 1 and higher values in tests, and for all parallelism values
> the unit tests work as expected.
>
> 4) Can you share the output of TableEnvironment.explain() with us?
>
> Attached. Please note that I had obfuscated the query a bit in my original
> post for clarity. I have pasted the actual query along with the plan so
> that you can correlate them.
>
> > Shouldn't c have a rowtime constraint around o instead of r? Such that
> all time-based operations work on o.rowtime?
>
> I have tried both (and some more variations) and got the same results (unit
> tests pass but the production execution doesn't join as expected). Here is
> the modified query:
>
> SELECT o.region_code,
>        concat_ws(
>          '/',
>          CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL 
> THEN 1 ELSE 0 END) AS VARCHAR),
>          CAST(count(1) AS VARCHAR)
>        ) AS offer_conversion_5m
>   FROM (
>         SELECT region_code,
>                offer_id,
>                rowtime
>           FROM event_offer_created
>          WHERE ...
>         ) o
>    LEFT JOIN (
>         SELECT offer_id,
>                order_id,
>                rowtime
>           FROM event_order_requested
>          WHERE ...
>         ) r
>      ON o.offer_id = r.offer_id
>      AND o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime
>
>    LEFT JOIN (
>         SELECT order_id,
>                rowtime
>           FROM event_order_cancelled
>          WHERE ...
>         ) c
>      ON r.order_id = c.order_id
>      AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime
>
>  GROUP BY
>        o.region_code,
>        TUMBLE(o.rowtime, INTERVAL '5' minute)
>
>
> We used minus two hours ("c.rowtime - INTERVAL '2' hour") in the 2nd
> time window because it constrains the 1st table against the 3rd.
>
> -- Vinod
>
> On Fri, Mar 13, 2020 at 6:42 AM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Vinod,
>>
>> I cannot spot any problems in your SQL query.
>>
>> Some questions for clarification:
>> 1) Which planner are you using?
>> 2) How do you create your watermarks?
>> 3) Did you unit test with only parallelism of 1 or higher?
>> 4) Can you share the output of TableEnvironment.explain() with us?
>>
>> Shouldn't c have a rowtime constraint around o instead of r? Such that
>> all time-based operations work on o.rowtime?
>>
>> Regards,
>> Timo
>>
>>
>> On 10.03.20 19:26, Vinod Mehra wrote:
>> > Hi!
>> >
>> > We are testing the following 3 way time windowed join to keep the
>> > retained state size small. Using joins for the first time here. It
>> works
>> > in unit tests but we are not able to get expected results in
>> production.
>> > We are still troubleshooting this issue. Can you please help us review
>> > this in case we missed something or our assumptions are wrong?
>> >
>> > SELECT o.region_code,
>> >        concat_ws(
>> >          '/',
>> >          CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL
>> > THEN 1 ELSE 0 END) AS VARCHAR),
>> >          CAST(count(1) AS VARCHAR)
>> >        ) AS offer_conversion_5m
>> >   FROM (
>> >         SELECT region_code,
>> >                offer_id,
>> >                rowtime
>> >           FROM event_offer_created
>> >          WHERE ...
>> >        ) o
>> >   LEFT JOIN (
>> >         SELECT offer_id,
>> >                order_id,
>> >                rowtime
>> >           FROM event_order_requested
>> >          WHERE ...
>> >        ) r
>> >     ON o.offer_id = r.offer_id
>> >     AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
>> >   LEFT JOIN (
>> >         SELECT order_id,
>> >                rowtime
>> >           FROM event_order_cancelled
>> >          WHERE ...
>> >        ) c
>> >     ON r.order_id = c.order_id
>> >     AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
>> >  GROUP BY
>> >        o.region_code,
>> >        TUMBLE(o.rowtime, INTERVAL '5' minute)
>> >
>> >
>> > The sequence of events is:
>> >
>> >  1. At time X an offer is created (event stream = "*event_offer_created*")
>> >  2. At time Y that offer is used to create an order (event stream =
>> >     "*event_order_requested*"). Left join because not all offers get used.
>> >  3. At time Z that order is cancelled (event stream =
>> >     "*event_order_cancelled*"). Left join because not all orders get
>> >     cancelled.
>> >
>> > "*offer_conversion_5m*" represents "number of converted orders / total
>> > number of offerings" in a 5-minute bucket. If an order gets cancelled
>> > we don't want to count it. That's why we have [c.order_id IS NULL THEN
>> > 1 ELSE 0 END] in the select.
>> >
>> > We picked 1 hour time windows because that's the maximum time we expect
>> > the successive events to take for a given record chain.
>> >
>> > The outer GROUP BY is to get 5 minute aggregation for each "region". As
>> > expected, the watermark lags 2 hours behind the current time because of
>> > the two time-window joins above. The IdleStateRetentionTime is not set, so
>> > the expectation is that the state will be retained as per the time
>> > window size and as the records fall off the window the state will be
>> > cleaned up. The aggregated state is expected to be kept around for 5
>> > minutes (GROUP BY).
>> >
>> > However, we are unable to see the conversion (offer_created ->
>> > order_requested (without order_cancelled)). '*offer_conversion_5m*' is
>> > always zero although we know the streams contain records that should
>> > have incremented the count. Any idea what could be wrong? Is the state
>> > being dropped too early (5 mins) because of the outer 5 minute tumbling
>> > window?
>> >
>> > Thanks,
>> > Vinod
>>
>>
