Hi,

Regarding step 3, it is sufficient to check that you got one message from
each parallel task of the previous operator. That's because a task
processes the timers of all keys before moving forward.
Timers are always processed per key, but you could deduplicate on the
parallel task id and check that you got a message from each task.

You can get the parallel task id from
RuntimeContext.getIndexOfThisSubtask(), and
RuntimeContext.getNumberOfParallelSubtasks() gives the total number of
parallel tasks.
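
To make that concrete, here is a rough sketch. The class and field names
are only illustrative, and it assumes you pass in the parallelism of
operator#1 (e.g. the value you use when building the job):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Inside operator#1 (a KeyedProcessFunction): emit this task's index
    // after the timer for a key has fired and its state has been persisted.
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) {
        // ... persist the state for this key ...
        out.collect(getRuntimeContext().getIndexOfThisSubtask());
    }

    // operator#2, run with parallelism 1: deduplicate on the subtask index
    // and emit a signal once all subtasks of operator#1 have reported.
    public class AllSubtasksFired extends RichFlatMapFunction<Integer, String> {
        private final int expectedSubtasks;        // parallelism of operator#1
        private final Set<Integer> seen = new HashSet<>();

        public AllSubtasksFired(int expectedSubtasks) {
            this.expectedSubtasks = expectedSubtasks;
        }

        @Override
        public void flatMap(Integer subtaskIndex, Collector<String> out) {
            seen.add(subtaskIndex);
            if (seen.size() == expectedSubtasks) {
                out.collect("all subtasks fired"); // send the warehouse signal here
                seen.clear();                      // reset for the next hourly trigger
            }
        }
    }

Note that the seen set here lives only on the heap, so if you need the
check to survive restarts you would keep it in operator state instead.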

Fabian

On Fri, Aug 2, 2019 at 10:55 AM Eduardo Winpenny Tejedor <
eduardo.winpe...@gmail.com> wrote:

> Hi Oytun, that sounds like a great idea thanks!! Just wanted to confirm a
> couple of things:
>
> -In step 2 by merging do you mean anything else apart from setting the
> operator parallelism to 1? Forcing a parallelism of 1 should ensure all
> items go to the same task.
>
> -In step 3 I don't think I could check that an item has been received for
> each key, since I would need to know how many keys I have on my stream (or
> could I!? that's exactly what I'm trying to solve), but I could definitely
> rely on Flink's watermarking mechanism. If the watermark > t (t being the
> time of the trigger in the first operator) it must mean all streams have
> finished.
>
> Thanks again
>
> On Thu, 1 Aug 2019, 18:34 Oytun Tez, <oy...@motaword.com> wrote:
>
>> Perhaps:
>>
>>    1. collect() an item inside onTimer() inside operator#1
>>    2. merge the resulting stream from all keys
>>    3. process the combined stream in operator#2 to see if all keys were
>>    processed. You will probably want to keep state in operator#2 to see if
>>    you received items from all keys.
>>
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Thu, Aug 1, 2019 at 1:06 PM Eduardo Winpenny Tejedor <
>> eduardo.winpe...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have a keyed operator with an hourly event time trigger. On a timer
>>> trigger, the operator simply persists some state to a table.
>>>
>>> I'd like to know when the triggers for all keys have finished so I can
>>> send a further signal to the data warehouse, to indicate it has all the
>>> necessary data to start producing a report.
>>>
>>> How can I achieve this? If my operator is distributed across tasks on
>>> different machines, I need to make sure I don't send the signal to the
>>> data warehouse before the timers for every key have fired.
>>>
>>> Thanks,
>>> Eduardo
>>>
>>
