Hi Vinod,
I cannot spot any problems in your SQL query.
Some questions for clarification:
1) Which planner are you using?
2) How do you create your watermarks?
3) Did you run your unit tests only with a parallelism of 1, or also with a higher parallelism?
4) Can you share the output of TableEnvironment.explain() with us?
Shouldn't c have a rowtime constraint around o instead of r, such that
all time-based operations work on o.rowtime?
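To make that suggestion concrete, here is a rough sketch of how the second join could be bounded on o.rowtime. The 2-hour upper bound is only my assumption (1 hour from offer to order plus 1 hour from order to cancellation); it is not taken from your query, and I have not verified how the planner translates it, so please check it against the explain output:

```sql
-- Sketch only: the second join is bounded on o.rowtime instead of r.rowtime.
-- ASSUMPTION: the 2-hour upper bound (1 hour offer -> order plus
-- 1 hour order -> cancellation) is illustrative, not from the original query.
LEFT JOIN (
  SELECT order_id,
         rowtime
  FROM event_order_cancelled
  WHERE ...
) c
ON r.order_id = c.order_id
AND c.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '2' HOUR
```

The rest of the query (the first join on r and the GROUP BY on o.rowtime) would stay unchanged.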
Regards,
Timo
On 10.03.20 19:26, Vinod Mehra wrote:
Hi!
We are testing the following 3-way time-windowed join to keep the
retained state size small. We are using joins for the first time here. It works
in unit tests, but we are not able to get the expected results in production.
We are still troubleshooting this issue. Can you please help us review
this in case we missed something or our assumptions are wrong?
SELECT o.region_code,
       concat_ws(
         '/',
         CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS NULL THEN 1
                       ELSE 0 END) AS VARCHAR),
         CAST(count(1) AS VARCHAR)
       ) AS offer_conversion_5m
FROM (
  SELECT region_code,
         offer_id,
         rowtime
  FROM event_offer_created
  WHERE ...
) o
LEFT JOIN (
  SELECT offer_id,
         order_id,
         rowtime
  FROM event_order_requested
  WHERE ...
) r
ON o.offer_id = r.offer_id
AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
LEFT JOIN (
  SELECT order_id,
         rowtime
  FROM event_order_cancelled
  WHERE ...
) c
ON r.order_id = c.order_id
AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
GROUP BY
  o.region_code,
  TUMBLE(o.rowtime, INTERVAL '5' minute)
The sequence of events is:
1. At time X an offer is created (event stream = "*event_offer_created*")
2. At time Y that offer is used to create an order (event stream =
"*event_order_requested*"). Left join because not all offers get used.
3. At time Z that order is cancelled (event stream =
"*event_order_cancelled*"). Left join because not all orders get
cancelled.
"*offer_conversion_5m*" represents "number of converted orders / total
number of offerings" in a 5-minute bucket. If an order gets cancelled,
we don't want to count it. That's why we have [c.order_id IS NULL THEN
1 ELSE 0 END] in the select.
We picked 1 hour time windows because that's the maximum time we expect
the successive events to take for a given record chain.
The outer GROUP BY is to get a 5-minute aggregation for each "region". As
expected, the watermark lags 2 hours behind the current time because of the
two time-window joins above. The IdleStateRetentionTime is not set, so
the expectation is that the state will be retained as per the time
window size and as the records fall off the window the state will be
cleaned up. The aggregated state is expected to be kept around for 5
minutes (GROUP BY).
However, we are unable to see the conversion (offer_created ->
order_requested (without order_cancelled)). '*offer_conversion_5m*' is
always zero although we know the streams contain records that should
have incremented the count. Any idea what could be wrong? Is the state
being dropped too early (5 mins) because of the outer 5 minute tumbling
window?
Thanks,
Vinod