On Wed, Sep 27, 2023 at 2:53 PM Robert Bradshaw via dev <dev@beam.apache.org>
wrote:

> On Wed, Sep 27, 2023 at 10:58 AM Reuven Lax via dev <dev@beam.apache.org>
> wrote:
>
>> DoFns are allowed to be non-deterministic, so they don't have to yield
>> the "same" output.
>>
>
> Yeah. I'm more thinking here that there's a set of outputs that are
> considered equivalently valid.
>

exactly, "the same as much as the user expects it to be the same" :-)


>
>
>> The example I'm thinking of is where users perform some "best-effort"
>> deduplication by creating a hashmap in StartBundle and removing duplicates.
>> This is usually done purely for performance to reduce shuffle size, as
>> opposed to a guaranteed RemoveDuplicates. This scenario doesn't require
>> FinishBundle, though it does require a StartBundle.
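>>
>> A minimal Java sketch of the DoFn described above (class and method names
>> are hypothetical). The HashSet lives only for the duration of one bundle,
>> so duplicates are removed within a bundle, never across bundles:
>>
>> import java.util.HashSet;
>> import java.util.Set;
>> import org.apache.beam.sdk.transforms.DoFn;
>>
>> class BestEffortDedupFn extends DoFn<String, String> {
>>   private transient Set<String> seenInBundle;
>>
>>   @StartBundle
>>   public void startBundle() {
>>     seenInBundle = new HashSet<>();  // fresh state for every bundle
>>   }
>>
>>   @ProcessElement
>>   public void processElement(@Element String element, OutputReceiver<String> out) {
>>     // Emit each element at most once per bundle. No @FinishBundle is
>>     // needed: nothing is buffered, and downstream must tolerate
>>     // duplicates regardless of how the runner draws bundle boundaries.
>>     if (seenInBundle.add(element)) {
>>       out.output(element);
>>     }
>>   }
>> }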
>>
>
> This is a good example--the presence of Start *or* Finish is enough to
> indicate that the bundle outputs cannot be committed totally independently.
>

+1 to this example - it demonstrates that @StartBundle, as communication
with the runner, is not redundant with lazy initialization.

Kenn


>
> On the other hand, if there's a Start but no Finish we could safely
> truncate (and retry) the outputs at any point and still get a
> valid-under-the-model result, which could play well with the checkpointing
> model of persistence. This could possibly allow for optimizations purely
> from static analysis of the DoFn.
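>
> A sketch of such static analysis using the Java SDK's reflection utilities
> (DoFnSignatures is internal to the SDK, so treat this as illustrative
> rather than a committed API):
>
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
> import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
>
> // Inside some runner utility class:
> static boolean canTruncateAndRetryOutputs(DoFn<?, ?> fn) {
>   DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
>   // With no @FinishBundle there is nothing held back until bundle commit,
>   // so outputs may be truncated and the bundle retried while still
>   // producing a valid-under-the-model result.
>   return signature.finishBundle() == null;
> }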
>
>
>> On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>>
>>>
>>> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev <dev@beam.apache.org>
>>> wrote:
>>>
>>>> Yes, not including FinishBundle in ParDoPayload seems like a mistake.
>>>> Though the absence of FinishBundle doesn't mean that one can assume that
>>>> elements in a bundle don't affect the processing of subsequent elements
>>>> (i.e. there might still be caching!)
>>>>
>>>
>>> Well, for a DoFn to be correct it has to yield the same (or "the same as
>>> much as the user expects it to be the same") output regardless of order of
>>> processing or bundling, so a runner or SDK harness can definitely take a
>>> bunch of elements and process them however it wants if there's
>>> no @FinishBundle. I think that's what Jan is getting at - adding
>>> a @FinishBundle is the user placing a new restriction on the runner.
>>> Technically we probably have to include @StartBundle in that consideration.
>>>
>>> Kenn
>>>
>>>
>>>>
>>>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Kenn and Reuven,
>>>>>>
>>>>>> I agree with all these points. The only issue here seems to be that
>>>>>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>>>>>> fixed, though we need to change some defaults, as the 1000 ms default
>>>>>> bundle "duration" can be too much for lower-traffic pipelines. We are also
>>>>>> probably missing some @ValidatesRunner tests for this. I created [1] and
>>>>>> [2] to track this.
>>>>>>
>>>>>> One question still remains: the bundle vs. element life-cycle is
>>>>>> relevant only for cases where processing of element X can affect
>>>>>> processing of element Y later in the same bundle. Once this influence is
>>>>>> ruled out (i.e. no caching), this information can enable runner
>>>>>> optimizations that yield better performance. Should we consider
>>>>>> propagating this information from user code to the runner?
>>>>>>
>>>>> Yes!
>>>>>
>>>>> This was the explicit goal of the move to annotation-driven DoFn in
>>>>> https://s.apache.org/a-new-dofn to make it so that the SDK and runner
>>>>> can get good information about what the DoFn requirements are.
>>>>>
>>>>> When there is no @FinishBundle method, the runner can make additional
>>>>> optimizations. This should have been included in the ParDoPayload in the
>>>>> proto when we moved to portable pipelines. I cannot remember if there was
>>>>> a good reason that we did not do so. Maybe we (incorrectly) thought that
>>>>> this was an issue that only the Java SDK harness needed to know about.
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>> [1] https://github.com/apache/beam/issues/28649
>>>>>>
>>>>>> [2] https://github.com/apache/beam/issues/28650
>>>>>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>>
>>>>>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>>>>>
>>>>>>> Two separate things here:
>>>>>>>
>>>>>>> 1. Yes, a watermark can update in the middle of a bundle.
>>>>>>> 2. The records in the bundle themselves will prevent the watermark
>>>>>>> from updating, as they are still in flight until after finish bundle.
>>>>>>> Therefore simply caching the records should always be watermark-safe,
>>>>>>> regardless of the runner. You will only run into problems if you try to
>>>>>>> move timestamps "backwards" - which is why Beam strongly discourages this.
>>>>>>>
>>>>>>> This is not aligned with FlinkRunner's implementation, and I actually
>>>>>>> think it is not aligned conceptually. As mentioned, Flink does not have
>>>>>>> the concept of bundles at all. It achieves fault tolerance via
>>>>>>> checkpointing - essentially, a checkpoint barrier flows from sources to
>>>>>>> sinks, safely snapshotting the state of each operator on the way. Bundles
>>>>>>> are implemented as a somewhat arbitrary set of elements between two
>>>>>>> consecutive checkpoints (there can be multiple bundles between
>>>>>>> checkpoints). A bundle is 'committed' (i.e. persistently stored and
>>>>>>> guaranteed not to be retried) only after the checkpoint barrier passes
>>>>>>> over the elements in the bundle (every bundle is finished at the very
>>>>>>> latest just before a checkpoint). But watermark propagation and bundle
>>>>>>> finalization are completely unrelated. This might be a bug in the runner,
>>>>>>> but requiring a checkpoint for watermark propagation would introduce
>>>>>>> insane delays between processing time and watermarks: every executable
>>>>>>> stage would delay watermark propagation until a checkpoint (typically on
>>>>>>> the order of seconds), and this delay would add up after each stage.
>>>>>>>
>>>>>>
>>>>>> It's not bundles that hold up processing - rather, it is elements, and
>>>>>> elements are not considered "processed" until FinishBundle.
>>>>>>
>>>>>> You are right about Flink. In many cases this is fine - if Flink
>>>>>> rolls back to the last checkpoint, the watermark will also roll back, and
>>>>>> everything stays consistent. So in general, one does not need to wait for
>>>>>> checkpoints for watermark propagation.
>>>>>>
>>>>>> Where things get a bit weirder with Flink is whenever one has
>>>>>> external side effects. In theory, one should wait for checkpoints before
>>>>>> letting a Sink flush, otherwise one could end up with incorrect outputs
>>>>>> (especially with a sink like TextIO). Flink itself recognizes this, and
>>>>>> that's why they provide TwoPhaseCommitSinkFunction
>>>>>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>,
>>>>>> which waits for a checkpoint. In Beam, this is the reason we introduced
>>>>>> RequiresStableInput. Of course in practice many Flink users don't do
>>>>>> this - in which case they are prioritizing latency over data correctness.
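>>>>>>
>>>>>> For reference, a sketch of what RequiresStableInput looks like in the
>>>>>> Beam Java SDK (the external write is a hypothetical side effect):
>>>>>>
>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>
>>>>>> class WriteToExternalSystemFn extends DoFn<String, Void> {
>>>>>>   @RequiresStableInput
>>>>>>   @ProcessElement
>>>>>>   public void processElement(@Element String element) {
>>>>>>     // The runner must checkpoint the element before invoking this
>>>>>>     // method, so a retry replays exactly the same input.
>>>>>>     // externalSystem.write(element);  // hypothetical external call
>>>>>>   }
>>>>>> }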
>>>>>>
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>>>>>> > committed, as there's no guarantee that this work won't be discarded.
>>>>>>>>
>>>>>>>> There was a thread [1], where the conclusion seemed to be that
>>>>>>>> updating the watermark is possible even in the middle of a bundle.
>>>>>>>> Actually, handling watermarks is runner-dependent (e.g. Flink does not
>>>>>>>> store watermarks in checkpoints; they are always recomputed from scratch
>>>>>>>> on restore).
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>>>>>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>>>>>>>
>>>>>>>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>>>>>>>
>>>>>>>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <
>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I've actually wondered about this specifically for streaming...
>>>>>>>>>> if you're writing a pipeline there, it seems like you're often going to
>>>>>>>>>> want to put high-fixed-cost things like database connections even outside
>>>>>>>>>> of the bundle setup. You really only want to do that once in the lifetime
>>>>>>>>>> of the worker itself, not the bundle. Seems like having that boundary be
>>>>>>>>>> somewhere other than an arbitrary (and, in streaming, probably small to
>>>>>>>>>> avoid latency) group of elements might be more useful? I suppose this
>>>>>>>>>> depends heavily on the object lifecycle in the SDK worker though.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> +1. This is the difference between @Setup and @StartBundle. The
>>>>>>>>> start/finish bundle operations should be used for bracketing element
>>>>>>>>> processing that must be committed as a unit for correct failure recovery
>>>>>>>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>>>>>>>> in FinishBundle). On the other hand, things like open database connections
>>>>>>>>> can and likely should be shared across bundles.
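>>>>>>>>>
>>>>>>>>> A minimal Java sketch of that bracketing (names are hypothetical); the
>>>>>>>>> point is that everything cached in @ProcessElement is re-emitted in
>>>>>>>>> @FinishBundle, with enough metadata to preserve timestamps and windows:
>>>>>>>>>
>>>>>>>>> import java.util.ArrayList;
>>>>>>>>> import java.util.List;
>>>>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>>> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>>>>>>> import org.joda.time.Instant;
>>>>>>>>>
>>>>>>>>> class BufferingFn extends DoFn<String, String> {
>>>>>>>>>   // An element plus the metadata needed to re-emit it later.
>>>>>>>>>   private static class Buffered {
>>>>>>>>>     final String value;
>>>>>>>>>     final Instant timestamp;
>>>>>>>>>     final BoundedWindow window;
>>>>>>>>>     Buffered(String value, Instant timestamp, BoundedWindow window) {
>>>>>>>>>       this.value = value;
>>>>>>>>>       this.timestamp = timestamp;
>>>>>>>>>       this.window = window;
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   private transient List<Buffered> buffer;
>>>>>>>>>
>>>>>>>>>   @StartBundle
>>>>>>>>>   public void startBundle() {
>>>>>>>>>     buffer = new ArrayList<>();
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @ProcessElement
>>>>>>>>>   public void processElement(
>>>>>>>>>       @Element String element, @Timestamp Instant ts, BoundedWindow window) {
>>>>>>>>>     buffer.add(new Buffered(element, ts, window));  // cache, don't emit yet
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @FinishBundle
>>>>>>>>>   public void finishBundle(FinishBundleContext c) {
>>>>>>>>>     // Everything cached during the bundle is emitted here, so the
>>>>>>>>>     // cached elements are committed (or retried) with the bundle.
>>>>>>>>>     for (Buffered b : buffer) {
>>>>>>>>>       c.output(b.value, b.timestamp, b.window);
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>> }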
>>>>>>>>>
>>>>>>>>> This is correct, but the caching between @StartBundle and
>>>>>>>>> @FinishBundle has some problems. First, users need to manually set a
>>>>>>>>> watermark hold at min(timestamp in bundle), otherwise the watermark
>>>>>>>>> might overtake the buffered elements.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>>>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Users have no option other than timer.withOutputTimestamp
>>>>>>>>> for that, as we don't have a user-facing API to set a watermark hold
>>>>>>>>> otherwise; thus, in-bundle caching implies a stateful DoFn. The question
>>>>>>>>> might then be: why not use "classical" stateful caching involving state,
>>>>>>>>> as there is full control over the caching in user code? This triggered
>>>>>>>>> an idea: would it be useful to add the information about caching to the
>>>>>>>>> API (e.g. in Java, @StartBundle(caching=true))? That could maybe solve
>>>>>>>>> the above issues (the runner would know to set the hold, and it could
>>>>>>>>> work with "stateless" DoFns).
>>>>>>>>>
>>>>>>>>
>>>>>>>> Really, this is one of the areas where the streaming/batch
>>>>>>>> abstraction leaks. In batch it was a common pattern to have local DoFn
>>>>>>>> instance state that persisted from start to finish bundle, and these were
>>>>>>>> also used as convenient entry points for other operations (like opening
>>>>>>>> database connections) 'cause bundles were often "as large as possible."
>>>>>>>> With the advent of streaming it makes sense to put this in explicitly
>>>>>>>> managed runner state to allow for cross-bundle amortization, and there's
>>>>>>>> more value in distinguishing between @Setup and @StartBundle.
>>>>>>>>
>>>>>>>> (Were I to do things over I'd probably encourage an API that
>>>>>>>> discouraged non-configuration instance state on DoFns altogether. E.g. in
>>>>>>>> the style of Python context managers (and an equivalent API could probably
>>>>>>>> be put together with AutoCloseables in Java) one would have something like
>>>>>>>>
>>>>>>>> ParDo(X)
>>>>>>>>
>>>>>>>> which would logically (though not necessarily physically) lead to
>>>>>>>> an execution like
>>>>>>>>
>>>>>>>> with X.bundle_processor() as bundle_processor:
>>>>>>>>   for bundle in bundles:
>>>>>>>>     with bundle_processor.element_processor() as process:
>>>>>>>>       for element in bundle:
>>>>>>>>         process(element)
>>>>>>>>
>>>>>>>> where the traditional setup/start_bundle/finish_bundle/teardown
>>>>>>>> logic would live in the __enter__ and __exit__ methods (made even easier
>>>>>>>> with coroutines). For convenience one could of course provide a raw bundle
>>>>>>>> processor or element processor to ParDo if the enter/exit contexts are
>>>>>>>> trivial. But this is getting somewhat off-topic...
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> B
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> (I notice that you replied only to yourself, but there has been
>>>>>>>>>>> a whole thread of discussion on this - are you subscribed to 
>>>>>>>>>>> dev@beam?
>>>>>>>>>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd
>>>>>>>>>>> )
>>>>>>>>>>>
>>>>>>>>>>> It sounds like you want what everyone wants: to have the biggest
>>>>>>>>>>> bundles possible.
>>>>>>>>>>>
>>>>>>>>>>> So for bounded data, basically you make even splits of the data
>>>>>>>>>>> and each split is one bundle. And then dynamic splitting to redistribute
>>>>>>>>>>> work to eliminate stragglers, if your engine has that capability.
>>>>>>>>>>>
>>>>>>>>>>> For unbounded data, you more-or-less bundle as much as you can
>>>>>>>>>>> without waiting too long, like Jan described.
>>>>>>>>>>>
>>>>>>>>>>> Users know to put their high fixed costs in @StartBundle and
>>>>>>>>>>> then it is the runner's job to put as many calls to @ProcessElement as
>>>>>>>>>>> possible to amortize.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <
>>>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Whoops, I typoed my last email. I meant to write "this isn't
>>>>>>>>>>>> the greatest strategy for high *fixed* cost transforms", e.g.
>>>>>>>>>>>> a transform that takes 5 minutes to get set up and then maybe a
>>>>>>>>>>>> microsecond per input.
>>>>>>>>>>>>
>>>>>>>>>>>> I suppose one solution is to move the responsibility for
>>>>>>>>>>>> handling this kind of situation to the user and expect users to use a
>>>>>>>>>>>> bundling transform (e.g. BatchElements [1]) followed by a
>>>>>>>>>>>> Reshuffle+FlatMap. Is this what other runners expect? Just want to make
>>>>>>>>>>>> sure I'm not missing some smart generic bundling strategy that might
>>>>>>>>>>>> handle this for users.
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
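>>>>>>>>>>>>
>>>>>>>>>>>> A hedged Java analogue of that pattern (GroupIntoBatches is the Java
>>>>>>>>>>>> counterpart of Python's BatchElements; ExpensiveClient is hypothetical):
>>>>>>>>>>>>
>>>>>>>>>>>> // Input must be keyed; the keys exist only to partition batching state.
>>>>>>>>>>>> keyed
>>>>>>>>>>>>     .apply(GroupIntoBatches.<String, String>ofSize(100))
>>>>>>>>>>>>     .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>       public void process(@Element KV<String, Iterable<String>> batch,
>>>>>>>>>>>>                           OutputReceiver<String> out) {
>>>>>>>>>>>>         // Pay the high fixed cost once per batch, not per element.
>>>>>>>>>>>>         ExpensiveClient client = ExpensiveClient.create();
>>>>>>>>>>>>         for (String e : batch.getValue()) {
>>>>>>>>>>>>           out.output(client.process(e));
>>>>>>>>>>>>         }
>>>>>>>>>>>>       }
>>>>>>>>>>>>     }));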
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <
>>>>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm writing a runner, and the first strategy for determining
>>>>>>>>>>>>> bundling size was to just start with a bundle size of one and double
>>>>>>>>>>>>> it until we reach a size that we expect to take some target per-bundle
>>>>>>>>>>>>> runtime (e.g. maybe 10 minutes). I realize that this isn't the
>>>>>>>>>>>>> greatest strategy for high sized cost transforms. I'm curious what
>>>>>>>>>>>>> kind of strategies other runners take?
>>>>>>>>>>>>>
>>>>>>>>>>>>
