Thanks again for all the replies, everyone. Just as a final follow-up here: are there any concrete plans for addressing these issues that I could start following? The sink triggers doc seems like a start, but it also seems like just the starting point of a larger re-architecture of sinks.
On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles <[email protected]> wrote:
>
> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw <[email protected]> wrote:
>
>> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz <[email protected]> wrote:
>>
>>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <[email protected]> wrote:
>>>>
>>>>> Thanks again for the answers so far! I really appreciate it. As for my specific use case, we're using Bigtable as the final sink, and I'd prefer to keep our writes fully idempotent for other reasons (i.e. no read-modify-write). We actually do track tentative vs. final values already, but checking that at write time would impose a pretty big overhead in the write path.
>>>>>
>>>>> After this I actually instrumented one of my running pipelines to detect these "time traveling" panes, and did see them occurring pretty frequently, particularly when Dataflow decides to scale the job up or down, so that was interesting.
>>>>>
>>>>> From all this, it seems like using a stateful DoFn to prevent time-traveling panes from overwriting newer ones is the best solution for now.
>>>>
>>>> Note that you can't "filter out" these time-traveling panes, because at the next fusion break they might get re-ordered again.
>>>
>>> Ack, in a general sense. To solve my specific problem, my plan was to ensure the final writer sink would be fused to this filter step (or even build it directly into the DoFn that does the write), which would work in my specific case (it seems like, at least).
>>>
>>>>> My last question / statement is just around general education and documentation about this. I think the fact that PCollections are unordered makes sense and is pretty intuitive, but fired panes being delivered out of order seems very surprising.
>>>>> I'm curious how many other pipelines exist that run into this (and produce incorrect results!) without people being aware of it. Is there a way we can call this behavior out? For example, many of the sample Beam projects use early firings, but there's never any mention that the output may be out of order.
>>>>
>>>> +1 to improving the documentation here. Basically, multiple firings become independent elements of the resulting PCollection; they don't retain any association/ordering.
>>>>
>>>> Multiply-triggered windows are difficult to reason about (and not just in this case); https://s.apache.org/beam-sink-triggers is IMHO the right answer.
>>>
>>> I was reading this yesterday, but couldn't see how it solved the out-of-order delivery problem here. I do like the overall direction it's proposing though; from my work with triggers so far I have found them very difficult to reason about (like you said).
>>
>> It moves the responsibility of doing things in the right order (and even defining what order is "correct enough") to the runner (and sinks) such that the side effects happen in order, even if all the processing did not. To be clear, there's still a fair amount of design needed to make that doc into a workable system...
>
> With or without sink triggers, transforms that write need to be pane-index-aware. The outputs themselves may be out of order, but they have sequence numbers on them, so sinks likely need to be made stateful so they can be idempotent in the face of reordering.
>
> Kenn
>
>>>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <[email protected]> wrote:
>>>>>> >
>>>>>> > Wow, that's super unexpected and dangerous, thanks for clarifying! Time to go re-think how we do some of our writes w/ early firings then.
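[Editor's note: the stateful-DoFn filtering idea discussed in this thread, combined with Kenn's point about pane indices acting as per-key sequence numbers, can be sketched in plain Python. This is an illustration only, not actual Beam code: the dict below stands in for a Beam stateful DoFn's per-key ValueState, and all names are invented.]

```python
from collections import defaultdict


class LatestPaneFilter:
    """Per-key filter that drops panes older than the newest one seen.

    A plain-Python stand-in for a Beam stateful DoFn: the defaultdict
    plays the role of per-key state holding the highest pane index
    observed so far for each key.
    """

    def __init__(self):
        self._max_index = defaultdict(lambda: -1)  # key -> max pane index seen

    def process(self, key, pane_index, value):
        if pane_index <= self._max_index[key]:
            return []  # stale ("time traveling") pane: drop it
        self._max_index[key] = pane_index
        return [(key, value)]  # newest pane so far: pass downstream


f = LatestPaneFilter()
f.process("k", 0, 10)   # emitted: first pane for "k"
f.process("k", 2, 40)   # emitted: newer pane
f.process("k", 1, 25)   # dropped: arrived after pane 2
```

As Robert points out above, this only helps if the writer is fused immediately after the filter (or the check lives inside the writing DoFn itself); any later fusion break could reorder the surviving panes again.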
>>>>>> >
>>>>>> > Are there any workarounds to make things happen in order in Dataflow? E.g., if the sink gets fused to the output of the GBK operation, will it always happen effectively in order (per key), even though it's not a guarantee?
>>>>>>
>>>>>> If things get fused, yes. Note that sinks themselves sometimes have fusion barriers though.
>>>>>>
>>>>>> > I also guess I could keep track of the last pane index my sink has seen, and ignore earlier ones (using state to keep track)?
>>>>>>
>>>>>> Yes, that would work.
>>>>>>
>>>>>> What kind of sink are you using? If it supports read-modify-write or some kind of transaction, you may be able to mark early results as tentative (which would be useful anyway) and only overwrite tentative ones.
>>>>>>
>>>>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <[email protected]> wrote:
>>>>>> >>
>>>>>> >> Correct: even within the same key there's no promise that the event-time ordering of panes maps to a real-time ordering, because the downstream operations may happen on a different machine. Multiply-triggered windows add an element of non-determinism to the process.
>>>>>> >>
>>>>>> >> You're also correct that triggering with multiple panes requires lots of care, especially when it comes to operations with side effects (like sinks). Safest is to write only the final pane to the sink, and handle early triggering in a different way. https://s.apache.org/beam-sink-triggers is a proposal to make this easier to reason about.
>>>>>> >>
>>>>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]> wrote:
>>>>>> >>>
>>>>>> >>> Also to clarify here (I re-read this and realized it could be slightly unclear): my question is only about in-order delivery of panes, i.e. will pane P always be delivered before P+1.
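[Editor's note: Robert's suggestion of marking early results as tentative and only overwriting tentative values could look roughly like the sketch below. A hypothetical in-memory store stands in for a transactional sink such as Bigtable; all names are invented. As the thread notes, the extra check is exactly the read-modify-write overhead Steve wants to avoid.]

```python
class TentativeStore:
    """Sketch of a sink that marks early-firing results as tentative.

    Early panes may overwrite each other freely, but once a final pane
    lands for a key, no tentative (early) value can replace it.
    """

    def __init__(self):
        self._cells = {}  # key -> (value, is_tentative)

    def write(self, key, value, is_final):
        current = self._cells.get(key)
        if current is not None and not current[1]:
            return False  # a final value is already present; keep it
        self._cells[key] = (value, not is_final)
        return True

    def read(self, key):
        cell = self._cells.get(key)
        return cell[0] if cell else None
```

Note this simple flag only protects the final pane; if several early panes can be reordered among themselves, guarding on the pane index (as discussed elsewhere in the thread) is the more general check.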
>>>>>> >>>
>>>>>> >>> I realize the use of "in-order" before could be confusing; I don't care about the ordering of the elements per se, just the ordering of the pane delivery.
>>>>>> >>>
>>>>>> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1, P2) for a key, a downstream PCollection could never see P0, P2, P1. OR at least, that the final firing is always guaranteed to be delivered after all early firings (e.g. we could have P0, P2, P1, but then always PLast).
>>>>>> >>>
>>>>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <[email protected]> wrote:
>>>>>> >>>>
>>>>>> >>>> Are you also saying that even in the first example (Source -> CombineByKey (Sum) -> Sink) there's no guarantee that events would be delivered in order from the Combine -> Sink transforms? This seems like a pretty big gotcha for correctness if you ever use accumulating triggering.
>>>>>> >>>>
>>>>>> >>>> I'd also like to point out I'm not talking about a global ordering across the entire PCollection; I'm talking about within the same key after a GBK transform.
>>>>>> >>>>
>>>>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <[email protected]> wrote:
>>>>>> >>>>>
>>>>>> >>>>> Due to the nature of distributed processing, order is not preserved. You can, however, inspect the PaneInfo to determine whether an element was early, on-time, or late, and act accordingly.
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <[email protected]> wrote:
>>>>>> >>>>>>
>>>>>> >>>>>> In my experience ordering is not guaranteed; you may need to apply a transformation that sorts the elements and then dispatches them in sorted order.
>>>>>> >>>>>>
>>>>>> >>>>>> Or use the Sorter extension for this:
>>>>>> >>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>> >>>>>>
>>>>>> >>>>>> Steve Niemitz <[email protected]> wrote on Tue., Feb. 12, 2019, 16:31:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Hi everyone, I have some questions I want to ask about how windowing, triggering, and panes work together, and how to ensure correctness throughout a pipeline.
>>>>>> >>>>>>>
>>>>>> >>>>>>> Let's assume I have a very simple streaming pipeline that looks like:
>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>> >>>>>>>
>>>>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and accumulating panes, this is pretty straightforward. However, this can get more complicated if we add steps after the CombineByKey, for instance (using the same windowing strategy):
>>>>>> >>>>>>>
>>>>>> >>>>>>> Say I want to buffer the results of the CombineByKey into batches of N elements. I can do this with the built-in GroupIntoBatches [1] transform, so now my pipeline looks like:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>> >>>>>>>
>>>>>> >>>>>>> This leads to my main question:
>>>>>> >>>>>>> Is ordering preserved somehow here? I.e., is it possible that the result from early firing F+1 now comes BEFORE firing F (because it was re-ordered in the GroupIntoBatches)? This would mean that the sink then gets F+1 before F, which means my resulting store has incorrect data (possibly forever, if F+1 was the final firing).
>>>>>> >>>>>>>
>>>>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to introduce my own ordering back in after GroupIntoBatches. GIB is an example here, but I imagine this could happen with any GBK-type operation.
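[Editor's note: the hazard described in the question can be shown with a toy last-write-wins store; the pane sums are invented for illustration. If firing F+1 reaches the sink before F, the store is left holding the stale total.]

```python
def last_write_wins(deliveries):
    """Apply (key, pane_index, accumulated_sum) writes in delivery order,
    ignoring pane indices, the way a naive overwriting sink would."""
    store = {}
    for key, _pane_index, total in deliveries:
        store[key] = total  # blind overwrite, regardless of pane index
    return store


in_order  = [("k", 0, 10), ("k", 1, 25), ("k", 2, 40)]
reordered = [("k", 0, 10), ("k", 2, 40), ("k", 1, 25)]  # F2 delivered before F1

# last_write_wins(in_order)["k"]  == 40 (correct final sum)
# last_write_wins(reordered)["k"] == 25 (stale), and since pane 2 was the
# final firing, no later write will ever correct it.
```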
>>>>>> >>>>>>>
>>>>>> >>>>>>> Am I thinking about this the correct way? Thanks!
>>>>>> >>>>>>>
>>>>>> >>>>>>> [1] https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
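[Editor's note: for reference, the difference the accumulating-panes choice makes for the running Sum example can be sketched as follows. The per-minute deltas are invented; this is plain Python, not Beam code.]

```python
def fired_panes(deltas, accumulating):
    """Return (pane_index, emitted_value) per firing for a running Sum.

    Accumulating panes emit the whole sum so far, so any newer pane can
    safely replace an older one at the sink; discarding panes emit only
    the delta since the previous firing, so the sink must combine
    results rather than overwrite them.
    """
    total, out = 0, []
    for i, delta in enumerate(deltas):
        total += delta
        out.append((i, total if accumulating else delta))
    return out


fired_panes([10, 15, 5], accumulating=True)   # -> [(0, 10), (1, 25), (2, 30)]
fired_panes([10, 15, 5], accumulating=False)  # -> [(0, 10), (1, 15), (2, 5)]
```

This is why the pane-index checks discussed in the thread suffice for accumulating mode: every pane carries the complete value, so keeping only the highest-indexed pane yields the correct result.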
