Hi Gagan,

Yes, you can achieve this with the Flink Table API / SQL. However, you have
to pay attention to the following things:
1) Currently, Flink SQL only ingests append streams. In order to ingest an
upsert stream (a stream with keys), you can use a GROUP BY with a
user-defined LAST_VALUE aggregate function. For the implementation, you can
refer to the MAX AggregateFunction (MAX always returns the max value, while
LAST_VALUE always returns the latest value). The SQL may look like:

SELECT `user`, COUNT(*) AS pending_cnt
FROM (
  SELECT `order`,
         LAST_VALUE(`user`) AS `user`,
         LAST_VALUE(`status`) AS `status`,
         LAST_VALUE(`event_time`) AS `event_time`
  FROM SourceTable
  GROUP BY `order`
)
WHERE status = 'pending'
GROUP BY `user`

(Note: user and order are reserved keywords, hence the backticks, and the
aliases let the outer query reference the LAST_VALUE columns.)
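For reference, here is a minimal sketch of such a user-defined LAST_VALUE
aggregate function in Java, assuming a string-typed column (the class and
accumulator names are just illustrative, not from any Flink release):

import org.apache.flink.table.functions.AggregateFunction;

// Accumulator that remembers the most recently seen value.
public class LastValueAcc {
    public String last;
}

public class LastValue extends AggregateFunction<String, LastValueAcc> {

    @Override
    public LastValueAcc createAccumulator() {
        return new LastValueAcc();
    }

    @Override
    public String getValue(LastValueAcc acc) {
        return acc.last;
    }

    // Called once per input row; unlike MAX, it keeps whatever value
    // arrived last instead of comparing values.
    public void accumulate(LastValueAcc acc, String value) {
        acc.last = value;
    }
}

You would register it before running the query, e.g.
tableEnv.registerFunction("LAST_VALUE", new LastValue());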

Note that the query will be processed in processing time instead of event
time, but I think that would be fine for you, as the final result will
still be correct.

As for an upsert source, there is already a PR[1] for it, and it is under
review now.

2) You have to note that once you output results to Kafka according to a
configured threshold, the output record cannot be deleted anymore, even if
the count value later decreases, because Kafka doesn't support deleting
messages. Also, this issue[2] makes things worse. You can take a detailed
look if you are interested.
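To make that concrete, here is a rough sketch of the wiring (the view name
PendingCounts, the threshold of 10, and the kafkaSink variable are all
assumptions for illustration; tableEnv is a StreamTableEnvironment and
kafkaSink a FlinkKafkaProducer you have configured elsewhere). Converting
the result to a retract stream exposes delete messages as a boolean flag,
but since Kafka can't delete, a common workaround is to simply drop the
retractions:

import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// "PendingCounts" stands for the counting query shown above, registered
// as a view; the filter applies the configured threshold.
Table alerts = tableEnv.sqlQuery(
    "SELECT `user`, pending_cnt FROM PendingCounts " +
    "WHERE pending_cnt > 10");  // example threshold

tableEnv.toRetractStream(alerts, Row.class)
    // f0 == true  -> add message; f0 == false -> retraction, which
    // Kafka cannot apply, so we drop it here.
    .filter(change -> change.f0)
    .map(change -> change.f1.toString())
    .addSink(kafkaSink);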

Best,
Hequn

[1] https://github.com/apache/flink/pull/6787
[2] https://issues.apache.org/jira/browse/FLINK-9528


On Sat, Jan 19, 2019 at 1:31 AM Gagan Agrawal <agrawalga...@gmail.com>
wrote:

> Hi,
> I have a requirement and need to understand whether the same can be achieved
> with a Flink retract stream. Let's say we have a stream with 4 attributes:
> userId, orderId, status, event_time, where orderId is unique and hence any
> change to the same orderId updates the previous value, as below:
>
> *Changelog* *Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in MySQL)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u2, o4, pending, t6
> (Here the rows at times t3 and t5 are deleted, as they have been updated
> for their respective order ids)
>
> What I need is to maintain a count of "pending" orders per user and, if it
> goes beyond a configured threshold, then push that user and the pending
> count to Kafka. There can be multiple updates to an order's status, e.g.
> pending -> success or pending -> failed. Also, in some cases there may be
> no change in status but we may still get a row (maybe due to some other
> attribute update which we are not concerned about). So is it possible to
> have a running count in Flink as below at the respective event times? Here
> the pending count is decreased from 2 to 1 for user u1 at t4, since one of
> its order statuses was changed from pending to success. Similarly for user
> u2, at time t6, there is no change in the running count, as there was no
> change in status for order o4.
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved from pending to success, the count
> is decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in the count for u2, as the o4 update
> has no status change)*
>
> As I understand it, maybe a retract stream can achieve this, however I am
> not sure how. Any samples around this would be of great help.
>
> Gagan
>
>
