@Jeff: It depends on whether the user can define a time window for their 
condition. As Gagan described it, his problem was about a “global” threshold of 
pending orders.



I have just thought of another solution that should work without any custom 
code: converting the “status” field to an integer status_value:
- “+1” for pending
- “-1” for success/failure
- “0” otherwise

Then run the following query on top of that stream:

SELECT uid, SUM(status_value) FROM … GROUP BY uid;

The conversion to integers could be done with a `CASE` expression.
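
As a rough illustration of the `CASE` + `SUM` idea, here is a minimal sketch that runs the query over the sample changelog from the original question. It uses Python's built-in sqlite3 as a stand-in for Flink SQL (so it is a batch query, not a streaming one), and the table name `changelog`, the column names, and the helper `pending_counts` are assumptions made for this example. The status literal 'failed' follows the sample data.

```python
import sqlite3

# Sample changelog from the original question (event times kept as labels).
ROWS = [
    ("u1", "o1", "pending", "t1"),
    ("u2", "o2", "failed", "t2"),
    ("u1", "o3", "pending", "t3"),
    ("u1", "o3", "success", "t4"),
    ("u2", "o4", "pending", "t5"),
    ("u2", "o4", "pending", "t6"),
]

# Map status to an integer with CASE, then sum per user.
QUERY = """
SELECT uid, SUM(status_value) AS pending_count
FROM (
    SELECT uid,
           CASE status
               WHEN 'pending' THEN 1
               WHEN 'success' THEN -1
               WHEN 'failed'  THEN -1
               ELSE 0
           END AS status_value
    FROM changelog
)
GROUP BY uid
ORDER BY uid
"""


def pending_counts(rows):
    """Load rows into an in-memory table and return {uid: summed status_value}."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE changelog (uid TEXT, oid TEXT, status TEXT, event_time TEXT)"
    )
    conn.executemany("INSERT INTO changelog VALUES (?, ?, ?, ?)", rows)
    result = dict(conn.execute(QUERY).fetchall())
    conn.close()
    return result


print(pending_counts(ROWS))  # {'u1': 1, 'u2': 1}
```

On this sample, u2's total of 1 is the sum -1 + 1 + 1, so the mapping adds up status rows rather than distinct orders: a repeated pending row, or a failure with no earlier pending row, shifts the sum.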

One thing to note is that all of the proposed solutions would probably operate 
on the order in which the records arrive, not on their event_time.
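
To make the ordering point concrete, here is a small sketch in plain Python (not Flink code) of a "keep the last row per order" approach. The function name `pending_by_arrival` and the integer event times are assumptions made for the example. The same two records give different pending counts depending on the order in which they arrive, because event_time is never consulted.

```python
def pending_by_arrival(records):
    """Count pending orders per user, keeping the last *arrived* status per order."""
    last_status = {}  # (uid, oid) -> status of the most recently arrived record
    for uid, oid, status, _event_time in records:
        last_status[(uid, oid)] = status  # arrival order decides, not event_time

    counts = {}
    for (uid, _oid), status in last_status.items():
        counts[uid] = counts.get(uid, 0) + (1 if status == "pending" else 0)
    return counts


# The same two updates for order o3, delivered in two different orders.
in_order = [("u1", "o3", "pending", 3), ("u1", "o3", "success", 4)]
out_of_order = [("u1", "o3", "success", 4), ("u1", "o3", "pending", 3)]

print(pending_by_arrival(in_order))      # {'u1': 0}
print(pending_by_arrival(out_of_order))  # {'u1': 1}
```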

Piotrek

> On 21 Jan 2019, at 15:10, Jeff Zhang <zjf...@gmail.com> wrote:
> 
> I am thinking of another approach instead of a retract stream. Is it possible 
> to define a custom window to do this? The window would be defined per order, 
> and then you would just need to analyze the events in that window.
> 
> Piotr Nowojski <pi...@da-platform.com> wrote on Mon, 21 Jan 2019 at 8:44 PM:
> Hi,
> 
> Flink's Table API/SQL does not currently support retraction streams as input 
> (or conversions from an append stream to a retraction stream). With that 
> feature, your problem would reduce to a single `SELECT uid, count(*) FROM 
> Changelog GROUP BY uid`. There is ongoing related work [1], so this might be 
> supported in the next couple of months.
> 
> There might be a workaround that could work at the moment. I think you would 
> need to write your own custom `LAST_ROW(x)` aggregation function, which would 
> just return the value of the most recently aggregated row. With that you could 
> write a query like this:
> 
> SELECT 
>       uid, count(*) 
> FROM (
>       SELECT 
>               * 
>       FROM (
>               -- LAST_ROW is the custom aggregate function described above
>               SELECT 
>                       uid, LAST_ROW(status) AS status
>               FROM
>                       changelog
>               GROUP BY
>                       uid, oid)
>       WHERE status = 'pending')
> GROUP BY
>       uid
> 
> Where `changelog` is an append only stream with the following content:
> 
>> user, order, status, event_time
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, pending, t3
>> u1, o3, success, t4
>> u2, o4, pending, t5
>> u2, o4, pending, t6
> 
> 
> Besides that, you could also write your own relatively simple DataStream 
> application to do the same thing.
> 
> I’m CC’ing Timo; maybe he will have a better idea.
> 
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/FLINK-8577 
> 
>> On 18 Jan 2019, at 18:30, Gagan Agrawal <agrawalga...@gmail.com> wrote:
>> 
>> Hi,
>> I have a requirement and need to understand whether it can be achieved with 
>> a Flink retract stream. Let's say we have a stream with 4 attributes (userId, 
>> orderId, status, event_time), where orderId is unique, so any change to the 
>> same orderId updates the previous value, as below:
>> 
>> Changelog Event Stream
>> 
>> user, order, status, event_time
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, pending, t3
>> u1, o3, success, t4
>> u2, o4, pending, t5
>> u2, o4, pending, t6
>> 
>> Snapshot view at time t6 (as viewed in mysql)
>> u1, o1, pending, t1
>> u2, o2, failed, t2
>> u1, o3, success, t4
>> u2, o4, pending, t6
>> (Here rows at time t3 and t5 are deleted as they have been updated for 
>> respective order ids)
>> 
>> What I need is to maintain a count of "Pending" orders per user and, if it 
>> goes beyond a configured threshold, push that user and the pending count to 
>> Kafka. There can be multiple updates to an order's status, e.g. Pending -> 
>> Success or Pending -> Failed. In some cases there may be no change in status 
>> but we may still get a row (perhaps due to an update to some other attribute 
>> that we are not concerned about). So is it possible to have a running count 
>> in Flink, as below, at the respective event times? Here the Pending count 
>> decreases from 2 to 1 for user u1 at t4, since the status of one of its 
>> orders changed from Pending to Success. Similarly, for user u2 at time t6 
>> there is no change in the running count, as the status of order o4 did not 
>> change.
>> 
>> t1 -> u1 : 1, u2 : 0
>> t2 -> u1 : 1, u2 : 0
>> t3 -> u1 : 2, u2 : 0
>> t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is 
>> decreased for u1)
>> t5 -> u1 : 1, u2 : 1
>> t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)
>> 
>> As I understand it, a retract stream may be able to achieve this. However, I 
>> am not sure how. Any samples around this would be of great help.
>> 
>> Gagan
>> 
> 
> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang
