@Jeff: It depends on whether the user can define a time window for his condition. As Gagan described his problem, it was about a "global" threshold of pending orders.

I have just thought about another solution that should work without any custom code: convert the "status" field to a status_value int:
- "+1" for pending
- "-1" for success/failure
- "0" otherwise
and then run a

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

query on top of such a stream. The conversion to integers could be done with a `CASE` expression, as in the sketch below.
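For illustration, a rough sketch of how this could be wired up with the Table API (Flink 1.7-era API, Java; the `changelog` table and its columns are taken from the example quoted further down, everything else is illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PendingOrderCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Assumes an append-only table `changelog(uid, oid, status, event_time)`
        // has already been registered, e.g. from a Kafka source.
        Table pendingPerUser = tEnv.sqlQuery(
            "SELECT uid, " +
            "       SUM(CASE status " +
            "             WHEN 'pending' THEN 1 " +
            "             WHEN 'success' THEN -1 " +
            "             WHEN 'failed'  THEN -1 " +
            "             ELSE 0 " +
            "           END) AS pending_count " +
            "FROM changelog " +
            "GROUP BY uid");

        // The grouped aggregation produces an updating result, so emit it as a retract stream.
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(pendingPerUser, Row.class);
        result.print();

        env.execute("pending-order-count");
    }
}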
One thing to note here is that probably all of the proposed solutions would work based on the order of the records, not on the event_time.

Piotrek

> On 21 Jan 2019, at 15:10, Jeff Zhang <zjf...@gmail.com> wrote:
>
> I am thinking of another approach instead of a retract stream. Is it possible to define a custom window to do this? The window would be defined for each order, and then you just need to analyze the events in this window.
>
> Piotr Nowojski <pi...@da-platform.com> wrote on Mon, 21 Jan 2019 at 20:44:
> Hi,
>
> At the moment the Flink Table API/SQL is missing a feature: support for retraction streams as input (or conversions from an append stream to a retraction stream). With that, your problem would simplify to one simple `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is ongoing related work [1], so this might be supported in the next couple of months.
>
> There might be a workaround that could work at the moment. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recently aggregated row (a rough sketch of such a function is included at the end of this thread). With that you could write a query like this:
>
> SELECT
>   uid, count(*)
> FROM (
>   SELECT
>     *
>   FROM (
>     SELECT
>       uid, LAST_ROW(status) AS status
>     FROM
>       changelog
>     GROUP BY
>       uid, oid)
>   WHERE status = 'pending')
> GROUP BY
>   uid
>
> Where `changelog` is an append-only stream with the following content:
>
>> user, order, status, event_time
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, pending, t3
>> u1, o3, success, t4
>> u2, o4, pending, t5
>> u2, o4, pending, t6
>
> Besides that, you could also write your own relatively simple DataStream application to do the same thing (also sketched at the end of this thread).
>
> I'm CC'ing Timo, maybe he will have another, better idea.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-8577
>
>> On 18 Jan 2019, at 18:30, Gagan Agrawal <agrawalga...@gmail.com> wrote:
>>
>> Hi,
>> I have a requirement and need to understand if the same can be achieved with a Flink retract stream. Let's say we have a stream with 4 attributes (userId, orderId, status, event_time), where orderId is unique and hence any change for the same orderId updates the previous value, as below.
>>
>> Changelog Event Stream
>>
>> user, order, status, event_time
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, pending, t3
>> u1, o3, success, t4
>> u2, o4, pending, t5
>> u2, o4, pending, t6
>>
>> Snapshot view at time t6 (as viewed in mysql)
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, success, t4
>> u2, o4, pending, t6
>> (Here the rows at times t3 and t5 are deleted, as they have been updated for the respective order ids.)
>>
>> What I need is to maintain a count of "pending" orders per user and, if it goes beyond a configured threshold, push that user and the pending count to Kafka. There can be multiple updates to an order's status, e.g. Pending -> Success or Pending -> Failed.
>> Also, in some cases there may not be any change in status but we may still get a row (maybe due to some other attribute update which we are not concerned about). So, is it possible to have a running count in Flink, as below, at the respective event times? Here the pending count is decreased from 2 to 1 for user u1 at t4, since one of its orders' status changed from Pending to Success. Similarly for user u2, at time t6, there is no change in the running count, as there was no change in status for order o4.
>>
>> t1 -> u1 : 1, u2 : 0
>> t2 -> u1 : 1, u2 : 0
>> t3 -> u1 : 2, u2 : 0
>> t4 -> u1 : 1, u2 : 0 (since o3 moved from pending to success, the count is decreased for u1)
>> t5 -> u1 : 1, u2 : 1
>> t6 -> u1 : 1, u2 : 1 (no increase in the count of u2 as the o4 update has no change)
>>
>> As I understand it, a retract stream can maybe achieve this. However, I am not sure how. Any samples around this would be of great help.
>>
>> Gagan
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
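For reference, a rough sketch of the kind of custom `LAST_ROW(x)` aggregation function mentioned above, assuming Flink's Table API `AggregateFunction` base class (the class and accumulator names are illustrative, and "last" here means last in processing order, not in event time):

import org.apache.flink.table.functions.AggregateFunction;

// Keeps only the most recently seen value of the aggregated column.
public class LastRowFunction extends AggregateFunction<String, LastRowFunction.LastRowAcc> {

    // Accumulator remembering the latest value.
    public static class LastRowAcc {
        public String last;
    }

    @Override
    public LastRowAcc createAccumulator() {
        return new LastRowAcc();
    }

    // Called once per input row of the group; simply overwrite the previous value.
    public void accumulate(LastRowAcc acc, String value) {
        acc.last = value;
    }

    @Override
    public String getValue(LastRowAcc acc) {
        return acc.last;
    }
}

It would have to be registered before being used in the query, e.g. `tableEnv.registerFunction("LAST_ROW", new LastRowFunction());`.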
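A rough sketch of the DataStream alternative mentioned above (again illustrative only; it assumes the events arrive as (uid, oid, status) tuples keyed by uid, and keeps the last status per order in keyed state):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Input: (uid, oid, status) events keyed by uid. Output: (uid, current pending count).
public class PendingCounter
        extends KeyedProcessFunction<String, Tuple3<String, String, String>, Tuple2<String, Integer>> {

    // Last known status per order id for the current user.
    private transient MapState<String, String> lastStatus;

    @Override
    public void open(Configuration parameters) {
        lastStatus = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("lastStatus", String.class, String.class));
    }

    @Override
    public void processElement(Tuple3<String, String, String> event,
                               Context ctx,
                               Collector<Tuple2<String, Integer>> out) throws Exception {
        // Overwrite the status of this order with the newest value.
        lastStatus.put(event.f1, event.f2);

        // Recompute the pending count for this user (per-user state stays small here).
        int pending = 0;
        for (String status : lastStatus.values()) {
            if ("pending".equals(status)) {
                pending++;
            }
        }
        out.collect(Tuple2.of(event.f0, pending));
    }
}

// Usage (hypothetical): events.keyBy(e -> e.f0).process(new PendingCounter()).print();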