Luke & I talked in person a bit. I want to give context for what is at
stake here, in terms of side inputs in portability. A decent starting place
is https://s.apache.org/beam-side-inputs-1-pager

In that general design, the runner offers the SDK just one (or a few)
materialization strategies, and the SDK builds idiomatic structures on top
of them. Concretely, the Fn API today offers a multimap structure, and the
idea was that the SDK could cleverly prepare a PCollection<KV<...>> for the
runner to materialize. As a naive example, a simple iterable structure
could just map all elements to one dummy key in the multimap. But if you
wanted a list plus its length, you might map all elements to an element
key and the length to a special length meta-key.
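
To make that concrete, here is a toy model of the idea in plain Java. This
is only a sketch: a real SDK would go through the Fn API's state protocol
and coders, and the key names "elements" and "length" are invented for
illustration.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Toy stand-in for the runner's multimap materialization.
    class MultimapView {
      private final Map<String, List<String>> data = new HashMap<>();

      void put(String key, String value) {
        data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
      }

      List<String> get(String key) {
        return data.getOrDefault(key, new ArrayList<>());
      }
    }

    class SideInputEncodings {
      // Iterable view: every element goes under one dummy key.
      static void writeIterable(MultimapView view, Iterable<String> elements) {
        for (String e : elements) {
          view.put("elements", e);
        }
      }

      // "List plus length" view: elements under one key, the length under
      // a separate meta-key. These are the two writes that must be
      // observed together or not at all.
      static void writeListWithLength(MultimapView view, List<String> elements) {
        for (String e : elements) {
          view.put("elements", e);
        }
        view.put("length", Integer.toString(elements.size()));
      }
    }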

So there is a problem: if the SDK outputs a new KV<"elements-key", ...>
and a new KV<"length-key", ...> for the runner to materialize, then
consumers of the side input need to see both updates to the
materialization, or neither. In general, these outputs might span many
keys.
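
In terms of the toy model above, the torn read looks roughly like this:

    // A consumer checking whether the two keys agree. With no consistency
    // guarantee, a firing that lands between the writes to "elements" and
    // "length" can make this return false.
    static boolean viewIsConsistent(MultimapView view) {
      List<String> elements = view.get("elements");
      List<String> lengths = view.get("length");
      if (lengths.isEmpty()) {
        return elements.isEmpty(); // nothing materialized yet
      }
      int claimed = Integer.parseInt(lengths.get(lengths.size() - 1));
      return claimed == elements.size();
    }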

It seems like there are a few ways to resolve this tension:

 - Establish a consistency model so these updates will be observed
together. Seems hard, and whatever we come up with will limit runners,
limit efficiency, and potentially leak into users having to reason about
concurrency.

 - Instead of building the variety of side input views on one primitive
multimap materialization, force runners to provide many primitive
materializations with consistency under the hood. Not hard to get started,
but it adds an unfortunate new dimension for runners to vary in
functionality and performance, versus letting them optimize just one or a
few materializations.

 - Have no consistency and just not support side input methods that would
require consistent metadata. I'm curious which features this would hurt.

 - Have no consistency but require the SDK to build some sort of large
value, since single-element consistency is always built into the model
(sketched just after this list). Today many runners do concatenate all
elements into one value, though that does not perform well. Making this
effective probably requires new model features.
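
A sketch of that last option, again in the toy model: pack the entire view
into one encoded value, so that single-element consistency makes the
elements and their count atomic by construction. The pipe-delimited
encoding is invented purely for illustration.

    // One key, one value, one atomic update. A real implementation would
    // replace rather than append, and the whole view gets re-encoded on
    // every firing, which is why this performs poorly today.
    static void writeAsSingleValue(MultimapView view, List<String> elements) {
      String blob = elements.size() + "|" + String.join("|", elements);
      view.put("view", blob);
    }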

Kenn

On Thu, Apr 11, 2019 at 9:44 AM Reuven Lax <re...@google.com> wrote:

> One thing to keep in mind: triggers that fire multiple times per window
> already tend to be non-deterministic. These are element-count or
> processing-time triggers, both of which are fairly non-deterministic in
> firing.
>
> Reuven
>
> On Thu, Apr 11, 2019 at 9:27 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Today, we define that a side input becomes available to be consumed once
>> at least one firing occurs or when the runner detects that no such output
>> could be produced (e.g. the watermark is beyond the end of the window when
>> using the default trigger). For triggers that fire at most once, consumers
>> are guaranteed to have a consistent view of the contents of the side input.
>> But what happens when the trigger fires multiple times?
>>
>> Lets say we have a pipeline containing:
>> ParDo(A) --> PCollectionView S
>>          \-> PCollectionView T
>>
>>   ...
>>    |
>> ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
>>    |
>>   ...
>>
>> 1) Let's say ParDo(A) outputs (during a single bundle) X and Y to
>> PCollectionView S. Should ParDo(C) be guaranteed to see X only if it
>> can also see Y (and vice versa)?
>>
>> 2) Let's say ParDo(A) outputs (during a single bundle) X to
>> PCollectionView S and Y to PCollectionView T. Should ParDo(C) be
>> guaranteed to see X only if it can also see Y?
>>
>
