Hi,

Regarding step 3, it is sufficient to check that you got one message from each parallel task of the previous operator. That's because a task processes the timers of all its keys before moving on. Timers are always processed per key, so each task will emit one message per key, but you can deduplicate on the parallel task id and check that you got a message from each task.
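To make the deduplication concrete, here is a minimal sketch in plain Java of the bookkeeping operator#2 would need. It assumes operator#1 attaches its subtask index and the timer timestamp to every message it emits from onTimer(), and that operator#2 runs with parallelism 1. SubtaskCompletionTracker and report() are hypothetical names, not Flink API, and in a real job this bookkeeping would have to live in Flink state and be cleaned up for old timestamps.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink): remembers which upstream
 * subtasks have reported for a given timer timestamp.
 */
final class SubtaskCompletionTracker {
    // Total number of upstream subtasks; assumed to be the value of
    // getNumberOfParallelSubtasks() taken from operator#1.
    private final int parallelism;
    // One bit per upstream subtask, grouped by timer timestamp.
    private final Map<Long, BitSet> seen = new HashMap<>();

    SubtaskCompletionTracker(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.parallelism = parallelism;
    }

    /**
     * Records one message. Returns true exactly once per timestamp: on the
     * message that makes the set of reporting subtasks complete.
     */
    boolean report(long timerTimestamp, int subtaskIndex) {
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("unknown subtask " + subtaskIndex);
        }
        BitSet bits = seen.computeIfAbsent(timerTimestamp, t -> new BitSet(parallelism));
        if (bits.get(subtaskIndex)) {
            return false; // another key from a subtask we already counted
        }
        bits.set(subtaskIndex);
        return bits.cardinality() == parallelism;
    }
}
```

Operator#2 would call report() for every incoming message and send the signal to the data warehouse when it returns true.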
You can get the parallel task id from RuntimeContext.getIndexOfThisSubtask(). RuntimeContext.getNumberOfParallelSubtasks() gives the total number of tasks.

Fabian

On Fri, 2 Aug 2019 at 10:55, Eduardo Winpenny Tejedor <eduardo.winpe...@gmail.com> wrote:

> Hi Oytun, that sounds like a great idea thanks!! Just wanted to confirm a
> couple of things:
>
> -In step 2 by merging do you mean anything else apart from setting the
> operator parallelism to 1? Forcing a parallelism of 1 should ensure all
> items go to the same task.
>
> -In step 3 I don't think I could check an item for each key has been
> received, I would need to know how many keys I have on my stream (or could
> I!? that's exactly what I'm trying to solve) but I could definitely rely on
> Flink's watermarking mechanism. If the watermark > t (t being the time for
> the trigger of the first operator) it must mean all streams have finished.
>
> Thanks again
>
> On Thu, 1 Aug 2019, 18:34 Oytun Tez, <oy...@motaword.com> wrote:
>
>> Perhaps:
>>
>> 1. collect() an item inside onTimer() inside operator#1
>> 2. merge the resulting stream from all keys
>> 3. process the combined stream in operator#2 to see if all keys were
>> processed. you will probably want to keep state in the operator#2 to
>> see if you received items from all keys.
>>
>> ---
>> Oytun Tez
>>
>> On Thu, Aug 1, 2019 at 1:06 PM Eduardo Winpenny Tejedor <
>> eduardo.winpe...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have a keyed operator with an hourly event time trigger. On a timer
>>> trigger, the operator simply persists some state to a table.
>>>
>>> I'd like to know when the triggers for all keys have finished so I can
>>> send a further signal to the data warehouse, to indicate it has all the
>>> necessary data to start producing a report.
>>>
>>> How can I achieve this? If my operator is distributed across different
>>> machine tasks I need to make sure I don't send the signal to the data
>>> warehouse before the timers for every key have fired.
>>>
>>> Thanks,
>>> Eduardo