Is it possible to generate the DoFnInvoker only at pipeline construction time? Flink generally takes the approach that all user code is Serializable: the serialized bytes are distributed to the workers, and instances of UDFs are created only by deserialization. This would require the DoFnInvoker(s) (and all related classes, like OnTimerInvoker) to be Serializable, but then it should work. Or is there anything special that I'm missing?
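
To make the proposal concrete, here is a minimal sketch of the "create once, ship serialized bytes, deserialize on the worker" pattern using plain Java serialization. The `Invoker` interface and `NonNullInvoker` class are hypothetical stand-ins, not Beam's DoFnInvoker. Note that this uses a statically compiled class: Java serialization resolves classes by name on the receiving side, so a class generated at runtime would presumably also need its bytecode to be available on the worker, which this sketch does not address.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedInvokerSketch {
  /** Hypothetical stand-in for an invoker that must survive serialization. */
  interface Invoker extends Serializable {
    boolean invoke(Object element);
  }

  /** A statically compiled invoker; a runtime-generated class is NOT covered here. */
  static class NonNullInvoker implements Invoker {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean invoke(Object element) {
      return element != null;
    }
  }

  /** "Construction time": turn the invoker into bytes that can be shipped. */
  static byte[] serialize(Object o) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    }
    return bytes.toByteArray();
  }

  /** "Worker side": the only way an instance is created is by deserialization. */
  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  /** Serializes and deserializes in one step, wrapping checked exceptions. */
  static Invoker roundTrip(Invoker invoker) {
    try {
      return (Invoker) deserialize(serialize(invoker));
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Invoker onWorker = roundTrip(new NonNullInvoker());
    System.out.println(onWorker.invoke("element"));
    System.out.println(onWorker.invoke(null));
  }
}
```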

On 1/7/26 02:17, Reuven Lax via dev wrote:
How would we do this? Caching a map of DoFn -> Invoker is effectively doing it once per DoFn, no?

On Tue, Jan 6, 2026 at 4:57 PM Byron Ellis <[email protected]> wrote:

    I was thinking more in the sense of doing it once per DoFn rather
    than using a globally memoized cache

    On Tue, Jan 6, 2026 at 3:42 PM Reuven Lax <[email protected]> wrote:

        Given how expensive it was in the past (multiple seconds to
        generate the bytecode!), I suspect we still don't want to do
        this on every element processing.

        On Tue, Jan 6, 2026 at 8:41 AM Byron Ellis
        <[email protected]> wrote:

            I suspect the cache also needs to include the cast target
            (or perhaps a re-examination of "how expensive is this
            really in the year 2025?"). Fortunately there's a pretty
            easy workaround.

            On Sun, Jan 4, 2026 at 5:27 PM Kenneth Knowles
            <[email protected]> wrote:

                Generating the DoFnInvoker class takes enough time
                that it is important to memoize them. The cache is
                keyed on the DoFn class. See
                
https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305


                Kenn
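
As a toy model of the failure mode suspected elsewhere in this thread (a cache keyed only on the class, while the generated code bakes in a cast), the sketch below may help. It is not Beam's code: `Invoker`, `generate`, `NonNullFilter`, and both caches are hypothetical names, and the "generation" step is just a lambda that casts to a fixed type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class InvokerCacheSketch {
  /** Hypothetical stand-in for a generated invoker. */
  interface Invoker {
    boolean invoke(Predicate<Object> fn, Object element);
  }

  /** Simulates expensive code generation that bakes in a cast to elementType. */
  static Invoker generate(Class<?> elementType) {
    return (fn, element) -> fn.test(elementType.cast(element));
  }

  /** One function class that is applied to different element types. */
  static class NonNullFilter implements Predicate<Object> {
    @Override
    public boolean test(Object o) {
      return o != null;
    }
  }

  // Cache keyed on the function class only.
  static final Map<Class<?>, Invoker> BY_CLASS = new ConcurrentHashMap<>();
  // Cache keyed on the function class plus the cast target.
  static final Map<String, Invoker> BY_CLASS_AND_TYPE = new ConcurrentHashMap<>();

  static Invoker classOnly(Class<?> fnClass, Class<?> elementType) {
    return BY_CLASS.computeIfAbsent(fnClass, k -> generate(elementType));
  }

  static Invoker classAndType(Class<?> fnClass, Class<?> elementType) {
    String key = fnClass.getName() + "|" + elementType.getName();
    return BY_CLASS_AND_TYPE.computeIfAbsent(key, k -> generate(elementType));
  }

  /** Returns true if the second use hits the invoker generated for the first. */
  static boolean failsWithClassOnlyKey() {
    Predicate<Object> fn = new NonNullFilter();
    classOnly(fn.getClass(), Integer.class).invoke(fn, 1); // caches an Integer cast
    try {
      classOnly(fn.getClass(), String.class).invoke(fn, "x"); // reuses that cast
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  /** Returns true if both uses succeed when the cast target is part of the key. */
  static boolean worksWithCompositeKey() {
    Predicate<Object> fn = new NonNullFilter();
    boolean first = classAndType(fn.getClass(), Integer.class).invoke(fn, 1);
    boolean second = classAndType(fn.getClass(), String.class).invoke(fn, "x");
    return first && second;
  }

  public static void main(String[] args) {
    System.out.println("class-only key fails: " + failsWithClassOnlyKey());
    System.out.println("composite key works: " + worksWithCompositeKey());
  }
}
```

Whether the real cache behaves this way depends on what the generated bytecode actually casts to, which this sketch does not establish.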

                On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis
                <[email protected]> wrote:

                    I’ve replicated this on the non-Portable
                    FlinkRunner and DirectRunner.

                    On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev
                    <[email protected]> wrote:

                    
                    Which runner are you using?

                    On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax
                    <[email protected]> wrote:

                        The bytecode generated in DoFnInvoker
                        (ByteBuddyDoFnInvokerFactory) does generate
                        casts to make sure that the elements match.
                        I'm not entirely sure offhand why the same
                        DoFnInvoker is being used; it seems like
                        something might be going wrong with DoFn
                        caching.

                        Reuven

                        On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis
                        <[email protected]> wrote:

                            Hi all,

                            I ran into sort of an interesting issue
                            last night. Consider the code below. If
                            you try to run it what will happen is
                            you'll get a ClassCastException on the
                            second Filter.by. What appears to be
                            happening is that the Filter.by
                            DoFnInvoker is being reused... which
                            should be fine since that should be
                            working with Object... but what I can't
                            find is where the casting is happening
                            because it seems like a) the cast isn't
                            actually needed? and b) it's doing the
                            wrong cast. Any clues?

                            Best,
                            B

                            @Test public void testReusedLambda() {
                               p.apply(Create.of(new SimpleElement1()))
                                   .apply("First",Filter.by(Objects::nonNull))
                                   .apply(ParDo.of(new VerySimpleDoFn<>()))
                                   .apply("Second",Filter.by(Objects::nonNull));
                               p.run().waitUntilFinish();
                            }

                            static class SimpleElement1 implements Serializable {}

                            static class SimpleElement2 implements Serializable {}

                            static class VerySimpleDoFn<I> extends DoFn<I,SimpleElement2> {
                               @ProcessElement public void processElement(ProcessContext c) {
                                 c.output(new SimpleElement2());
                               }
                            }



-- Byron Ellis ([email protected])
            "Oook" -- The Librarian



