I wanted to add that when I used the following condition, the watermark was delayed by 3 hours instead of the 2 hours I would have expected:
    AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime

(time window constraint between o and c: 1st and 3rd table)

Thanks,
Vinod

On Fri, Mar 13, 2020 at 3:56 PM Vinod Mehra <vme...@lyft.com> wrote:

> Thanks Timo for responding back! Answers below:
>
> > 1) Which planner are you using?
>
> We are using Flink 1.8 with the default planner
> (org.apache.flink.table.calcite.FlinkPlannerImpl)
> from: org.apache.flink:flink-table-planner_2.11:1.8
>
> > 2) How do you create your watermarks?
>
> We are using periodic watermarking and have configured the stream time
> characteristic as TimeCharacteristic.EventTime. The watermark assigner
> extracts the timestamp from the event's time attribute and keeps the
> watermark 5 seconds behind the maximum timestamp seen, in order to allow
> for stale events.
>
> > 3) Did you unit test with only parallelism of 1 or higher?
>
> I tried both 1 and higher values in tests, and for all parallelism values
> the unit tests work as expected.
>
> > 4) Can you share the output of TableEnvironment.explain() with us?
>
> Attached. Please note that I had obfuscated the query a bit in my original
> post for clarity. I have pasted the actual query along with the plan so
> that you can correlate it.
>
> > Shouldn't c have a rowtime constraint around o instead of r? Such that
> > all time-based operations work on o.rowtime?
>
> I have tried both (and some more variations) and got the same results (unit
> tests pass but production execution doesn't join as expected). Here is
> the modified query:
>
> SELECT o.region_code,
>        concat_ws(
>          '/',
>          CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL
>                        THEN 1 ELSE 0 END) AS VARCHAR),
>          CAST(count(1) AS VARCHAR)
>        ) AS offer_conversion_5m
> FROM (
>   SELECT region_code,
>          offer_id,
>          rowtime
>   FROM event_offer_created
>   WHERE ...
> ) o
> LEFT JOIN (
>   SELECT offer_id,
>          order_id,
>          rowtime
>   FROM event_order_requested
>   WHERE ...
> ) r
> ON o.offer_id = r.offer_id
> AND o.rowtime BETWEEN r.rowtime - INTERVAL '1' hour AND r.rowtime
>
> LEFT JOIN (
>   SELECT order_id,
>          rowtime
>   FROM event_order_cancelled
>   WHERE ...
> ) c
> ON r.order_id = c.order_id
> AND o.rowtime BETWEEN c.rowtime - INTERVAL '2' hour AND c.rowtime
>
> GROUP BY
>   o.region_code,
>   TUMBLE(o.rowtime, INTERVAL '5' minute)
>
> We used minus two hours ("c.rowtime - INTERVAL '2' hour") in the 2nd
> time window because it is between the first table and the third one.
>
> -- Vinod
>
> On Fri, Mar 13, 2020 at 6:42 AM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Vinod,
>>
>> I cannot spot any problems in your SQL query.
>>
>> Some questions for clarification:
>> 1) Which planner are you using?
>> 2) How do you create your watermarks?
>> 3) Did you unit test with only parallelism of 1 or higher?
>> 4) Can you share the output of TableEnvironment.explain() with us?
>>
>> Shouldn't c have a rowtime constraint around o instead of r? Such that
>> all time-based operations work on o.rowtime?
>>
>> Regards,
>> Timo
>>
>>
>> On 10.03.20 19:26, Vinod Mehra wrote:
>> > Hi!
>> >
>> > We are testing the following 3-way time-windowed join to keep the
>> > retained state size small. We are using joins for the first time here.
>> > It works in unit tests but we are not able to get the expected results
>> > in production. We are still troubleshooting this issue. Can you please
>> > help us review this in case we missed something or our assumptions are
>> > wrong?
>> >
>> > SELECT o.region_code,
>> >        concat_ws(
>> >          '/',
>> >          CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL
>> >                        THEN 1 ELSE 0 END) AS VARCHAR),
>> >          CAST(count(1) AS VARCHAR)
>> >        ) AS offer_conversion_5m
>> > FROM (
>> >   SELECT region_code,
>> >          offer_id,
>> >          rowtime
>> >   FROM event_offer_created
>> >   WHERE ...
>> > ) o
>> > LEFT JOIN (
>> >   SELECT offer_id,
>> >          order_id,
>> >          rowtime
>> >   FROM event_order_requested
>> >   WHERE ...
>> > ) r
>> > ON o.offer_id = r.offer_id
>> > AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
>> > LEFT JOIN (
>> >   SELECT order_id,
>> >          rowtime
>> >   FROM event_order_cancelled
>> >   WHERE ...
>> > ) c
>> > ON r.order_id = c.order_id
>> > AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
>> > GROUP BY
>> >   o.region_code,
>> >   TUMBLE(o.rowtime, INTERVAL '5' minute)
>> >
>> >
>> > The sequence of events is:
>> >
>> > 1. At time X an offer is created (event stream =
>> >    "*event_offer_created*")
>> > 2. At time Y that offer is used to create an order (event stream =
>> >    "*event_order_requested*"). Left join because not all offers get
>> >    used.
>> > 3. At time Z that order is cancelled (event stream =
>> >    "*event_order_cancelled*"). Left join because not all orders get
>> >    cancelled.
>> >
>> > "*offer_conversion_5m*" represents "number of converted orders / total
>> > number of offerings" in a 5-minute bucket. If an order gets cancelled
>> > we don't want to count that. That's why we have [c.order_id IS NULL
>> > THEN 1 ELSE 0 END] in the select.
>> >
>> > We picked 1-hour time windows because that's the maximum time we expect
>> > the successive events to take for a given record chain.
>> >
>> > The outer GROUP BY is to get a 5-minute aggregation for each "region".
>> > As expected, the watermark lags 2 hours behind the current time because
>> > of the two time-window joins above. The IdleStateRetentionTime is not
>> > set, so the expectation is that the state will be retained as per the
>> > time window size and that, as the records fall off the window, the
>> > state will be cleaned up. The aggregated state is expected to be kept
>> > around for 5 minutes (GROUP BY).
>> >
>> > However, we are unable to see the conversion (offer_created ->
>> > order_requested (without order_cancelled)). '*offer_conversion_5m*' is
>> > always zero although we know the streams contain records that should
>> > have incremented the count. Any idea what could be wrong? Is the state
>> > being dropped too early (5 mins) because of the outer 5-minute tumbling
>> > window?
>> >
>> > Thanks,
>> > Vinod
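
A note on the watermark delays reported in this thread (3 hours with the modified query, 2 hours with the original one). One possible reading, stated here as an assumption rather than confirmed planner behavior, is that each time-windowed join holds its output watermark back by the larger of its two window bounds, and that the delays of chained joins add up on top of the 5 seconds by which the watermark assigner already trails the maximum timestamp. The sketch below only does that arithmetic in plain Java. `WatermarkDelaySketch`, `JoinWindow`, `assumedDelay`, `totalDelay`, `originalQueryDelay`, and `modifiedQueryDelay` are hypothetical names, and no Flink API is involved.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

public class WatermarkDelaySketch {

    /** Hypothetical holder for one join condition: a.rowtime BETWEEN b.rowtime - lower AND b.rowtime + upper. */
    static final class JoinWindow {
        final Duration lower;
        final Duration upper;

        JoinWindow(Duration lower, Duration upper) {
            this.lower = lower;
            this.upper = upper;
        }

        /** ASSUMPTION: the join delays its output watermark by the larger window bound. */
        Duration assumedDelay() {
            return lower.compareTo(upper) >= 0 ? lower : upper;
        }
    }

    /** ASSUMPTION: delays of chained joins simply add to the lag of the source watermark. */
    static Duration totalDelay(Duration sourceLag, List<JoinWindow> joins) {
        Duration total = sourceLag;
        for (JoinWindow join : joins) {
            total = total.plus(join.assumedDelay());
        }
        return total;
    }

    // The assigner described in the thread keeps the watermark 5 seconds behind the max timestamp.
    static final Duration SOURCE_LAG = Duration.ofSeconds(5);

    /** Original query: r within [o, o + 1h], then c within [r, r + 1h]. */
    static Duration originalQueryDelay() {
        return totalDelay(SOURCE_LAG, Arrays.asList(
                new JoinWindow(Duration.ZERO, Duration.ofHours(1)),
                new JoinWindow(Duration.ZERO, Duration.ofHours(1))));
    }

    /** Modified query: o within [r - 1h, r], then o within [c - 2h, c]. */
    static Duration modifiedQueryDelay() {
        return totalDelay(SOURCE_LAG, Arrays.asList(
                new JoinWindow(Duration.ofHours(1), Duration.ZERO),
                new JoinWindow(Duration.ofHours(2), Duration.ZERO)));
    }

    public static void main(String[] args) {
        System.out.println("original query, assumed watermark lag: " + originalQueryDelay());
        System.out.println("modified query, assumed watermark lag: " + modifiedQueryDelay());
    }
}
```

Under that assumption the original query (two 1-hour windows) comes out at 2 hours plus 5 seconds, and the modified query (a 1-hour and a 2-hour window) at 3 hours plus 5 seconds, which is in line with the delays observed above. Whether the planner actually behaves this way would need to be checked against the explain() output.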