On 1/9/26 16:46, Kenneth Knowles wrote:
On Fri, Jan 9, 2026 at 4:07 AM Jan Lukavský <[email protected]> wrote:

    Is it possible to generate the DoFnInvoker only at pipeline
    construction time? Flink generally uses the approach that all user
    code is Serializable: the serialized bytes are distributed to
    workers, and instances of UDFs are created only by deserialization.
    This would require DoFnInvoker(s) (and all related classes, like
    OnTimerInvoker) to be Serializable, but then this should work. Or
    is there anything special that I'm missing?
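A minimal plain-JDK sketch of the Flink-style model Jan describes: a Serializable function object is turned into bytes once at construction time, and each worker gets its own instance by deserializing those bytes. `UpperCaseFn`, `serialize`, and `deserialize` are hypothetical names; nothing here is Beam or Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.function.Function;

final class DeserializeToInstantiate {

  // Hypothetical user function (UDF); Serializable so it can be shipped as bytes.
  static final class UpperCaseFn implements Function<String, String>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public String apply(String s) {
      return s.toUpperCase(Locale.ROOT);
    }
  }

  // Pipeline construction time: turn the configured instance into bytes once.
  static byte[] serialize(Serializable obj) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Worker side: every instance is created by deserializing the shipped bytes.
  static Object deserialize(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      // The class itself must already be loadable on the receiving JVM.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] shipped = serialize(new UpperCaseFn());
    Object first = deserialize(shipped);
    Object second = deserialize(shipped);
    UpperCaseFn fn = (UpperCaseFn) first;
    System.out.println(fn.apply("beam") + " " + (first != second)); // prints "BEAM true"
  }
}
```

Note that `deserialize` only succeeds because `UpperCaseFn`'s class is already on the receiving JVM's classpath.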

This seems like a generally good idea: The DoFn is a user-facing API while the DoFnInvoker is the worker/execution-facing API. So, aligning with this makes sense. That said, things largely work already and Byron's case just looks like a bug.
Agree.

The situation isn't comparable to the "old" DoFn or to how Flink works, because we are dynamically creating a class that is not present in the staged jars, not just shipping a serialized instance. So you'd be shipping a Class object and loading it on the worker. This could be a fairly easy experiment to try (we may have tried it in the old days and decided it wasn't the easiest way).
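Kenn's distinction can be shown with the JDK alone: Java serialization writes a class descriptor (name, serialVersionUID, field descriptions) plus field values, not bytecode, so a class generated at construction time must also be made loadable on the worker. The sketch below simulates a worker that lacks such a class by overriding `ObjectInputStream.resolveClass`; `GeneratedInvoker` and `RestrictedObjectInputStream` are hypothetical stand-ins, not Beam's generated code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Set;

final class MissingClassDemo {

  // Hypothetical stand-in for a class generated at pipeline construction time.
  static final class GeneratedInvoker implements Serializable {
    private static final long serialVersionUID = 1L;
  }

  // Simulates a worker JVM whose classpath lacks some classes: any class whose
  // name is in `missing` cannot be resolved, as if its bytecode was never shipped.
  static final class RestrictedObjectInputStream extends ObjectInputStream {
    private final Set<String> missing;

    RestrictedObjectInputStream(InputStream in, Set<String> missing) throws IOException {
      super(in);
      this.missing = missing;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
        throws IOException, ClassNotFoundException {
      if (missing.contains(desc.getName())) {
        throw new ClassNotFoundException(desc.getName());
      }
      return super.resolveClass(desc);
    }
  }

  // Serializes `obj`, then reports whether the "worker" can turn the bytes back into an object.
  static boolean canDeserialize(Serializable obj, Set<String> missingOnWorker) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj); // class descriptor and field values only, no bytecode
      }
      try (ObjectInputStream in =
          new RestrictedObjectInputStream(
              new ByteArrayInputStream(bytes.toByteArray()), missingOnWorker)) {
        in.readObject();
        return true;
      }
    } catch (ClassNotFoundException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    GeneratedInvoker invoker = new GeneratedInvoker();
    System.out.println(canDeserialize(invoker, Set.of())); // prints true
    System.out.println(
        canDeserialize(invoker, Set.of(GeneratedInvoker.class.getName()))); // prints false
  }
}
```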

Yes, it will probably require assembling all the DoFnInvokers into a separate jar and shipping it with the user code. I use this approach to run pipelines from a Groovy REPL shell, and it seems to work fine.

 Jan


Kenn

    On 1/7/26 02:17, Reuven Lax via dev wrote:
    How would we do this? Caching a map of DoFn -> Invoker is
    effectively doing it once per DoFn, no?

    On Tue, Jan 6, 2026 at 4:57 PM Byron Ellis
    <[email protected]> wrote:

        I was thinking more in the sense of doing it once per DoFn
        rather than using a globally memoized cache.
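One reading of "once per DoFn" is a lazily built invoker held in a transient field of each function instance. The sketch assumes that reading; `FnWithOwnInvoker` and `generateInvoker` are hypothetical, and the generation step is a trivial stand-in for bytecode generation.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

final class PerInstanceInvoker {

  // Counts how often the (expensive) generation step runs.
  static final AtomicInteger GENERATIONS = new AtomicInteger();

  // Stand-in for bytecode generation; the real step can be far more expensive.
  static UnaryOperator<Object> generateInvoker() {
    GENERATIONS.incrementAndGet();
    return element -> element;
  }

  // Hypothetical function wrapper that owns its invoker instead of using a global cache.
  static final class FnWithOwnInvoker implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: never shipped, rebuilt lazily on each (deserialized) instance.
    // Not thread-safe: assumes each instance is used by one thread at a time.
    private transient UnaryOperator<Object> invoker;

    Object process(Object element) {
      if (invoker == null) {
        invoker = generateInvoker(); // once per instance, not once per element
      }
      return invoker.apply(element);
    }
  }

  public static void main(String[] args) {
    FnWithOwnInvoker a = new FnWithOwnInvoker();
    FnWithOwnInvoker b = new FnWithOwnInvoker();
    for (int i = 0; i < 3; i++) {
      a.process(i);
      b.process(i);
    }
    System.out.println(GENERATIONS.get()); // prints 2
  }
}
```

With this approach N instances of the same class pay for N generations, whereas a cache keyed on the class pays once, which is the trade-off behind Reuven's reply.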

        On Tue, Jan 6, 2026 at 3:42 PM Reuven Lax <[email protected]>
        wrote:

            Given how expensive it was in the past (multiple seconds
            to generate the bytecode!), I suspect we still don't want
            to do this for every element processed.

            On Tue, Jan 6, 2026 at 8:41 AM Byron Ellis
            <[email protected]> wrote:

                I suspect the cache also needs to include the cast
                target (or perhaps a re-examination of "how expensive
                is this really in the year 2025?"). Fortunately
                there's a pretty easy workaround.
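One way to read "include the cast target": if generated code casts to a type fixed at generation time, the memoization key needs that type as well as the DoFn class. In this sketch, generation is modeled as building a lambda that calls `Class.cast`; the composite key and all names are hypothetical and do not describe how `ByteBuddyDoFnInvokerFactory` is actually keyed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

final class CastAwareInvokerCache {

  // Hypothetical stand-in for a DoFn class such as the one behind Filter.by.
  static final class HypotheticalFilterFn {}

  // Key = (function class, element type the generated code casts to).
  private final Map<Map.Entry<Class<?>, Class<?>>, UnaryOperator<Object>> cache =
      new ConcurrentHashMap<>();
  private final AtomicInteger generations = new AtomicInteger();

  UnaryOperator<Object> invokerFor(Class<?> fnClass, Class<?> elementType) {
    return cache.computeIfAbsent(
        Map.entry(fnClass, elementType),
        key -> {
          generations.incrementAndGet(); // expensive step, once per distinct key
          return element -> elementType.cast(element); // cast target fixed at generation time
        });
  }

  int generations() {
    return generations.get();
  }

  public static void main(String[] args) {
    CastAwareInvokerCache cache = new CastAwareInvokerCache();
    System.out.println(cache.invokerFor(HypotheticalFilterFn.class, String.class).apply("a"));
    System.out.println(cache.invokerFor(HypotheticalFilterFn.class, Integer.class).apply(1));
    cache.invokerFor(HypotheticalFilterFn.class, String.class); // cache hit, no regeneration
    System.out.println(cache.generations()); // prints 2
  }
}
```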

                On Sun, Jan 4, 2026 at 5:27 PM Kenneth Knowles
                <[email protected]> wrote:

                    Generating the DoFnInvoker class takes enough
                    time that it is important to memoize them. The
                    cache is keyed on the DoFn class. See
                    
https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305
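A toy model of memoization keyed on the function class alone, using `ConcurrentHashMap.computeIfAbsent`. To show how such a key could go wrong, it assumes (hypothetically; this is not a diagnosis of the bug in this thread) that the generated code casts to the element type seen at first use, so a later use with a different element type reuses a stale cast. `ClassKeyedInvokerCache` and `HypotheticalFilterFn` are illustrative names only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

final class ClassKeyedInvokerCache {

  // Hypothetical stand-in for a DoFn class.
  static final class HypotheticalFilterFn {}

  // One entry per function class.
  private final Map<Class<?>, UnaryOperator<Object>> cache = new ConcurrentHashMap<>();
  private final AtomicInteger generations = new AtomicInteger();

  // Hypothetical model: the generated invoker casts to the element type seen when
  // it was first generated, and that type is NOT part of the key.
  UnaryOperator<Object> invokerFor(Class<?> fnClass, Class<?> elementType) {
    return cache.computeIfAbsent(
        fnClass,
        k -> {
          generations.incrementAndGet(); // expensive step runs once per fnClass
          return element -> elementType.cast(element);
        });
  }

  int generations() {
    return generations.get();
  }

  public static void main(String[] args) {
    ClassKeyedInvokerCache cache = new ClassKeyedInvokerCache();
    // First use: invoker generated for String elements.
    System.out.println(cache.invokerFor(HypotheticalFilterFn.class, String.class).apply("a"));
    // Same fn class, different element type: the cached invoker is reused...
    UnaryOperator<Object> reused = cache.invokerFor(HypotheticalFilterFn.class, Integer.class);
    System.out.println(cache.generations()); // prints 1
    try {
      reused.apply(1);
    } catch (ClassCastException e) {
      System.out.println("ClassCastException"); // ...and its stale cast fails
    }
  }
}
```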


                    Kenn

                    On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis
                    <[email protected]> wrote:

                        I’ve replicated this on the non-Portable
                        FlinkRunner and DirectRunner.

                        On Dec 18, 2025, at 2:15 PM, Reuven Lax via
                        dev <[email protected]> wrote:

                        
                        Which runner are you using?

                        On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax
                        <[email protected]> wrote:

                            The bytecode generated in DoFnInvoker
                            (ByteBuddyDoFnInvokerFactory) does
                            generate casts to make sure that the
                            elements match. I'm not entirely sure
                            offhand why the same DoFnInvoker is
                            being used; it seems like something might
                            be going wrong with DoFn caching.
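For Byron's question about where the cast comes from, plain javac offers an analogy (not ByteBuddy output): erased generic code that "works with Object" contains no cast, and the checked cast is emitted where a concrete static type is expected. `unchecked` and `tryAsString` are hypothetical helpers.

```java
final class ErasureCastDemo {

  // After erasure this method body contains no cast at all: T becomes Object.
  @SuppressWarnings("unchecked")
  static <T> T unchecked(Object o) {
    return (T) o;
  }

  // Returns "none:<value>" if the cast succeeds, or "ClassCastException" if it fails.
  static String tryAsString(Object o) {
    try {
      String s = unchecked(o); // javac emits a checkcast to String here, at the call site
      return "none:" + s;
    } catch (ClassCastException e) {
      return "ClassCastException";
    }
  }

  public static void main(String[] args) {
    Object fine = unchecked(42); // target type Object: nothing to check
    System.out.println(fine); // prints 42
    System.out.println(tryAsString("x")); // prints none:x
    System.out.println(tryAsString(42)); // prints ClassCastException
  }
}
```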

                            Reuven

                            On Thu, Dec 18, 2025 at 10:19 AM Byron
                            Ellis <[email protected]> wrote:

                                Hi all,

                                I ran into sort of an interesting
                                issue last night. Consider the code
                                below. If you try to run it, you'll
                                get a ClassCastException on the
                                second Filter.by. What appears to be
                                happening is that the Filter.by
                                DoFnInvoker is being reused, which
                                should be fine since it should be
                                working with Object. But what I
                                can't find is where the cast is
                                happening, because it seems like a)
                                the cast isn't actually needed, and
                                b) it's doing the wrong cast. Any clues?

                                Best,
                                B

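                                import java.io.Serializable;
                                import java.util.Objects;
                                import org.apache.beam.sdk.testing.TestPipeline;
                                import org.apache.beam.sdk.transforms.Create;
                                import org.apache.beam.sdk.transforms.DoFn;
                                import org.apache.beam.sdk.transforms.Filter;
                                import org.apache.beam.sdk.transforms.ParDo;
                                import org.junit.Rule;
                                import org.junit.Test;

                                // Assumed (not in the original message): the enclosing
                                // test class declares the pipeline as a JUnit rule.
                                @Rule public final transient TestPipeline p = TestPipeline.create();

                                @Test
                                public void testReusedLambda() {
                                  p.apply(Create.of(new SimpleElement1()))
                                      .apply("First", Filter.by(Objects::nonNull))
                                      .apply(ParDo.of(new VerySimpleDoFn<>()))
                                      .apply("Second", Filter.by(Objects::nonNull));
                                  p.run().waitUntilFinish();
                                }

                                static class SimpleElement1 implements Serializable {}

                                static class SimpleElement2 implements Serializable {}

                                // Emits a fresh SimpleElement2 for every input, whatever its type.
                                static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
                                  @ProcessElement
                                  public void processElement(ProcessContext c) {
                                    c.output(new SimpleElement2());
                                  }
                                }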



-- Byron Ellis ([email protected])
                "Oook" -- The Librarian



