Aljoscha,

thanks for your response. The use case I'm after is basically providing "early" (inaccurate) results to downstream consumers. Suppose we're running aggregations over daily time windows, but we don't want to wait a whole day to see results. The idea is to fire the windows continuously before they reach their end of life (at which point they will be fired and purged and will provide the final, accurate answer).
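A minimal, framework-free sketch of the early-firing schedule described above, assuming a daily event-time window `[start, end)` and a fixed early-firing interval measured from the window start. The class and method names are hypothetical, not Flink API, and the sketch does not claim to reproduce how `ContinuousEventTimeTrigger` aligns its timers. The final firing is placed at `end - 1`, which is what Flink's `TimeWindow.maxTimestamp()` returns.

```java
import java.util.ArrayList;
import java.util.List;

class EarlyFiringSchedule {

    /**
     * Hypothetical helper: returns the event-time points at which a window
     * [start, end) would fire, i.e. one early (partial, possibly inaccurate)
     * firing every `interval` ms strictly before the window's max timestamp,
     * followed by the final (accurate) firing at end - 1.
     */
    static List<Long> firingTimes(long start, long end, long interval) {
        if (interval <= 0 || end <= start) {
            throw new IllegalArgumentException("need interval > 0 and end > start");
        }
        List<Long> times = new ArrayList<>();
        long maxTimestamp = end - 1;
        for (long t = start + interval; t < maxTimestamp; t += interval) {
            times.add(t); // early pane
        }
        times.add(maxTimestamp); // final pane
        return times;
    }

    public static void main(String[] args) {
        long hour = 60L * 60L * 1000L;
        long day = 24L * hour;
        System.out.println(firingTimes(0L, day, 6L * hour));
    }
}
```

With a one-day window starting at 0 and a six-hour interval, `main` prints `[21600000, 43200000, 64800000, 86399999]`: three early panes and the final one.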

However, if all of these "early" fired panes emit elements with a timestamp equal to the end of the window, stateful downstream operators a) have no way to distinguish between the different panes of the same window, and b) have no chance to set up timers before the watermark at the downstream operator advances to the "end of the day".

Hope this clarifies my motivation a bit,
P.

On 11/14/2016 03:22 PM, Aljoscha Krettek wrote:
Hi,
I'm afraid the ContinuousEventTimeTrigger is a bit misleading and should
probably be removed. The problem is that a watermark T signals that we
won't see elements with a timestamp < T in the future. It does not
signal that we haven't already seen elements with a timestamp > T. So
this cannot be used to trigger at different stages of a given window.
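To make the watermark argument concrete, here is a small illustrative simulation (not Flink code), assuming elements arrive out of order and a hypothetical bounded-out-of-orderness watermark generator (watermark = highest timestamp seen so far minus a fixed delay). It reports which elements with timestamps greater than T had already been seen at the moment the watermark first reached T.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkSemantics {

    /**
     * Processes timestamps in arrival order with an assumed watermark of
     * (max timestamp seen so far - maxDelay). At the first point where the
     * watermark reaches at least t, returns all already-seen timestamps
     * greater than t. A non-empty result means "later" elements were seen
     * before the watermark passed t.
     */
    static List<Long> seenAheadOfWatermark(long[] arrivals, long maxDelay, long t) {
        long maxSeen = Long.MIN_VALUE;
        List<Long> seen = new ArrayList<>();
        for (long ts : arrivals) {
            seen.add(ts);
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - maxDelay;
            if (watermark >= t) {
                List<Long> ahead = new ArrayList<>();
                for (long s : seen) {
                    if (s > t) {
                        ahead.add(s);
                    }
                }
                return ahead;
            }
        }
        return new ArrayList<>(); // the watermark never reached t
    }

    public static void main(String[] args) {
        long[] arrivals = {1, 5, 3, 12, 7, 20, 15};
        System.out.println(seenAheadOfWatermark(arrivals, 5, 10));
    }
}
```

Here `main` prints `[12, 20]`: when the watermark first reaches 10, elements stamped 12 and 20 have already been processed, so a timer firing at 10 says nothing about which "stage" of the window has been seen.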

Do you have a concrete use case in mind for which you wanted to use
ContinuousEventTimeTrigger?

Cheers,
Aljoscha

On Mon, 14 Nov 2016 at 09:58 Ufuk Celebi <u...@apache.org
<mailto:u...@apache.org>> wrote:

    Looping in Kostas and Aljoscha, who should know what the expected
    behaviour is here ;)


    On 11 November 2016 at 16:17:23, Petr Novotnik
    (petr.novot...@firma.seznam.cz
    <mailto:petr.novot...@firma.seznam.cz>) wrote:
    > Hello,
    >
    > I'm struggling to understand the following behaviour of the
    > `WindowOperator` and would appreciate some insight from experts:
    >
    > In particular I'm thinking about the following hypothetical data flow:
    >
    > input.keyBy(..)
    >     .window(TumblingEventTimeWindows.of(..))
    >     .apply(..)
    >     ...
    >     .keyBy(..)
    >     .window(CustomWindowAssignerUsingATriggerBasedOnTheElementsStamp)
    >     .apply(..)
    >
    > When the first window operator fires a window based on the timer, the
    > emitted elements are assigned a timestamp which equals
    > `window.maxTimestamp()`. This stamp is then available in the second
    > window operator's trigger through the `onElement` method. So far so
    > good.
    >
    > However, when using `ContinuousEventTimeTrigger` (simply put, when
    > firing the window multiple times at different times in its lifecycle)
    > in the first window operator, _all_ of the elements of this window - no
    > matter whether fired as a partial or the final window result - will
    > arrive with the same stamp in the (downstream) operators.
    >
    > This makes it practically impossible to use `ContinuousEventTimeTrigger`
    > (or similar) in the second window operator to achieve "early firing"
    > again.
    >
    > This is surprising. I would expect the elements to be assigned the
    > stamp of the timer which fired them (which will be
    > `window#maxTimestamp()` for `TumblingEventTimeWindows`). Is there any
    > particular reason for the unconditional assignment to
    > `window.maxTimestamp()`?
    >
    > Many thanks in advance,
    > P.
    >
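The difference between the behaviour described in the quoted message and the one Petr expected can be sketched without Flink, assuming a window that fires early panes at given event-time timers plus a final pane at its max timestamp. `Pane`, `stampWithMaxTimestamp`, `stampWithTimerTime` and `distinctStamps` are hypothetical names; the first policy mirrors what the thread says happens (every emitted element gets `window.maxTimestamp()`), the second is the proposed alternative (each element gets the timestamp of the timer that fired it).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class PaneStamping {

    /** Hypothetical pane: the timer that fired it and the timestamp its result carries. */
    static final class Pane {
        final long firedAt;
        final long stamp;

        Pane(long firedAt, long stamp) {
            this.firedAt = firedAt;
            this.stamp = stamp;
        }
    }

    /** Behaviour described in the thread: every pane is stamped with window.maxTimestamp(). */
    static List<Pane> stampWithMaxTimestamp(long[] timers, long windowMaxTimestamp) {
        List<Pane> panes = new ArrayList<>();
        for (long t : timers) {
            panes.add(new Pane(t, windowMaxTimestamp));
        }
        return panes;
    }

    /** Proposed alternative: every pane is stamped with the time of the timer that fired it. */
    static List<Pane> stampWithTimerTime(long[] timers) {
        List<Pane> panes = new ArrayList<>();
        for (long t : timers) {
            panes.add(new Pane(t, t));
        }
        return panes;
    }

    /** Number of distinct timestamps a downstream operator would observe. */
    static int distinctStamps(List<Pane> panes) {
        Set<Long> stamps = new LinkedHashSet<>();
        for (Pane p : panes) {
            stamps.add(p.stamp);
        }
        return stamps.size();
    }

    public static void main(String[] args) {
        long maxTs = 86_399_999L; // end - 1 of a one-day window starting at 0
        long[] timers = {21_600_000L, 43_200_000L, 64_800_000L, maxTs};
        System.out.println(distinctStamps(stampWithMaxTimestamp(timers, maxTs)));
        System.out.println(distinctStamps(stampWithTimerTime(timers)));
    }
}
```

`main` prints `1` and then `4`: under the first policy all four panes reach the downstream operator with the same timestamp, so a trigger that keys off the element timestamp cannot tell them apart; under the second, each pane carries its own firing time.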
