On Wed, Sep 27, 2023 at 2:53 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
> On Wed, Sep 27, 2023 at 10:58 AM Reuven Lax via dev <dev@beam.apache.org> wrote:
>
>> DoFns are allowed to be non-deterministic, so they don't have to yield the "same" output.
>
> Yeah. I'm more thinking here that there's a set of outputs that are considered equivalently valid.

exactly, "the same as much as the user expects it to be the same" :-)

>> The example I'm thinking of is where users perform some "best-effort" deduplication by creating a hashmap in StartBundle and removing duplicates. This is usually done purely for performance to reduce shuffle size, as opposed to a guaranteed RemoveDuplicates. This scenario doesn't require FinishBundle, though it does require a StartBundle.
>
> This is a good example--the presence of Start *or* Finish is enough to indicate that the bundle outputs cannot be committed totally independently.

+1 to this example - it is a good example to demonstrate that @StartBundle for communication with the runner is not redundant with lazy initialization.

Kenn
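A minimal sketch of the best-effort deduplication pattern discussed above, assuming the Beam Java SDK (the DoFn and its names are illustrative, not from the thread; @StartBundle, @ProcessElement, @Element, and OutputReceiver are the real Java SDK API):

  import java.util.HashSet;
  import java.util.Set;
  import org.apache.beam.sdk.transforms.DoFn;

  // Best-effort, per-bundle deduplication: duplicates are dropped only if
  // they land in the same bundle, so this reduces shuffle size without the
  // guarantee of a keyed Distinct. Note there is a @StartBundle but no
  // @FinishBundle - nothing is buffered for later emission.
  class BestEffortDedupFn extends DoFn<String, String> {
    private transient Set<String> seen;

    @StartBundle
    public void startBundle() {
      seen = new HashSet<>();  // fresh state for every bundle
    }

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out) {
      if (seen.add(element)) {
        out.output(element);
      }
    }
  }

Per Robert's point below, a runner that knows this DoFn has no @FinishBundle could truncate and retry its output at any element boundary and still get a valid-under-the-model result.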
> On the other hand, if there's a Start but no Finish we could safely truncate (and retry) the outputs at any point and still get a valid-under-the-model result, which could play well with the checkpointing model of persistence. This could possibly allow for optimizations purely from static analysis of the DoFn.
>
>> On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev <dev@beam.apache.org> wrote:
>>>
>>>> Yes, not including FinishBundle in ParDoPayload seems like a mistake. Though the absence of FinishBundle doesn't mean that one can assume that elements in a bundle don't affect subsequent bundle elements (i.e. there might still be caching!)
>>>
>>> Well, for a DoFn to be correct it has to yield the same (or "the same as much as the user expects it to be the same") output regardless of order of processing or bundling, so a runner or SDK harness can definitely take a bunch of elements and process them however it wants if there's no @FinishBundle. I think that's what Jan is getting at - adding a @FinishBundle is the user placing a new restriction on the runner. Technically we probably have to include @StartBundle in that consideration.
>>>
>>> Kenn
>>>
>>>> On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>>> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Kenn and Reuven,
>>>>>>
>>>>>> I agree with all these points. The only issue here seems to be that FlinkRunner does not fulfill these constraints. This is a bug that can be fixed, though we need to change some defaults, as the default bundle "duration" of 1000 ms can be too long for lower-traffic Pipelines. We are also probably missing some @ValidatesRunner tests for this. I created [1] and [2] to track this.
>>>>>>
>>>>>> One question still remains: the bundle vs. element life-cycle is relevant only for cases where processing of element X can affect processing of element Y later in the same bundle. Once this influence is ruled out (i.e. no caching), this information can enable runner optimizations that yield better performance. Should we consider propagating this information from user code to the runner?
>>>>>
>>>>> Yes!
>>>>>
>>>>> This was the explicit goal of the move to annotation-driven DoFn in https://s.apache.org/a-new-dofn: to make it so that the SDK and runner can get good information about what the DoFn requirements are.
>>>>>
>>>>> When there is no @FinishBundle method, the runner can make additional optimizations. This should have been included in the ParDoPayload in the proto when we moved to portable pipelines. I cannot remember if there was a good reason that we did not do so. Maybe we (incorrectly) thought that this was an issue that only the Java SDK harness needed to know about.
>>>>>
>>>>> Kenn
>>>>>
>>>>>> [1] https://github.com/apache/beam/issues/28649
>>>>>>
>>>>>> [2] https://github.com/apache/beam/issues/28650
>>>>>>
>>>>>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>>>>>
>>>>>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>>>>>
>>>>>>> Two separate things here:
>>>>>>>
>>>>>>> 1. Yes, a watermark can update in the middle of a bundle.
>>>>>>> 2. The records in the bundle themselves will prevent the watermark from updating, as they are still in flight until after finish bundle. Therefore simply caching the records should always be watermark-safe, regardless of the runner. You will only run into problems if you try to move timestamps "backwards" - which is why Beam strongly discourages this.
>>>>>>>
>>>>>>> This is not aligned with FlinkRunner's implementation, and I actually think it is not aligned conceptually. As mentioned, Flink does not have the concept of bundles at all. It achieves fault tolerance via checkpointing: essentially, a checkpoint barrier flows from sources to sinks, safely snapshotting the state of each operator on the way. Bundles are implemented as a somewhat arbitrary set of elements between two consecutive checkpoints (there can be multiple bundles between checkpoints). A bundle is 'committed' (i.e. persistently stored and guaranteed not to retry) only after the checkpoint barrier passes over the elements in the bundle (every bundle is finished at the very latest exactly before a checkpoint). But watermark propagation and bundle finalization are completely unrelated. This might be a bug in the runner, but requiring a checkpoint for watermark propagation would introduce insane delays between processing time and watermarks - every executable stage would delay watermark propagation until a checkpoint (which is typically on the order of seconds), and this delay would add up after each stage.
>>>>>>
>>>>>> It's not bundles that hold up processing, rather it is elements, and elements are not considered "processed" until FinishBundle.
>>>>>>
>>>>>> You are right about Flink. In many cases this is fine - if Flink rolls back to the last checkpoint, the watermark will also roll back, and everything stays consistent. So in general, one does not need to wait for checkpoints for watermark propagation.
>>>>>>
>>>>>> Where things get a bit weirder with Flink is whenever one has external side effects. In theory, one should wait for checkpoints before letting a Sink flush; otherwise one could end up with incorrect outputs (especially with a sink like TextIO). Flink itself recognizes this, and that's why they provide TwoPhaseCommitSinkFunction <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>, which waits for a checkpoint. In Beam, this is the reason we introduced RequiresStableInput. Of course in practice many Flink users don't do this - in which case they are prioritizing latency over data correctness.
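To illustrate the RequiresStableInput point, a hedged sketch: @RequiresStableInput is the real Beam Java annotation, but the sink DoFn and its write method are hypothetical stand-ins.

  import org.apache.beam.sdk.transforms.DoFn;

  // @RequiresStableInput asks the runner to make the input stable (e.g. by
  // checkpointing it) before invoking @ProcessElement, so a retry replays
  // exactly the same element and the external side effect is deterministic.
  class StableSinkFn extends DoFn<String, Void> {

    @RequiresStableInput
    @ProcessElement
    public void processElement(@Element String record) {
      writeToExternalSystem(record);
    }

    // Stand-in for a non-idempotent external write (illustrative only).
    private void writeToExternalSystem(String record) {
      System.out.println("writing: " + record);
    }
  }

This is the trade-off Reuven describes: the stability guarantee costs latency (waiting for the checkpoint), which is why many Flink users skip it.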
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>
>>>>>>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is committed, as there's no guarantee that this work won't be discarded.
>>>>>>>>
>>>>>>>> There was a thread [1], where the conclusion seemed to be that updating the watermark is possible even in the middle of a bundle. Actually, handling watermarks is runner-dependent (e.g. Flink does not store watermarks in checkpoints; they are always recomputed from scratch on restore).
>>>>>>>>
>>>>>>>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>>>>>>
>>>>>>>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>>>>>>>
>>>>>>>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>
>>>>>>>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>>>>>>>
>>>>>>>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <dev@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I've actually wondered about this specifically for streaming... if you're writing a pipeline there, it seems like you're often going to want to put high fixed-cost things like database connections even outside of the bundle setup. You really only want to do that once in the lifetime of the worker itself, not the bundle. Seems like having that boundary be somewhere other than an arbitrary (and, in streaming, probably small to avoid latency) group of elements might be more useful? I suppose this depends heavily on the object lifecycle in the SDK worker, though.
>>>>>>>>>
>>>>>>>>> +1. This is the difference between @Setup and @StartBundle. The start/finish bundle operations should be used for bracketing element processing that must be committed as a unit for correct failure recovery (e.g. if elements are cached in ProcessElement, they should all be emitted in FinishBundle). On the other hand, things like open database connections can and likely should be shared across bundles.
>>>>>>>>>
>>>>>>>>> This is correct, but the caching between @StartBundle and @FinishBundle has some problems. First, users need to manually set a watermark hold of min(timestamp in bundle); otherwise the watermark might overtake the buffered elements.
>>>>>>>>
>>>>>>>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is committed, as there's no guarantee that this work won't be discarded.
>>>>>>>>
>>>>>>>>> Users don't have any option other than timer.withOutputTimestamp for that, as we don't have a user-facing API to set a watermark hold otherwise; thus the in-bundle caching implies a stateful DoFn. The question might then be: why not use "classical" stateful caching involving state, since there is full control over the caching in user code? This gave me an idea: would it be useful to add the information about caching to the API (e.g. in Java, @StartBundle(caching=true))? That could maybe solve the above issues - the runner would know to set the hold, and it could work with "stateless" DoFns.
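For reference, a minimal sketch of the in-bundle caching pattern under discussion. The @StartBundle(caching=true) attribute Jan floats does not exist in Beam; the annotations and FinishBundleContext.output below are the real Java SDK API, but the timestamp/window handling is deliberately simplified to the global window - which is exactly the watermark-hold problem being described.

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
  import org.joda.time.Instant;

  // Cache elements during the bundle and flush them in @FinishBundle. The
  // bundle is the unit of commit, so buffered elements are not lost on retry.
  class BufferingFn extends DoFn<String, String> {
    private transient List<String> buffer;

    @StartBundle
    public void startBundle() {
      buffer = new ArrayList<>();
    }

    @ProcessElement
    public void processElement(@Element String element) {
      buffer.add(element);  // e.g. to batch an expensive RPC per bundle
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext ctx) {
      for (String e : buffer) {
        // Simplification: assumes the global window and invents a timestamp.
        // Real code must track each element's timestamp and window - and, as
        // Jan notes, nothing holds the watermark for the buffered elements.
        ctx.output(e, Instant.now(), GlobalWindow.INSTANCE);
      }
      buffer = null;
    }
  }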
>>>>>>>> Really, this is one of the areas where the streaming/batch abstraction leaks. In batch it was a common pattern to have local DoFn instance state that persisted from start to finish bundle, and these were also used as convenient entry points for other operations (like opening database connections) 'cause bundles were often "as large as possible." With the advent of streaming it makes sense to put this in explicitly managed runner state to allow for cross-bundle amortization, and there's more value in distinguishing between @Setup and @StartBundle.
>>>>>>>>
>>>>>>>> (Were I to do things over I'd probably encourage an API that discouraged non-configuration instance state on DoFns altogether. E.g. in the notion of Python context managers (and an equivalent API could probably be put together with AutoCloseables in Java) one would have something like
>>>>>>>>
>>>>>>>> ParDo(X)
>>>>>>>>
>>>>>>>> which would logically (though not necessarily physically) lead to an execution like
>>>>>>>>
>>>>>>>> with X.bundle_processor() as bundle_processor:
>>>>>>>>   for bundle in bundles:
>>>>>>>>     with bundle_processor.element_processor() as process:
>>>>>>>>       for element in bundle:
>>>>>>>>         process(element)
>>>>>>>>
>>>>>>>> where the traditional setup/start_bundle/finish_bundle/teardown logic would live in the __enter__ and __exit__ methods (made even easier with coroutines). For convenience one could of course provide a raw bundle processor or element processor to ParDo if the enter/exit contexts are trivial. But this is getting somewhat off-topic...
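Robert's AutoCloseable aside might translate to Java roughly as follows. This is a purely hypothetical API sketch - none of these types exist in Beam; try-with-resources plays the role of Python's "with".

  // Hypothetical sketch: setup/teardown live in the outer resource's creation
  // and close(), start/finish-bundle in the inner resource's creation and
  // close(), mirroring Python's __enter__/__exit__.
  interface ElementProcessor<T> extends AutoCloseable {
    void process(T element);
    @Override void close();  // ~ @FinishBundle
  }

  interface BundleProcessor<T> extends AutoCloseable {
    ElementProcessor<T> elementProcessor();  // ~ @StartBundle
    @Override void close();                  // ~ @Teardown
  }

  interface DoFnLike<T> {
    BundleProcessor<T> bundleProcessor();  // ~ @Setup
  }

  final class SketchedExecution {
    static <T> void run(DoFnLike<T> x, Iterable<? extends Iterable<T>> bundles) {
      try (BundleProcessor<T> bundleProcessor = x.bundleProcessor()) {
        for (Iterable<T> bundle : bundles) {
          try (ElementProcessor<T> process = bundleProcessor.elementProcessor()) {
            for (T element : bundle) {
              process.process(element);
            }
          }
        }
      }
    }
  }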
>>>>>>>>>> Best,
>>>>>>>>>> B
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> (I notice that you replied only to yourself, but there has been a whole thread of discussion on this - are you subscribed to dev@beam? https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>>>>>>>>>
>>>>>>>>>>> It sounds like you want what everyone wants: to have the biggest bundles possible.
>>>>>>>>>>>
>>>>>>>>>>> So for bounded data, basically you make even splits of the data and each split is one bundle, and then use dynamic splitting to redistribute work and eliminate stragglers, if your engine has that capability.
>>>>>>>>>>>
>>>>>>>>>>> For unbounded data, you more-or-less bundle as much as you can without waiting too long, like Jan described.
>>>>>>>>>>>
>>>>>>>>>>> Users know to put their high fixed costs in @StartBundle, and then it is the runner's job to make as many calls to @ProcessElement as possible to amortize them.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Whoops, I typoed my last email. I meant to write "this isn't the greatest strategy for high *fixed* cost transforms", e.g. a transform that takes 5 minutes to get set up and then maybe a microsecond per input.
>>>>>>>>>>>>
>>>>>>>>>>>> I suppose one solution is to move the responsibility for handling this kind of situation to the user and expect users to use a bundling transform (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is this what other runners expect? Just want to make sure I'm not missing some smart generic bundling strategy that might handle this for users.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Writing a runner, and the first strategy for determining bundle size was to just start with a bundle size of one and double it until we reach a size that we expect to take some target per-bundle runtime (e.g. maybe 10 minutes). I realize that this isn't the greatest strategy for high sized cost transforms. I'm curious what kind of strategies other runners take?
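Lastly, a hedged sketch of the doubling heuristic Joey describes (all names hypothetical; this is runner-internal bookkeeping, not a Beam API):

  // Start with a bundle size of one and double it until the measured
  // per-bundle runtime approaches the target, then hold steady. High
  // fixed-cost DoFns defeat this heuristic - the first tiny bundles already
  // pay the full setup cost - which is why user-side batching (e.g.
  // BatchElements) may still be needed.
  final class BundleSizer {
    private final double targetSeconds;
    private long bundleSize = 1;

    BundleSizer(double targetSeconds) {
      this.targetSeconds = targetSeconds;
    }

    long nextBundleSize(double lastBundleSeconds) {
      if (lastBundleSeconds < targetSeconds / 2) {
        bundleSize *= 2;  // bundle finished quickly: amortize more per bundle
      }
      return bundleSize;
    }
  }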