This works with event time as well. You need to set the right TimeCharacteristic on the execution environment and assign timestamps + watermarks. The only time-dependent operation is the window. YourWindowFunction assigns the timestamp of the window. WindowFunction.apply() has a TimeWindow parameter that gives access to the window's start and end time. See the docs [1].
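For illustration, a minimal sketch of that setup and of the window step, assuming the Flink 1.x Scala DataStream API. The 10-second out-of-orderness bound and the one-minute window size are placeholders; SumReducer and YourWindowFunction are the names used in the pipeline quoted below:

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// (id, state, time) records; the third field carries the event timestamp.
val s: DataStream[(Long, Int, Long)] = ??? // e.g. a Kafka source

// Extract event timestamps and generate watermarks;
// the allowed out-of-orderness is a placeholder value.
val withTs: DataStream[(Long, Int, Long)] = s.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[(Long, Int, Long)](Time.seconds(10)) {
    override def extractTimestamp(r: (Long, Int, Long)): Long = r._3
  })

// Pre-aggregates the count updates per state within a window.
class SumReducer extends ReduceFunction[(Int, Int)] {
  override def reduce(a: (Int, Int), b: (Int, Int)): (Int, Int) =
    (a._1, a._2 + b._2)
}

// Forwards the pre-aggregated count and stamps it with the window's end time.
class YourWindowFunction
    extends WindowFunction[(Int, Int), (Int, Int, Long), Int, TimeWindow] {
  override def apply(key: Int, window: TimeWindow, input: Iterable[(Int, Int)],
      out: Collector[(Int, Int, Long)]): Unit = {
    // With an incremental pre-aggregator there is exactly one input element.
    val (state, cnt) = input.iterator.next()
    out.collect((state, cnt, window.getEnd))
  }
}

// Wiring, with stateChanges produced as in the pipeline below:
val stateChanges: DataStream[(Int, Int)] = ??? // output of StateUpdater
val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges
  .keyBy(_._1) // key by state
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .apply(new SumReducer(), new YourWindowFunction())

With the incremental-aggregation variant of apply(), the window function receives exactly one pre-aggregated element per window, so forwarding it together with window.getEnd gives each count a well-defined event timestamp.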
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation

2016-09-30 11:00 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:

> I'm working with your suggestions, thank you very much. What I'm missing
> here is what YourWindowFunction should do. I have no notion of event time
> there, so I can't assign a timestamp. Also, this solution seems to work
> by processing time, while I care about event time. I couldn't make it run
> yet, but from what I understood, this is slightly different from what I
> need.
>
> 2016-09-30 10:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Simone,
>>
>> I think I have a solution for your problem:
>>
>> val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time)
>>
>> val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
>>   .keyBy(_._1) // key by id
>>   .flatMap(new StateUpdater)
>>
>> StateUpdater is a stateful FlatMapFunction. It has a keyed state that
>> stores the last state of each id. For each input record it returns two
>> records: (oldState, -1), (newState, +1).
>>
>> stateChanges ensures that counts of previous states are subtracted.
>>
>> val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges
>>   // (state, cntUpdate, time)
>>   .keyBy(_._1) // key by state
>>   .window(...) // your window; it should be non-overlapping, so go for
>>                // instance for a tumbling window
>>   .apply(new SumReducer(), new YourWindowFunction())
>>
>> SumReducer sums the cntUpdates and YourWindowFunction assigns the
>> timestamp of your window. This step aggregates all state changes for
>> each state in a window.
>>
>> val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow
>>   // (state, count, time)
>>   .keyBy(_._1) // key by state again
>>   .map(new CountUpdater)
>>
>> CountUpdater is a stateful MapFunction. It has a keyed state that stores
>> the current count. For each incoming record, the count is adjusted and a
>> record (state, newCount, time) is emitted.
>>
>> Now you have the new counts for your states in multiple records. If
>> possible, you can update your Elasticsearch index using these.
>> Otherwise, you have to collect them into one record using another
>> window.
>>
>> Also note that the state size of this program depends on the number of
>> unique ids. That might cause problems if the id space grows very fast.
>>
>> Please let me know if you have questions or if that works ;-)
>>
>> Cheers, Fabian
>>
>>
>> 2016-09-30 0:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>>
>>> Hello,
>>>
>>> in the last few days I tried to create my first real-time analytics
>>> job in Flink. The approach is kappa-architecture-like, so I have my
>>> raw data on Kafka, where we receive a message for every change of
>>> state of any entity.
>>>
>>> So the messages are of the form
>>>
>>> (id, newStatus, timestamp)
>>>
>>> We want to compute, for every time window, the count of items in a
>>> given status. So the output should be of the form
>>>
>>> (outputTimestamp, state1:count1, state2:count2, ...)
>>>
>>> or equivalent. These rows should contain, at any given time, the count
>>> of the items in a given status, where the status associated with an id
>>> is the one from the most recent message observed for that id. The
>>> status for an id should be counted in any case, even if the event is
>>> much older than those currently being processed. So the sum of all the
>>> counts should be equal to the number of different ids observed in the
>>> system.
>>> A following step could be forgetting about the items in a final state
>>> after a while, but this is not a strict requirement right now.
>>>
>>> This will be written to Elasticsearch and then queried.
>>>
>>> I tried many different paths and none of them completely satisfied the
>>> requirement. Using a sliding window I could easily achieve the
>>> expected behaviour, except that when the beginning of the sliding
>>> window passed the timestamp of an event, that event was lost for the
>>> count, as you may expect. Other approaches failed to be consistent
>>> when working with a backlog, because I did some tricks with keys and
>>> timestamps that failed when the data was processed all at once.
>>>
>>> So I would like to know, even at a high level, how I should approach
>>> this problem. It looks like a relatively common use case, but the fact
>>> that the relevant information for a given id must be retained
>>> indefinitely in order to count the entities correctly creates a lot of
>>> problems.
>>>
>>> Thank you in advance,
>>>
>>> Simone
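For completeness, here is a rough sketch of the two stateful operators described in the reply above. It is a sketch only: it assumes Flink's keyed ValueState API (descriptor constructors vary slightly across versions), and it uses java.lang.Integer so that a missing previous value can be represented as null. Both operators must run after a keyBy(), since they use keyed state.

import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Keyed by id: remembers the last state of each id, retracts the old
// state and counts the new one.
class StateUpdater extends RichFlatMapFunction[(Long, Int, Long), (Int, Int)] {
  private var lastState: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    lastState = getRuntimeContext.getState(
      new ValueStateDescriptor[Integer]("lastState", classOf[Integer]))
  }

  override def flatMap(in: (Long, Int, Long), out: Collector[(Int, Int)]): Unit = {
    val old = lastState.value()
    if (old != null) {
      out.collect((old.intValue, -1)) // retract the id from its previous state
    }
    lastState.update(in._2)
    out.collect((in._2, +1)) // count the id under its new state
  }
}

// Keyed by state: keeps the running count per state and emits
// (state, newCount, time) for every windowed delta.
class CountUpdater extends RichMapFunction[(Int, Int, Long), (Int, Int, Long)] {
  private var count: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[Integer]("count", classOf[Integer]))
  }

  override def map(in: (Int, Int, Long)): (Int, Int, Long) = {
    val old = count.value()
    val newCount = (if (old == null) 0 else old.intValue) + in._2
    count.update(newCount)
    (in._1, newCount, in._3)
  }
}

Note that StateUpdater keeps one entry per id indefinitely, which is exactly the state-size caveat raised in the reply above. Depending on your Flink version, ValueStateDescriptor may also accept or require a default-value argument instead of the two-argument form used here.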