Hi,

There is a missing feature in Flink Table API/SQL of supporting retraction 
streams as the input (or conversions from append stream to retraction stream) 
at the moment. With that your problem would simplify to one simple `SELECT uid, 
count(*) FROM Changelog GROUP BY uid`. There is an ongoing work with related 
work [1], so this might be supported in the next couple of months.

There might a workaround at the moment that could work. I think you would need 
to write your own custom `LAST_ROW(x)` aggregation function, which would just 
return the value of the most recent aggregated row. With that you could write a 
query like this:

SELECT 
        uid, count(*) 
FROM (
        SELECT 
                * 
        FROM (
                SELECT 
                        uid, LAST_ROW(status)
                FROM
                        changelog
                GROUP BY
                        uid, oid)
        WHERE status = `pending`)
GROUP BY
        uid

Where `changelog` is an append only stream with the following content:

> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6


Besides that, you could also write your own a relatively simple Data Stream 
application to do the same thing.

I’m CC’ing Timo, maybe he will have another better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

> On 18 Jan 2019, at 18:30, Gagan Agrawal <agrawalga...@gmail.com> wrote:
> 
> Hi,
> I have a requirement and need to understand if same can be achieved with 
> Flink retract stream. Let's say we have stream with 4 attributes userId, 
> orderId, status, event_time where orderId is unique and hence any change in 
> same orderId updates previous value as below
> 
> Changelog Event Stream
> 
> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6
> 
> Snapshot view at time t6 (as viewed in mysql)
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u4, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for 
> respective order ids)
> 
> What I need is to maintain count of "Pending" orders against a user and if 
> they go beyond configured threshold, then push that user and pending count to 
> Kafka. Here there can be multiple updates to order status e.g Pending -> 
> Success or Pending -> Failed. Also in some cases there may not be any change 
> in status but we may still get a row (may be due to some other attribute 
> update which we are not concerned about). So is it possible to have running 
> count in flink as below at respective event times. Here Pending count is 
> decreased from 2 to 1 for user u1 at t4 since one of it's order status was 
> changed from Pending to Success. Similarly for user u2, at time t6, there was 
> no change in running count as there was no change in status for order o4
> 
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is 
> decreased for u1)
> t5 -> u1 : 1, u2 : 1
> t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)
> 
> As I understand may be retract stream can achieve this. However I am not sure 
> how. Any samples around this would be of great help.
> 
> Gagan
> 

Reply via email to