Hi,

Flink's Table API/SQL is currently missing support for retraction streams as input (or for converting an append stream into a retraction stream). With that feature, your problem would reduce to a single query: `SELECT uid, count(*) FROM Changelog WHERE status = 'pending' GROUP BY uid`. There is ongoing related work [1], so this might be supported within the next couple of months.
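To make the intended semantics concrete, here is a minimal plain-Python sketch of what such a conversion would do with the changelog from your mail. It is not Flink code, and the function names are hypothetical. It assumes the order id is the unique key: each new row for an already-seen order first retracts the previously emitted row and then adds the new one, and a per-user count of pending rows applies -1/+1 accordingly.

```python
# Plain-Python model (not Flink API) of turning an append-only changelog into
# a retraction stream and counting pending orders per user on top of it.

# The changelog from the question: (user, order, status, event_time).
CHANGELOG = [
    ("u1", "o1", "pending", "t1"),
    ("u2", "o2", "failed", "t2"),
    ("u1", "o3", "pending", "t3"),
    ("u1", "o3", "success", "t4"),
    ("u2", "o4", "pending", "t5"),
    ("u2", "o4", "pending", "t6"),
]


def to_retract_stream(changelog):
    """Yield (is_add, row) messages, treating the order id as the unique key.

    A new row for an already-seen order first retracts (is_add=False) the
    previously emitted row for that order, then adds (is_add=True) the new one.
    """
    last_row_by_order = {}
    for row in changelog:
        order = row[1]
        previous = last_row_by_order.get(order)
        if previous is not None:
            yield False, previous
        last_row_by_order[order] = row
        yield True, row


def pending_counts(retract_stream):
    """Mimic a grouped count of pending rows per user over a retraction stream.

    Retracted pending rows decrement the count, added pending rows increment
    it. A snapshot of all counts is yielded after every added row.
    """
    counts = {}
    for is_add, (user, _order, status, event_time) in retract_stream:
        delta = 0
        if status == "pending":
            delta = 1 if is_add else -1
        counts[user] = counts.get(user, 0) + delta
        if is_add:
            yield event_time, dict(counts)


if __name__ == "__main__":
    for event_time, counts in pending_counts(to_retract_stream(CHANGELOG)):
        print(event_time, counts)
```

The printed counts follow the t1..t6 table in the question; u2 only shows up once its first row arrives at t2. Note that the t4 and t6 updates each produce a retraction followed by an addition, which is exactly what keeps the count from drifting on no-op or status-changing updates.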
There might be a workaround that could work at the moment. I think you would need to write your own custom `LAST_ROW(x)` aggregate function, which simply returns the value of the most recently aggregated row. With that, you could write a query like this:

    SELECT uid, count(*)
    FROM (
      SELECT *
      FROM (
        SELECT uid, LAST_ROW(status) AS status
        FROM changelog
        GROUP BY uid, oid)
      WHERE status = 'pending')
    GROUP BY uid

where `changelog` is an append-only stream with the following content:

> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6

Besides that, you could also write your own, relatively simple DataStream application to do the same thing.

I'm CC'ing Timo; maybe he will have a better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

> On 18 Jan 2019, at 18:30, Gagan Agrawal <agrawalga...@gmail.com> wrote:
>
> Hi,
> I have a requirement and need to understand if the same can be achieved with
> Flink retract stream. Let's say we have a stream with 4 attributes userId,
> orderId, status, event_time, where orderId is unique and hence any change in
> the same orderId updates the previous value as below
>
> Changelog Event Stream
>
> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> Snapshot view at time t6 (as viewed in mysql)
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u2, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for
> respective order ids)
>
> What I need is to maintain a count of "Pending" orders against a user and, if
> it goes beyond a configured threshold, push that user and pending count to
> Kafka. Here there can be multiple updates to order status, e.g. Pending ->
> Success or Pending -> Failed.
> Also in some cases there may not be any change
> in status, but we may still get a row (maybe due to some other attribute
> update which we are not concerned about). So is it possible to have a running
> count in Flink as below at the respective event times? Here the Pending count is
> decreased from 2 to 1 for user u1 at t4, since the status of one of its orders
> changed from Pending to Success. Similarly, for user u2 at time t6 there was
> no change in the running count, as there was no change in status for order o4.
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> t4 -> u1 : 1, u2 : 0 (since o3 moved from pending to success, the count is
> decreased for u1)
> t5 -> u1 : 1, u2 : 1
> t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as the o4 update has no change)
>
> As I understand, maybe a retract stream can achieve this. However, I am not sure
> how. Any samples around this would be of great help.
>
> Gagan
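For the DataStream application suggested in the reply above, here is a minimal plain-Python sketch of the per-user state such a job could keep. It is not Flink code: in a real job the two dicts would roughly correspond to keyed state in a process function keyed by user id, and returned alerts would be written to Kafka. The class name, the `threshold` parameter, and the choice to alert only when the count changes and is strictly above the threshold are all assumptions for illustration; it also assumes an order never moves between users.

```python
# Hypothetical plain-Python model (not Flink API) of the per-user state a
# DataStream job could keep to track pending orders and raise alerts.

CHANGELOG = [
    ("u1", "o1", "pending", "t1"),
    ("u2", "o2", "failed", "t2"),
    ("u1", "o3", "pending", "t3"),
    ("u1", "o3", "success", "t4"),
    ("u2", "o4", "pending", "t5"),
    ("u2", "o4", "pending", "t6"),
]


class PendingOrderTracker:
    """Tracks the last status of each order and a pending count per user."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.status_by_order = {}  # user -> {order -> last seen status}
        self.pending = {}  # user -> number of orders currently pending

    def process(self, user, order, status):
        """Apply one changelog row; return (user, count) if an alert fires."""
        orders = self.status_by_order.setdefault(user, {})
        previous = orders.get(order)
        orders[order] = status

        old_count = self.pending.get(user, 0)
        new_count = old_count
        if previous == "pending":
            new_count -= 1  # the old version of the order no longer counts
        if status == "pending":
            new_count += 1  # the new version of the order counts
        self.pending[user] = new_count

        # Only alert when the count actually changed and is above the threshold,
        # so no-op updates (like o4 at t6) do not re-send the same alert.
        if new_count != old_count and new_count > self.threshold:
            return user, new_count
        return None


if __name__ == "__main__":
    tracker = PendingOrderTracker(threshold=1)
    for user, order, status, event_time in CHANGELOG:
        alert = tracker.process(user, order, status)
        if alert is not None:
            print(event_time, "alert:", alert)
    print("final pending counts:", tracker.pending)
```

With `threshold=1`, the only alert is for u1 at t3 (two pending orders), and the final counts match the last row of the table in the question. Keeping the last status per order is what lets a repeated "pending" row for o4 leave the count unchanged.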