On Wed, Feb 20, 2019 at 6:54 PM Raghu Angadi <[email protected]> wrote:
>
> On Tue, Feb 12, 2019 at 10:28 AM Robert Bradshaw <[email protected]> wrote:
>>
>> Correct, even within the same key there's no promised mapping of pane order 
>> to real-time delivery order, because the downstream operations may happen on 
>> a different machine. Multiply-triggered windows add an element of 
>> non-determinism to the process.
>
> For clarification, the stage immediately after GBK itself processes fired 
> panes in order, correct? Of course, any more stages downstream of that may 
> see them out of order.

There is no such guarantee, but in runners that use the standard
GroupAlsoByWindows libraries and perform fusion, this often happens to be
the case.
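Since pane order isn't guaranteed, one defensive pattern is to guard the sink itself. The following is a plain-Python sketch (not Beam API; the names `maybe_write` and `last_written` are illustrative) of remembering the highest pane index written per key and window, and dropping anything older. In Beam the pane index would come from `PaneInfo.getIndex()`.

```python
# Sink-side guard against out-of-order pane delivery: track the highest
# pane index written per (key, window) and ignore any stale pane.
last_written = {}  # (key, window) -> highest pane index written so far

def maybe_write(key, window, pane_index, value, store):
    """Write value only if this pane is newer than any pane already written."""
    seen = last_written.get((key, window), -1)
    if pane_index <= seen:
        return False  # stale pane (arrived out of order), ignore it
    last_written[(key, window)] = pane_index
    store[(key, window)] = value
    return True

store = {}
# Panes arrive out of order: P0, P2, then P1.
maybe_write("k", "w0", 0, 10, store)
maybe_write("k", "w0", 2, 30, store)
maybe_write("k", "w0", 1, 20, store)  # dropped: older than P2, already written
```

With accumulating panes, each firing supersedes the previous one, so keeping only the newest-indexed pane leaves the store correct even if delivery is reordered.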

>> You're also correct that triggering with multiple panes requires lots of 
>> care, especially when it comes to operations with side effects (like sinks). 
>> The safest approach is to write only the final pane to the sink, and handle 
>> early firings in a different way. https://s.apache.org/beam-sink-triggers is 
>> a proposal to make this easier to reason about.
>>
>>
>> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <[email protected]> wrote:
>>>
>>> Also, to clarify here (I re-read this and realized it could be slightly 
>>> unclear): my question is only about in-order delivery of panes, i.e., will 
>>> pane P always be delivered before pane P+1?
>>>
>>> I realize the use of "in-order" before could be confusing; I don't care 
>>> about the ordering of the elements per se, just the ordering of the pane 
>>> delivery.
>>>
>>> I want to make sure that, given a GBK that produces 3 panes (P0, P1, P2) for 
>>> a key, a downstream PCollection could never see P0, P2, P1. Or at least that 
>>> the final firing is always guaranteed to be delivered after all early 
>>> firings (e.g., we could have P0, P2, P1, but then always PLast).
>>>
>>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <[email protected]> wrote:
>>>>
>>>> Are you also saying that even in the first example (Source -> CombineByKey 
>>>> (Sum) -> Sink) there's no guarantee that events would be delivered in 
>>>> order from the Combine to the Sink transform? This seems like a pretty big 
>>>> gotcha for correctness if you ever use accumulating triggering.
>>>>
>>>> I'd also like to point out I'm not talking about a global ordering across 
>>>> the entire PCollection, I'm talking about within the same key after a GBK 
>>>> transform.
>>>>
>>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <[email protected]> 
>>>> wrote:
>>>>>
>>>>> Due to the nature of distributed processing, order is not preserved. You 
>>>>> can, however, inspect the PaneInfo to determine if an element was early, 
>>>>> on-time, or late and act accordingly.
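The branching on pane timing that Robert describes can be sketched as follows (plain Python standing in for Beam's `PaneInfo.Timing` values; the `route` function and output lists are illustrative, not Beam API):

```python
# Route elements by the timing of the pane they arrived in, so early,
# on-time, and late firings can be handled differently downstream.
EARLY, ON_TIME, LATE = "EARLY", "ON_TIME", "LATE"

def route(element, timing, early_out, final_out, late_out):
    """Dispatch an element to a handler based on its pane's timing."""
    if timing == EARLY:
        early_out.append(element)    # provisional result, may be superseded
    elif timing == ON_TIME:
        final_out.append(element)    # the watermark-triggered result
    else:
        late_out.append(element)     # a correction after the watermark
```

In Beam itself this information is available on each element's `PaneInfo` (e.g. `PaneInfo.getTiming()` in Java).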
>>>>>
>>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <[email protected]> 
>>>>> wrote:
>>>>>>
>>>>>> In my experience ordering is not guaranteed; you may need to apply a 
>>>>>> transformation that sorts the elements and then dispatches them in order.
>>>>>>
>>>>>> Or use the Sorter extension for this:
>>>>>>
>>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>>
>>>>>> Steve Niemitz <[email protected]> schrieb am Di., 12. Feb. 2019, 16:31:
>>>>>>>
>>>>>>> Hi everyone, I have some questions I want to ask about how windowing, 
>>>>>>> triggering, and panes work together, and how to ensure correctness 
>>>>>>> throughout a pipeline.
>>>>>>>
>>>>>>> Let's assume I have a very simple streaming pipeline that looks like:
>>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>>
>>>>>>> Given fixed windows of 1 hour, early firings every minute, and 
>>>>>>> accumulating panes, this is pretty straightforward. However, this can 
>>>>>>> get more complicated if we add steps after the CombineByKey, for 
>>>>>>> instance (using the same windowing strategy):
>>>>>>>
>>>>>>> Say I want to buffer the results of the CombineByKey into batches of N 
>>>>>>> elements.  I can do this with the built-in GroupIntoBatches [1] 
>>>>>>> transform, now my pipeline looks like:
>>>>>>>
>>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>>
>>>>>>> This leads to my main question:
>>>>>>> Is ordering preserved somehow here? I.e., is it possible that the result 
>>>>>>> from early firing F+1 now comes BEFORE firing F (because it was 
>>>>>>> re-ordered in the GroupIntoBatches)? This would mean that the sink then 
>>>>>>> gets F+1 before F, which means my resulting store has incorrect data 
>>>>>>> (possibly forever, if F+1 was the final firing).
>>>>>>>
>>>>>>> If ordering is not preserved, it seems as if I'd need to introduce my 
>>>>>>> own ordering back in after GroupIntoBatches.  GIB is an example here, 
>>>>>>> but I imagine this could happen with any GBK type operation.
>>>>>>>
>>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>>
>>>>>>> [1] 
>>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
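The "only write the final pane" approach Robert recommends upthread can be sketched as a filter ahead of the sink. This is plain Python, not Beam code; the `is_last` flag stands in for Beam's `PaneInfo.isLast()`, and the dict records are illustrative:

```python
# Keep only final panes before writing, so early firings can never
# overwrite the final result, no matter what order they arrive in.
def final_panes_only(firings):
    """Return only the firings whose pane is marked last for its window."""
    return [f for f in firings if f["is_last"]]

firings = [
    {"key": "k", "pane": 0, "is_last": False, "value": 10},  # early firing
    {"key": "k", "pane": 2, "is_last": True,  "value": 30},  # final firing
    {"key": "k", "pane": 1, "is_last": False, "value": 20},  # early, reordered
]
to_write = final_panes_only(firings)  # only the final pane survives
```

The trade-off is latency: the sink sees nothing until the window closes, so early results would have to be surfaced through a separate, provisional path.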
