Thanks again for all the replies, everyone.  Just as a final follow-up here,
are there any concrete plans for addressing these issues that I could start
following?  The sink trigger doc seems like a start, but also seems like
just a starting point in a larger re-architecture of sinks.
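
For anyone finding this thread later, here is a rough sketch of the
pane-index check discussed in the quoted messages below (track the last pane
index seen and ignore earlier ones).  It is plain Python rather than Beam API:
PaneOrderedWriter, process, and the in-memory dict are hypothetical stand-ins
for a stateful DoFn with per-key-and-window state, and the write callback
stands in for the real sink write.

```python
from typing import Callable, Dict, Hashable, Tuple


class PaneOrderedWriter:
    """Drops panes that arrive after a newer pane for the same (key, window)."""

    def __init__(self, write: Callable[[Hashable, object], None]) -> None:
        self._write = write
        # Stand-in for per-key-and-window state in a stateful DoFn (assumption).
        self._last_index: Dict[Tuple[Hashable, Hashable], int] = {}

    def process(self, key: Hashable, window: Hashable, pane_index: int, value: object) -> bool:
        """Writes the value unless an equal or newer pane was already written."""
        state_key = (key, window)
        last = self._last_index.get(state_key, -1)
        if pane_index <= last:
            # Stale ("time traveling") or duplicate pane: skip the write.
            return False
        self._last_index[state_key] = pane_index
        self._write(key, value)
        return True


store: Dict[Hashable, object] = {}
writer = PaneOrderedWriter(lambda k, v: store.__setitem__(k, v))

# Accumulating sums for one key and window, delivered as P0, P2, P1.
deliveries = [(0, 10), (2, 35), (1, 20)]
results = [writer.process("user-1", "w0", idx, val) for idx, val in deliveries]
```

The check and the write live in the same step on purpose: as noted in the
quoted replies, filtering in a separate step does not help if the panes can be
re-ordered again before they reach the writer.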

On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles <[email protected]> wrote:

>
>
> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw <[email protected]>
> wrote:
>
>> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz <[email protected]>
>> wrote:
>>
>>>
>>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks again for the answers so far!  I really appreciate it.  As for
>>>>> my specific use-case, we're using Bigtable as the final sink, and I'd
>>>>> prefer to keep our writes fully idempotent for other reasons (ie no
>>>>> read-modify-write).  We actually do track tentative vs final values
>>>>> already, but checking that at write-time would impose a pretty big
>>>>> overhead in the write path.
>>>>>
>>>>> After this I actually instrumented one of my running pipelines to
>>>>> detect these "time traveling" panes, and did see it occurring pretty
>>>>> frequently, particularly when Dataflow decides to scale the job up or
>>>>> down, so that was interesting.
>>>>>
>>>>> From all this, it seems like using a stateful DoFn to prevent time
>>>>> traveling panes from overwriting newer ones is the best solution for now.
>>>>>
>>>>
>>>> Note that you can't "filter out" these time traveling panes, because at
>>>> the next fusion break they might get re-ordered again.
>>>>
>>>
>>> Ack, in a general sense.  To solve my specific problem my plan was to
>>> ensure the final writer sink would be fused to this filter step (or even
>>> build it directly into the DoFn itself that does the write), which would
>>> work in my specific case (or so it seems, at least).
>>>
>>>
>>>>
>>>>
>>>>> My last question / statement is just around general education and
>>>>> documentation about this.  I think the fact that PCollections are unordered
>>>>> makes sense and is pretty intuitive, but fired panes being delivered
>>>>> out-of-order seems very surprising.  I'm curious how many other pipelines
>>>>> exist that run into this (and produce incorrect results!) without people
>>>>> being aware of it.  Is there a way we can call this behavior out?  For example,
>>>>> many of the sample Beam projects use early firings, but there's never any
>>>>> mention that the output may be out-of-order.
>>>>>
>>>>
>>>> +1 to improving the documentation here. Basically, multiple firings
>>>> become independent elements of the resulting PCollection; they don't retain
>>>> any association/ordering.
>>>>
>>>> Multiply-triggered windows are difficult to reason about (and not just
>>>> in this case); https://s.apache.org/beam-sink-triggers is IMHO the
>>>> right answer.
>>>>
>>>
>>> I was reading this yesterday, but couldn't see how it solved the
>>> out-of-order delivery problem here.  I do like the overall direction it's
>>> proposing though; from my work with triggers so far I have found them very
>>> difficult to reason about (like you said).
>>>
>>
>> It moves the responsibility of doing things in the right order (and even
>> defining what order is "correct enough") to the runner (and sinks) such
>> that the side effects happen in order, even if all the processing did not.
>> To be clear there's still a fair amount of design to make that doc into a
>> workable system...
>>
>
> With or without sink triggers, transforms that write need to be
> pane-index-aware. The outputs themselves may be out of order, but they have
> sequence numbers on them, so sinks likely need to be made stateful so they
> can be idempotent in the face of reordering.
>
> Kenn
>
>
>>
>>
>>>
>>>
>>>>
>>>>
>>>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <[email protected]>
>>>>>> wrote:
>>>>>> >
>>>>>> > Wow, that's super unexpected and dangerous, thanks for clarifying!
>>>>>> Time to go re-think how we do some of our writes w/ early firings then.
>>>>>> >
>>>>>> > Are there any workarounds to make things happen in-order in
>>>>>> Dataflow?  E.g. if the sink gets fused to the output of the GBK operation,
>>>>>> will it always happen effectively in order (per key) even though it's
>>>>>> not a guarantee?
>>>>>>
>>>>>> If things get fused, yes. Note that sinks themselves sometimes have
>>>>>> fusion barriers though.
>>>>>>
>>>>>> > I also guess I could keep track of the last pane index my sink has
>>>>>> seen, and ignore earlier ones (using state to keep track)?
>>>>>>
>>>>>> Yes, that would work.
>>>>>>
>>>>>> What kind of sink are you using? If it supports read-modify-write or
>>>>>> some kind of transaction you may be able to mark early results as
>>>>>> tentative (which would be useful anyway) and only overwrite tentative ones.
>>>>>>
>>>>>>
>>>>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <
>>>>>> [email protected]> wrote:
>>>>>> >>
>>>>>> >> Correct, even within the same key there's no promise that the
>>>>>> event-time ordering of panes maps to their real-time ordering, because the
>>>>>> downstream operations may happen on a different machine. Multiply triggered
>>>>>> windows add an element of non-determinism to the process.
>>>>>> >>
>>>>>> >> You're also correct that triggering with multiple panes requires
>>>>>> lots of care, especially when it comes to operations with side effects
>>>>>> (like sinks). The safest approach is to write only the final pane to the
>>>>>> sink, and handle early triggering in a different way.
>>>>>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>>>>>> easier to reason about.
>>>>>> >>
>>>>>> >>
>>>>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]>
>>>>>> wrote:
>>>>>> >>>
>>>>>> >>> Also to clarify here (I re-read this and realized it could be
>>>>>> slightly unclear).  My question is only about in-order delivery of panes,
>>>>>> i.e. will pane P always be delivered before P+1?
>>>>>> >>>
>>>>>> >>> I realize the use of "in-order" before could be confusing. I
>>>>>> don't care about the ordering of the elements per se, just the ordering
>>>>>> of the pane delivery.
>>>>>> >>>
>>>>>> >>> I want to make sure that given a GBK that produces 3 panes (P0,
>>>>>> P1, P2) for a key, a downstream PCollection could never see P0, P2, P1.
>>>>>> Or at least, that the final firing is always guaranteed to be delivered
>>>>>> after all early firings (e.g. we could have P0, P2, P1, but then always PLast).
>>>>>> >>>
>>>>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <
>>>>>> [email protected]> wrote:
>>>>>> >>>>
>>>>>> >>>> Are you also saying that even in the first example (Source
>>>>>> -> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>>>>> delivered in-order from the Combine -> Sink transforms?  This seems like
>>>>>> a pretty big "gotcha" for correctness if you ever use accumulating
>>>>>> triggering.
>>>>>> >>>>
>>>>>> >>>> I'd also like to point out I'm not talking about a global
>>>>>> ordering across the entire PCollection, I'm talking about within the same
>>>>>> key after a GBK transform.
>>>>>> >>>>
>>>>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <
>>>>>> [email protected]> wrote:
>>>>>> >>>>>
>>>>>> >>>>> Due to the nature of distributed processing, order is not
>>>>>> preserved. You can, however, inspect the PaneInfo to determine if an
>>>>>> element was early, on-time, or late and act accordingly.
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
>>>>>> [email protected]> wrote:
>>>>>> >>>>>>
>>>>>> >>>>>> In my experience ordering is not guaranteed; you may need to
>>>>>> apply a transformation that sorts the elements and then dispatches them
>>>>>> sorted.
>>>>>> >>>>>>
>>>>>> >>>>>> Or use the Sorter extension for this:
>>>>>> >>>>>>
>>>>>> >>>>>>
>>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>> >>>>>>
>>>>>> >>>>>> Steve Niemitz <[email protected]> wrote on Tue, Feb 12,
>>>>>> 2019, 16:31:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>>>>>> windowing, triggering, and panes work together, and how to ensure
>>>>>> correctness throughout a pipeline.
>>>>>> >>>>>>>
>>>>>> >>>>>>> Let's assume I have a very simple streaming pipeline that
>>>>>> looks like:
>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>> >>>>>>>
>>>>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute,
>>>>>> and accumulating panes, this is pretty straightforward.  However, this
>>>>>> can get more complicated if we add steps after the CombineByKey, for
>>>>>> instance (using the same windowing strategy):
>>>>>> >>>>>>>
>>>>>> >>>>>>> Say I want to buffer the results of the CombineByKey into
>>>>>> batches of N elements.  I can do this with the built-in GroupIntoBatches
>>>>>> [1] transform, so now my pipeline looks like:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>> >>>>>>>
>>>>>> >>>>>>> This leads to my main question:
>>>>>> >>>>>>> Is ordering preserved somehow here?  I.e., is it possible that
>>>>>> the result from early firing F+1 now comes BEFORE the firing F (because
>>>>>> it was re-ordered in the GroupIntoBatches)?  This would mean that the sink
>>>>>> then gets F+1 before F, which means my resulting store has incorrect data
>>>>>> (possibly forever if F+1 was the final firing).
>>>>>> >>>>>>>
>>>>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>>>>>> introduce my own ordering back in after GroupIntoBatches.  GIB is an
>>>>>> example here, but I imagine this could happen with any GBK-type
>>>>>> operation.
>>>>>> >>>>>>>
>>>>>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>> >>>>>>>
>>>>>> >>>>>>> [1]
>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>>>
>>>>>>>
