This works with event time as well. You need to set the right
TimeCharacteristic on the execution environment and assign timestamps and
watermarks. The only time-dependent operation is the window.
YourWindowFunction assigns the timestamp of the window:
WindowFunction.apply() has a TimeWindow parameter that gives access to the
window's start and end time. See the docs [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation
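
To make the windowing step concrete, here is a minimal plain-Scala sketch
with no Flink dependency. It only illustrates the idea: records are assigned
to tumbling windows by their own timestamp, and each aggregated result is
stamped with the end of its window. The names EventTimeWindowSketch, Window,
windowFor and changesPerWindow are made up for this example (Window is just a
stand-in for Flink's TimeWindow), so treat it as an assumption-laden sketch
rather than the actual Flink code.

```scala
// Plain-Scala sketch (no Flink). All names here are hypothetical.
object EventTimeWindowSketch {
  // Stand-in for Flink's TimeWindow: covers [start, end) in milliseconds.
  final case class Window(start: Long, end: Long)

  // Tumbling-window assignment that depends only on the event's timestamp.
  def windowFor(eventTime: Long, sizeMs: Long): Window = {
    val start = eventTime - Math.floorMod(eventTime, sizeMs)
    Window(start, start + sizeMs)
  }

  // Input records: (state, cntUpdate, eventTime).
  // Output records: (state, summed cntUpdate, window end).
  def changesPerWindow(updates: Seq[(Int, Int, Long)],
                       sizeMs: Long): Seq[(Int, Int, Long)] =
    updates
      .groupBy { case (state, _, ts) => (state, windowFor(ts, sizeMs)) }
      .map { case ((state, w), recs) => (state, recs.map(_._2).sum, w.end) }
      .toSeq
      .sortBy { case (state, _, end) => (end, state) } // deterministic order
}
```

Because the window is derived from the record's own timestamp, the result is
the same regardless of the order in which the records arrive, which is the
behaviour you get from event time in Flink once timestamps and watermarks
are assigned.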

2016-09-30 11:00 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:

> I'm working with your suggestions, thank you very much. What I'm missing
> here is what YourWindowFunction should do. I have no notion of event time
> there, so I can't assign a timestamp. Also, this solution seems to work
> on processing time, while I care about event time. I couldn't make it run
> yet, but from what I understand, this is slightly different from what I
> need.
>
> 2016-09-30 10:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Simone,
>>
>> I think I have a solution for your problem:
>>
>> val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time)
>>
>> // StateUpdater is a stateful FlatMapFunction. It has a keyed state that
>> // stores the last state of each id. For each input record it returns two
>> // records: (oldState, -1) and (newState, +1).
>> val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
>>   .keyBy(_._1) // key by id
>>   .flatMap(new StateUpdater)
>>
>> stateChanges ensures that counts of previous states are subtracted.
>>
>> // SumReducer sums the cntUpdates and YourWindowFunction assigns the
>> // timestamp of your window.
>> val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges // (state, cntUpdate, time)
>>   .keyBy(_._1) // key by state
>>   .window() // your window; should be non-overlapping, e.g. tumbling
>>   .apply(new SumReducer(), new YourWindowFunction())
>>
>> This step aggregates all state changes for each state in a window.
>>
>> // CountUpdater is a stateful MapFunction. It has a keyed state that stores
>> // the current count. For each incoming record, the count is adjusted and a
>> // record (state, newCount, time) is emitted.
>> val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state, count, time)
>>   .keyBy(_._1) // key by state again
>>   .map(new CountUpdater)
>>
>> Now you have the new counts for your states in multiple records. If
>> possible, you can update your Elasticsearch index using these. Otherwise,
>> you have to collect them into one record using another window.
>>
>> Also note that the state size of this program depends on the number of
>> unique ids. That might cause problems if the id space grows very fast.
>>
>> Please let me know if you have questions or if that works ;-)
>>
>> Cheers, Fabian
>>
>>
>> 2016-09-30 0:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>>
>>> Hello,
>>>
>>> in the last few days I tried to create my first real-time analytics job
>>> in Flink. The approach is kappa-architecture-like, so I have my raw data on
>>> Kafka where we receive a message for every change of state of any entity.
>>>
>>> So the messages are of the form
>>>
>>> (id, newStatus, timestamp)
>>>
>>> We want to compute, for every time window, the count of items in a given
>>> status. So the output should be of the form
>>>
>>> (outputTimestamp, state1:count1, state2:count2, ...)
>>>
>>> or equivalent. These rows should contain, at any given time, the count
>>> of the items in a given status, where the status associated with an id is
>>> the most recent message observed for that id. The status for an id should
>>> be counted in any case, even if the event is way older than those getting
>>> processed. So the sum of all the counts should be equal to the number of
>>> different ids observed in the system. A following step could be
>>> forgetting about the items in a final status after a while, but this is
>>> not a strict requirement right now.
>>>
>>> This will be written to Elasticsearch and then queried.
>>>
>>> I tried many different paths and none of them completely satisfied the
>>> requirement. Using a sliding window I could easily achieve the expected
>>> behaviour, except that when the beginning of the sliding window surpassed
>>> the timestamp of an event, it was lost for the count, as you may expect.
>>> Other approaches failed to be consistent when working with a backlog
>>> because I did some tricks with keys and timestamps that failed when the
>>> data was processed all at once.
>>>
>>> So I would like to know, even at a high level, how I should approach
>>> this problem. It looks like a relatively common use-case but the fact that
>>> the relevant information for a given ID must be retained indefinitely to
>>> count the entities correctly creates a lot of problems.
>>>
>>> Thank you in advance,
>>>
>>> Simone
>>>
>>>
>>
>
