I am thinking of another approach instead of a retract stream. Is it
possible to define a custom window to do this? The window would be defined
per order, and then you just need to analyze the events in that window.
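
Something along these lines, as a rough sketch (the OrderEvent type, its
field names and the 30-minute session gap are all made up for illustration,
not from this thread):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical event type; field names are illustrative.
public class OrderEvent {
    public String uid;
    public String oid;
    public String status;
    public long eventTime;
}

// One session window per order: all events of an order land in the same
// window and can be analyzed together.
static DataStream<Tuple2<String, String>> latestStatusPerOrder(
        DataStream<OrderEvent> events) {
    return events
        .keyBy(e -> e.oid)
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .process(new ProcessWindowFunction<OrderEvent, Tuple2<String, String>, String, TimeWindow>() {
            @Override
            public void process(String oid, Context ctx,
                                Iterable<OrderEvent> orderEvents,
                                Collector<Tuple2<String, String>> out) {
                // Emit (uid, latestStatus) for this order so a downstream
                // operator can count pending orders per user.
                OrderEvent latest = null;
                for (OrderEvent e : orderEvents) {
                    if (latest == null || e.eventTime >= latest.eventTime) {
                        latest = e;
                    }
                }
                out.collect(Tuple2.of(latest.uid, latest.status));
            }
        });
}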

Piotr Nowojski <pi...@da-platform.com> wrote on Mon, 21 Jan 2019 at 8:44 PM:

> Hi,
>
> There is a feature missing in Flink Table API/SQL at the moment: support
> for retraction streams as input (or for converting an append stream to a
> retraction stream). With that, your problem would simplify to a single
> `SELECT uid, COUNT(*) FROM Changelog GROUP BY uid`. There is ongoing
> related work [1], so this might be supported in the next couple of
> months.
>
> There might be a workaround that could work at the moment. I think you
> would need to write your own custom `LAST_ROW(x)` aggregate function,
> which would simply return the value from the most recent row it has
> aggregated. With that you could write a query like this:
>
> SELECT
>   uid, COUNT(*)
> FROM (
>   SELECT
>     *
>   FROM (
>     SELECT
>       uid, oid, LAST_ROW(status) AS status
>     FROM
>       changelog
>     GROUP BY
>       uid, oid)
>   WHERE status = 'pending')
> GROUP BY
>   uid
>
> Where `changelog` is an append-only stream with the following content:
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
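>
> A `LAST_ROW` like that is not built into Flink; a minimal sketch of such
> a custom aggregate could look like the following (it assumes rows reach
> the aggregation in event-time order, and `tableEnv` is an assumed
> StreamTableEnvironment):
>
> import org.apache.flink.table.functions.AggregateFunction;
>
> // Accumulator simply holds the value of the last row seen.
> public static class LastRowAcc {
>     public String value;
> }
>
> public static class LastRow extends AggregateFunction<String, LastRowAcc> {
>
>     @Override
>     public LastRowAcc createAccumulator() {
>         return new LastRowAcc();
>     }
>
>     @Override
>     public String getValue(LastRowAcc acc) {
>         return acc.value;
>     }
>
>     // Called once per input row; the latest row wins.
>     public void accumulate(LastRowAcc acc, String value) {
>         acc.value = value;
>     }
> }
>
> // Register it before using it in the SQL query above:
> tableEnv.registerFunction("LAST_ROW", new LastRow());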
>
>
>
> Besides that, you could also write your own relatively simple DataStream
> application to do the same thing, as sketched below.
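>
> For example, a minimal sketch using a `KeyedProcessFunction` keyed by
> user (the `OrderEvent` POJO with public uid/oid/status fields and the
> threshold value are made up for illustration):
>
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class PendingOrderCounter
>         extends KeyedProcessFunction<String, OrderEvent, Tuple2<String, Integer>> {
>
>     private static final int THRESHOLD = 2; // configured threshold, made up here
>     private transient MapState<String, String> lastStatusPerOrder;
>
>     @Override
>     public void open(Configuration parameters) {
>         lastStatusPerOrder = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("lastStatus", String.class, String.class));
>     }
>
>     @Override
>     public void processElement(OrderEvent event, Context ctx,
>                                Collector<Tuple2<String, Integer>> out) throws Exception {
>         String previous = lastStatusPerOrder.get(event.oid);
>         if (event.status.equals(previous)) {
>             return; // no status change (like t6 in your example): count unchanged
>         }
>         lastStatusPerOrder.put(event.oid, event.status);
>
>         // Recompute this user's pending count from the per-order state.
>         int pending = 0;
>         for (String status : lastStatusPerOrder.values()) {
>             if ("pending".equals(status)) {
>                 pending++;
>             }
>         }
>         if (pending >= THRESHOLD) {
>             out.collect(Tuple2.of(ctx.getCurrentKey(), pending));
>         }
>     }
> }
>
> // Usage: changelog.keyBy(e -> e.uid).process(new PendingOrderCounter())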
>
> I’m CC’ing Timo, maybe he will have a better idea.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-8577
>
> On 18 Jan 2019, at 18:30, Gagan Agrawal <agrawalga...@gmail.com> wrote:
>
> Hi,
> I have a requirement and need to understand whether the same can be
> achieved with a Flink retract stream. Let's say we have a stream with 4
> attributes: userId, orderId, status, event_time, where orderId is unique
> and hence any new event for the same orderId updates the previous value,
> as below:
>
> *Changelog Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in MySQL)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u2, o4, pending, t6
> (Here the rows from t3 and t5 are gone, as they have been updated for
> their respective order ids)
>
> What I need is to maintain a count of "pending" orders per user and, if
> it goes beyond a configured threshold, push that user and the pending
> count to Kafka. There can be multiple updates to an order's status, e.g.
> pending -> success or pending -> failed. In some cases there may be no
> change in status, but we may still get a row (perhaps due to an update
> to some other attribute we are not concerned about). So is it possible
> to have a running count in Flink as below, at the respective event
> times? Here the pending count decreases from 2 to 1 for user u1 at t4,
> since one of its orders changed from pending to success. Similarly, for
> user u2 at time t6 there is no change in the running count, as there was
> no change in status for order o4.
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
> decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
> change)*
>
> As I understand it, maybe a retract stream can achieve this; however, I
> am not sure how. Any samples around this would be of great help.
>
> Gagan
>
>
>

-- 
Best Regards

Jeff Zhang
