I am thinking of another approach instead of a retract stream. Is it possible to define a custom window to do this? The window would be defined per order, and then you would only need to analyze the events within that window.
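To make the idea concrete, here is a rough sketch of what a per-order window could look like in the DataStream API. `OrderEvent`, its fields, and the 30-minute session gap are illustrative assumptions, not something from this thread; note also that a session window only fires once its gap expires, which delays output compared to a continuously updated count.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PerOrderWindowSketch {

    // Illustrative event POJO (an assumption, not from the thread).
    public static class OrderEvent {
        public String uid;
        public String oid;
        public String status;
        public long eventTime;
    }

    // Group each order's events into one session window, so all events of
    // the same order can be analyzed together.
    public static DataStream<String> perOrderAnalysis(DataStream<OrderEvent> events) {
        return events
            .keyBy(e -> e.oid)  // one logical "window" per order
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .process(new ProcessWindowFunction<OrderEvent, String, String, TimeWindow>() {
                @Override
                public void process(String oid, Context ctx,
                                    Iterable<OrderEvent> orderEvents,
                                    Collector<String> out) {
                    // All events collected for this order are available here,
                    // e.g. to determine its latest status and emit it for a
                    // downstream per-user count.
                }
            });
    }
}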
Piotr Nowojski <pi...@da-platform.com> wrote on Mon, Jan 21, 2019 at 8:44 PM:

> Hi,
>
> There is a missing feature in the Flink Table API/SQL at the moment: support for retraction streams as input (or conversions from an append stream to a retraction stream). With that, your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is ongoing related work [1], so this might be supported in the next couple of months.
>
> There might be a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row. With that you could write a query like this:
>
> SELECT
>   uid, count(*)
> FROM (
>   SELECT
>     *
>   FROM (
>     SELECT
>       uid, LAST_ROW(status) AS status
>     FROM
>       changelog
>     GROUP BY
>       uid, oid)
>   WHERE status = 'pending')
> GROUP BY
>   uid
>
> Where `changelog` is an append-only stream with the following content:
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> Besides that, you could also write your own relatively simple DataStream application to do the same thing.
>
> I'm CC'ing Timo, maybe he will have another, better idea.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-8577
>
> On 18 Jan 2019, at 18:30, Gagan Agrawal <agrawalga...@gmail.com> wrote:
>
> Hi,
> I have a requirement and need to understand whether it can be achieved with a Flink retract stream. Let's say we have a stream with 4 attributes: userId, orderId, status, event_time, where orderId is unique, and hence any change to the same orderId updates the previous value, as below.
>
> *Changelog Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in MySQL)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u2, o4, pending, t6
> (Here the rows at times t3 and t5 are deleted, as they have been updated for their respective order ids.)
>
> What I need is to maintain a count of "pending" orders per user and, if it goes beyond a configured threshold, push that user and its pending count to Kafka. There can be multiple updates to an order's status, e.g. pending -> success or pending -> failed. Also, in some cases there may be no change in status but we may still get a row (maybe due to some other attribute update which we are not concerned about). So is it possible to have a running count in Flink, as below, at the respective event times? Here the pending count decreases from 2 to 1 for user u1 at t4, since one of its orders' status changed from pending to success. Similarly for user u2, at time t6, there is no change in the running count, as there was no change in status for order o4.
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved from pending to success, the count is decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in the count of u2, as the o4 update carries no change)*
>
> As I understand it, a retract stream may be able to achieve this; however, I am not sure how. Any samples around this would be of great help.
>
> Gagan

--
Best Regards

Jeff Zhang
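For reference, a minimal sketch of what the `LAST_ROW` aggregate Piotr describes could look like, using the Table API's `AggregateFunction` base class. The class and field names are illustrative, and it assumes rows arrive in event-time order; out-of-order input would need an extra timestamp argument to compare against.

import org.apache.flink.table.functions.AggregateFunction;

// Minimal LAST_ROW(x): each incoming row simply overwrites the accumulator,
// so the result is the value of the most recently aggregated row.
public class LastRow extends AggregateFunction<String, LastRow.Accumulator> {

    // Accumulator that remembers the value of the most recent row.
    public static class Accumulator {
        public String value;
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    @Override
    public String getValue(Accumulator acc) {
        return acc.value;
    }

    // Called by the runtime once per input row; later rows overwrite earlier
    // ones. Assumes in-order input (an assumption of this sketch).
    public void accumulate(Accumulator acc, String status) {
        acc.value = status;
    }
}

It would then be registered with the table environment before being used in the query, e.g. tableEnv.registerFunction("LAST_ROW", new LastRow());.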
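And a sketch of the DataStream alternative Piotr mentions, reusing the illustrative OrderEvent POJO from the earlier sketch: since orderId is unique, the stream can be keyed by order, the last status kept in keyed state, and a +1/-1 delta emitted only when an order actually enters or leaves the pending state. The class names here are assumptions, not a tested design.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by order id; remembers the last seen status per order and emits a
// delta only when the order's pending state actually changes.
public class OrderStatusTracker
        extends KeyedProcessFunction<String, OrderEvent, OrderStatusTracker.PendingDelta> {

    // Emitted when an order enters (+1) or leaves (-1) the "pending" state.
    public static class PendingDelta {
        public String uid;
        public int delta;
    }

    private transient ValueState<String> lastStatus;

    @Override
    public void open(Configuration parameters) {
        lastStatus = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastStatus", String.class));
    }

    @Override
    public void processElement(OrderEvent event, Context ctx,
                               Collector<PendingDelta> out) throws Exception {
        boolean wasPending = "pending".equals(lastStatus.value());
        boolean isPending = "pending".equals(event.status);
        lastStatus.update(event.status);

        if (wasPending == isPending) {
            return; // no change, e.g. o4's pending -> pending update at t6
        }
        PendingDelta d = new PendingDelta();
        d.uid = event.uid;
        d.delta = isPending ? 1 : -1;
        out.collect(d);
    }
}

Keying the resulting deltas by uid and summing them reproduces the running counts from the example (u1: 1, 2, 1; u2: 0, 1, 1), and the sums can be checked against the threshold before writing to Kafka.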