On Wed, Feb 20, 2019 at 6:54 PM Raghu Angadi <[email protected]> wrote:
>
> On Tue, Feb 12, 2019 at 10:28 AM Robert Bradshaw <[email protected]> wrote:
>>
>> Correct, even within the same key there's no promise that the event-time
>> ordering of panes maps to a real-time ordering, because the downstream
>> operations may happen on a different machine. Multiply triggered windows
>> add an element of non-determinism to the process.
>
> For clarification, the stage immediately after the GBK itself processes
> fired panes in order, correct? Of course, any stages further downstream
> may see them out of order.
There is no such guarantee, but in runners that use the standard
group-also-by-windows libraries and do fusion, this often happens to be
the case.

>> You're also correct that triggering with multiple panes requires a lot
>> of care, especially when it comes to operations with side effects (like
>> sinks). The safest approach is to write only the final pane to the sink
>> and handle early triggering in a different way.
>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>> easier to reason about.
>>
>> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]> wrote:
>>>
>>> Also to clarify here (I re-read this and realized it could be slightly
>>> unclear): my question is only about in-order delivery of panes, i.e.
>>> will pane P always be delivered before pane P+1?
>>>
>>> I realize the use of "in-order" before could be confusing. I don't
>>> care about the ordering of the elements per se, just the ordering of
>>> the pane delivery.
>>>
>>> I want to make sure that, given a GBK that produces 3 panes (P0, P1,
>>> P2) for a key, a downstream PCollection could never see P0, P2, P1.
>>> Or at least that the final firing is always guaranteed to be delivered
>>> after all early firings (e.g. we could have P0, P2, P1, but then
>>> always PLast).
>>>
>>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <[email protected]> wrote:
>>>>
>>>> Are you also saying that even in the first example (Source ->
>>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>>> delivered in order from the Combine -> Sink transforms? This seems
>>>> like a pretty big gotcha for correctness if you ever use accumulating
>>>> triggering.
>>>>
>>>> I'd also like to point out that I'm not talking about a global
>>>> ordering across the entire PCollection; I'm talking about ordering
>>>> within the same key after a GBK transform.
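[To illustrate the "write only the final pane" approach suggested above: the following is a plain-Python sketch of the filtering logic, not Beam API code. The dict fields `pane_index` and `is_last` are stand-ins for the information Beam's PaneInfo carries on each fired pane.]

```python
# Sketch (not Beam API): "write only the final pane to the sink".
# Each fired pane is modeled as a dict mirroring Beam's PaneInfo fields.
# Early panes are dropped; only the pane marked as the last firing for
# its window reaches the sink, so out-of-order early firings cannot
# leave stale data behind.

def final_panes_only(fired_panes):
    """Yield only the panes marked as the final firing for their window."""
    for pane in fired_panes:
        if pane["is_last"]:
            yield pane

panes = [
    {"key": "a", "value": 10, "pane_index": 0, "is_last": False},  # early firing
    {"key": "a", "value": 25, "pane_index": 1, "is_last": False},  # early firing
    {"key": "a", "value": 30, "pane_index": 2, "is_last": True},   # final pane
]

result = list(final_panes_only(panes))
# Only the final pane (value 30) would be written to the sink.
```

In actual Beam code the equivalent check would be made against the element's pane metadata inside a DoFn before the sink; early firings would then be routed elsewhere (e.g. to a dashboard that tolerates being overwritten).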
>>>>
>>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>> Due to the nature of distributed processing, order is not preserved.
>>>>> You can, however, inspect the PaneInfo to determine whether an
>>>>> element was early, on-time, or late, and act accordingly.
>>>>>
>>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <[email protected]> wrote:
>>>>>>
>>>>>> In my experience ordering is not guaranteed; you may need to apply
>>>>>> a transformation that sorts the elements and then dispatches them
>>>>>> in sorted order.
>>>>>>
>>>>>> Or use the Sorter extension for this:
>>>>>>
>>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>>
>>>>>> Steve Niemitz <[email protected]> wrote on Tue., 12. Feb. 2019, 16:31:
>>>>>>>
>>>>>>> Hi everyone, I have some questions about how windowing, triggering,
>>>>>>> and panes work together, and how to ensure correctness throughout
>>>>>>> a pipeline.
>>>>>>>
>>>>>>> Let's assume I have a very simple streaming pipeline that looks like:
>>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>>
>>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>>>> accumulating panes, this is pretty straightforward. However, this
>>>>>>> can get more complicated if we add steps after the CombineByKey,
>>>>>>> for instance (using the same windowing strategy):
>>>>>>>
>>>>>>> Say I want to buffer the results of the CombineByKey into batches
>>>>>>> of N elements. I can do this with the built-in GroupIntoBatches [1]
>>>>>>> transform, so now my pipeline looks like:
>>>>>>>
>>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>>
>>>>>>> This leads to my main question:
>>>>>>> Is ordering preserved somehow here? I.e., is it possible that the
>>>>>>> result from early firing F+1 now comes BEFORE the firing F (because
>>>>>>> it was re-ordered in the GroupIntoBatches)?
>>>>>>> This would mean that the sink then gets F+1 before F, which means
>>>>>>> my resulting store has incorrect data (possibly forever, if F+1
>>>>>>> was the final firing).
>>>>>>>
>>>>>>> If ordering is not preserved, it seems as if I'd need to introduce
>>>>>>> my own ordering after GroupIntoBatches. GIB is just an example
>>>>>>> here; I imagine this could happen with any GBK-type operation.
>>>>>>>
>>>>>>> Am I thinking about this the correct way? Thanks!
>>>>>>>
>>>>>>> [1] https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
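[A sketch of one mitigation for the "F+1 before F" problem described above: since accumulating panes mean each later pane supersedes earlier ones for the same key and window, a sink can keep the highest pane index it has applied per key and discard any pane arriving with a smaller index. This is plain Python illustrating the idea, not Beam API code; `apply_pane`, `store`, and `latest_index` are hypothetical names.]

```python
# Sketch (not Beam API): a sink-side guard against out-of-order panes.
# With accumulating triggering, pane N+1 contains a superset of pane N's
# contribution, so "last writer wins by pane index" keeps the store
# correct even if panes are delivered out of order.

def apply_pane(store, latest_index, key, value, pane_index):
    """Write value only if this pane is newer than the last one applied."""
    if pane_index > latest_index.get(key, -1):
        latest_index[key] = pane_index
        store[key] = value
    # Otherwise the pane is stale (a re-ordered earlier firing): drop it.
    return store

store, latest_index = {}, {}
# Panes arrive out of order: P0, then the final P2, then the stale P1.
for key, value, idx in [("a", 10, 0), ("a", 30, 2), ("a", 25, 1)]:
    apply_pane(store, latest_index, key, value, idx)
# The stale P1 (value 25) is ignored; the store keeps the final value 30.
```

In a real pipeline the pane index would come from the element's PaneInfo, and the "store" check would live in whatever system the sink writes to (e.g. a conditional write keyed on pane index).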
