On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <[email protected]> wrote:

> Thanks again for the answers so far!  I really appreciate it.  As for my
> specific use-case, we're using Bigtable as the final sink, and I'd prefer
> to keep our writes fully idempotent for other reasons (ie no
> read-modify-write).  We actually do track tentative vs final values
> already, but checking that at write-time would impose a pretty big overhead
> in the write path.
>
> After this I actually instrumented one of my running pipelines to detect
> these "time traveling" panes, and did see it occurring pretty frequently,
> particularly when dataflow decides to scale up/down the job, so that was
> interesting.
>
> From all this, it seems like using a stateful DoFn to prevent time
> traveling panes from overwriting newer ones is the best solution for now.
>

Note that you can't "filter out" these time traveling panes, because at the
next fusion break they might get re-ordered again.


> My last question / statement is just around general education and
> documentation about this.  I think the fact that PCollection are unordered
> makes sense and is pretty intuitive, but fired panes being delivered
> out-of-order seems very surprising.  I'm curious how many other pipelines
> exist that run into this (and produce incorrect results!) but people are
> unaware of.  Is there a way we can call this behavior out?  For example,
> many of the sample beam projects use early firings, but there's never any
> mention that the output may be out-of-order.
>

+1 to improving the documentation here. Basically multiple firings become
independent elements of the resulting PCollection, they don't retain any
association/ordering.

Multiply-triggered window are difficult to reason about (and not just in
this case), https://s.apache.org/beam-sink-triggers is IMHO the right
answer.


> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <[email protected]>
> wrote:
>
>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <[email protected]>
>> wrote:
>> >
>> > wow, thats super unexpected and dangerous, thanks for clarifying!  Time
>> to go re-think how we do some of our writes w/ early firings then.
>> >
>> > Are there any workarounds to make things happen in-order in dataflow?
>>  eg if the sink gets fused to the output of the GBK operation, will it
>> always happen effectively in order (per key) even though it's not a
>> guarantee?
>>
>> If things get fused, yes. Note that sinks themselves sometimes have
>> fusion barriers though.
>>
>> > I also guess I could keep track of the last pane index my sink has
>> seen, and ignore earlier ones (using state to keep track)?
>>
>> Yes, that would work.
>>
>> What kind of sink are you using? If it supports read-modify-write or some
>> kind of transaction you may be able to mark early results as tentative
>> (which would be useful anyway) and only overwrite tentative ones.
>>
>>
>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <[email protected]>
>> wrote:
>> >>
>> >> Correct, even within the same key there's no promise of event time
>> ordering mapping of panes to real time ordering because the downstream
>> operations may happen on a different machine. Multiply triggered windows
>> add an element of non-determinism to the process.
>> >>
>> >> You're also correct that triggering with multiple panes requires lots
>> of care, especially when it comes to operations with side effects (like
>> sinks). Most safe is to only write the final pane to the sink, and handle
>> early triggering in a different way.
>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>> easier to reason about.
>> >>
>> >>
>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]>
>> wrote:
>> >>>
>> >>> Also to clarify here (I re-read this and realized it could be
>> slightly unclear).  My question is only about in-order delivery of panes.
>>  ie: will pane P always be delivered before P+1.
>> >>>
>> >>> I realize the use of "in-order" before could be confusing, I don't
>> care about the ordering of the elements per-se, just the ordering of the
>> pane delivery.
>> >>>
>> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1,
>> P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at
>> least, the final firing is always guaranteed to be delivered after all
>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>> >>>
>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <[email protected]>
>> wrote:
>> >>>>
>> >>>> Are you also saying also that even in the first example (Source ->
>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>> pretty big "got-cha" for correctness if you ever use accumulating
>> triggering.
>> >>>>
>> >>>> I'd also like to point out I'm not talking about a global ordering
>> across the entire PCollection, I'm talking about within the same key after
>> a GBK transform.
>> >>>>
>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <
>> [email protected]> wrote:
>> >>>>>
>> >>>>> Due to the nature of distributed processing, order is not
>> preserved. You can, however, inspect the PaneInfo to determine if an
>> element was early, on-time, or late and act accordingly.
>> >>>>>
>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
>> [email protected]> wrote:
>> >>>>>>
>> >>>>>> In my experience ordering is not guaranteed, you may need apply a
>> transformation that sort the elements and then dispatch them sorted out.
>> >>>>>>
>> >>>>>> Or uses the Sorter extension for this:
>> >>>>>>
>> >>>>>>
>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>> >>>>>>
>> >>>>>> Steve Niemitz <[email protected]> schrieb am Di., 12. Feb.
>> 2019, 16:31:
>> >>>>>>>
>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>> windowing, triggering, and panes work together, and how to ensure
>> correctness throughout a pipeline.
>> >>>>>>>
>> >>>>>>> Lets assume I have a very simple streaming pipeline that looks
>> like:
>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>> >>>>>>>
>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>> accumulating panes, this is pretty straight forward.  However, this can get
>> more complicated if we add steps after the CombineByKey, for instance
>> (using the same windowing strategy):
>> >>>>>>>
>> >>>>>>> Say I want to buffer the results of the CombineByKey into batches
>> of N elements.  I can do this with the built-in GroupIntoBatches [1]
>> transform, now my pipeline looks like:
>> >>>>>>>
>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>> >>>>>>>
>> >>>>>>> This leads to my main question:
>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
>> result from early firing F+1 now comes BEFORE the firing F (because it was
>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>> gets F+1 before F, which means my resulting store has incorrect data
>> (possibly forever if F+1 was the final firing).
>> >>>>>>>
>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>> introduce my own ordering back in after GroupIntoBatches.  GIB is an
>> example here, but I imagine this could happen with any GBK type operation.
>> >>>>>>>
>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>> >>>>>>>
>> >>>>>>> [1]
>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>
>>>

Reply via email to