Hi Gagan,

Yes, you can achieve this with the Flink Table API/SQL. However, you have to pay attention to the following things:

1) Currently, Flink only ingests append streams. In order to ingest upsert streams (streams with keys), you can use groupBy with a user-defined LAST_VALUE aggregate function. For the implementation, you can refer to the built-in MAX AggregateFunction (MAX always returns the max value, while LAST_VALUE always returns the latest value).
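For reference, a minimal sketch of what such a function could look like, modeled on MAX. It assumes a VARCHAR field, so in practice you would need variants for the other field types, and the class and field names here are only illustrative:

import org.apache.flink.table.functions.AggregateFunction;

// Like MAX, but accumulate() unconditionally overwrites the accumulator
// with the newest input, so getValue() returns the latest value seen
// (latest in processing order, which matches the processing-time caveat
// mentioned below).
public class LastValue extends AggregateFunction<String, LastValue.Accum> {

    // Accumulator: holds only the most recently seen value.
    public static class Accum {
        public String value;
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    public void accumulate(Accum acc, String input) {
        if (input != null) {
            acc.value = input; // latest value wins
        }
    }

    @Override
    public String getValue(Accum acc) {
        return acc.value;
    }
}

You would register it before use, e.g. with tableEnv.registerFunction("LAST_VALUE", new LastValue());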
The SQL may then look like this (note that user and order are reserved keywords in SQL, so they have to be escaped with backticks, and the inner aggregates need aliases so that the outer query can reference them):

SELECT `user`, COUNT(*) AS pending_cnt
FROM (
  SELECT
    `order`,
    LAST_VALUE(`user`) AS `user`,
    LAST_VALUE(status) AS status,
    LAST_VALUE(event_time) AS event_time
  FROM SourceTable
  GROUP BY `order`
)
WHERE status = 'pending'
GROUP BY `user`

You have to note that the query will be processed in processing time instead of event time. But I think that would be fine for you, as the final result will still be correct. As for an upsert source, there is already a PR[1] for it, and it is under review now.

2) You have to note that once you output results to Kafka according to a configured threshold, the output record cannot be deleted anymore, even if the count value decreases later, because Kafka doesn't support deleting messages. Also, this issue[2] makes things worse; you can take a detailed look if you are interested in it. (A sketch of dropping the retraction messages before the Kafka sink is given below, after your quoted message.)

Best, Hequn

[1] https://github.com/apache/flink/pull/6787
[2] https://issues.apache.org/jira/browse/FLINK-9528

On Sat, Jan 19, 2019 at 1:31 AM Gagan Agrawal <agrawalga...@gmail.com> wrote:

> Hi,
> I have a requirement and need to understand if the same can be achieved
> with a Flink retract stream. Let's say we have a stream with 4 attributes:
> userId, orderId, status, event_time, where orderId is unique and hence any
> change for the same orderId updates the previous value, as below.
>
> *Changelog Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in mysql)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u2, o4, pending, t6
> (Here the rows at times t3 and t5 are deleted, as they have been updated
> for their respective order ids.)
>
> What I need is to maintain a count of "Pending" orders against a user
> and, if it goes beyond a configured threshold, push that user and the
> pending count to Kafka. There can be multiple updates to an order's
> status, e.g. Pending -> Success or Pending -> Failed. Also, in some cases
> there may not be any change in status, but we may still get a row (maybe
> due to some other attribute update which we are not concerned about). So
> is it possible to have a running count in Flink as below at the
> respective event times? Here the pending count decreases from 2 to 1 for
> user u1 at t4, since one of its order statuses changed from Pending to
> Success. Similarly for user u2 at time t6, there is no change in the
> running count, as there was no change in status for order o4.
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved from pending to success, the count
> is decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in the count of u2, as the o4 update
> has no change)*
>
> As I understand, maybe a retract stream can achieve this. However, I am
> not sure how. Any samples around this would be of great help.
>
> Gagan
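Regarding the Kafka output in 2), here is a minimal, hedged sketch of running the query with a threshold and forwarding only the add (non-retraction) messages of the retract stream to Kafka. The threshold value, topic name, bootstrap servers, and class names are illustrative, LastValue is the sketch from above, and the registration of SourceTable is assumed to have happened elsewhere:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PendingCountToKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // SourceTable is assumed to be registered already.
        tableEnv.registerFunction("LAST_VALUE", new LastValue());

        // The query from above, extended with an illustrative threshold of 10.
        Table result = tableEnv.sqlQuery(
            "SELECT `user`, COUNT(*) AS pending_cnt "
          + "FROM ( "
          + "  SELECT `order`, LAST_VALUE(`user`) AS `user`, LAST_VALUE(status) AS status "
          + "  FROM SourceTable GROUP BY `order` "
          + ") "
          + "WHERE status = 'pending' "
          + "GROUP BY `user` "
          + "HAVING COUNT(*) > 10");

        // A grouped query produces a retract stream of (isAdd, row) pairs.
        DataStream<Tuple2<Boolean, Row>> changes = tableEnv.toRetractStream(result, Row.class);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative

        changes
            // Kafka cannot delete messages, so only the add messages are
            // forwarded; a record, once emitted, stays in the topic even if
            // the count later drops below the threshold again.
            .filter(change -> change.f0)
            .map(change -> change.f1.toString())
            .returns(String.class) // type hint required for the lambda
            .addSink(new FlinkKafkaProducer<>("pending-counts", new SimpleStringSchema(), props));

        env.execute("pending-count-to-kafka");
    }
}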