On the performance aspects of this, I did some investigation back in 2022. The code generation is an improvement because various things we do (mostly deep stacks and virtual function calls) result in the JIT being disabled. This was a problem with the default Java 8 configuration; it might not be a problem with current Java versions. Of course, on many real pipelines I/O latency dominates everything, so it doesn't matter there.
Writeup from 2022: https://docs.google.com/document/d/12XkHLcE0HpOS0fs0FekDzh68fMPCEZ5uGCh00kPZf0I/edit

On Fri, Jan 9, 2026 at 10:10 AM Robert Bradshaw via dev <[email protected]> wrote:

> On Fri, Jan 9, 2026 at 10:05 AM Byron Ellis <[email protected]> wrote:
>
>> Our best bet is probably to add the class being cast to as part of the
>> cache key. Presumably you know that at the time of caching because you were
>> able to generate that bytecode. Though… given that this is Java and runtime
>> do we even need that cast?
>>
>
> +1. This seems the right solution. In essence, we're trying to memoize the
> bytecode generation, but not keying the cache on the full set of pertinent
> input parameters, which is inherently wrong.
>
> If that's not available, the stage name would probably be sufficient as
> well.
>
>
>>
>> On Jan 9, 2026, at 9:48 AM, Jan Lukavský <[email protected]> wrote:
>>
>>
>> On 1/9/26 16:46, Kenneth Knowles wrote:
>>
>> On Fri, Jan 9, 2026 at 4:07 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Is it possible to generate DoFnInvoker only during pipeline construction
>>> time? Flink generally uses the approach that all user code is Serializable:
>>> the serialized bytes are distributed to workers and instances of UDFs are
>>> created only by deserialization. This would require DoFnInvoker(s) (and all
>>> related classes, like OnTimerInvoker) to be Serializable, but then this
>>> should work, or is there anything special that I'm missing?
>>>
>> This seems like a generally good idea: The DoFn is a user-facing API
>> while the DoFnInvoker is the worker/execution-facing API. So, aligning with
>> this makes sense. That said, things largely work already and Byron's case
>> just looks like a bug.
>>
>> Agree.
>>
>>
>> The situation isn't comparable to the "old" DoFn or how Flink works
>> because we are dynamically creating a class that is not present in the
>> staged jars, not just shipping a serialized instance.
>> So you'd be shipping up a Class object and loading it on the worker. This
>> could be a fairly easy experiment to try (we may have tried it in the old
>> days and decided it wasn't the easiest way).
>>
>> Yes, it will probably require assembling all the DoFnInvokers into a
>> separate jar and shipping it with the user code. I use this approach to run
>> pipelines from a Groovy REPL shell, and it seems to work fine.
>>
>> Jan
>>
>>
>> Kenn
>>
>>
>>
>>> On 1/7/26 02:17, Reuven Lax via dev wrote:
>>>
>>> How would we do this? Caching a map of DoFn -> Invoker is effectively
>>> doing it once per DoFn, no?
>>>
>>> On Tue, Jan 6, 2026 at 4:57 PM Byron Ellis <[email protected]>
>>> wrote:
>>>
>>>> I was thinking more in the sense of doing it once per DoFn rather than
>>>> using a globally memoized cache.
>>>>
>>>> On Tue, Jan 6, 2026 at 3:42 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Given how expensive it was in the past (multiple seconds to generate
>>>>> the bytecode!), I suspect we still don't want to do this on every
>>>>> element processing.
>>>>>
>>>>> On Tue, Jan 6, 2026 at 8:41 AM Byron Ellis <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I suspect the cache also needs to include the cast target (or perhaps
>>>>>> a re-examination of "how expensive is this really in the year 2025?").
>>>>>> Fortunately there's a pretty easy workaround.
>>>>>>
>>>>>> On Sun, Jan 4, 2026 at 5:27 PM Kenneth Knowles <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Generating the DoFnInvoker class takes enough time that it is
>>>>>>> important to memoize them. The cache is keyed on the DoFn class. See
See >>>>>>> https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305 >>>>>>> >>>>>>> Kenn >>>>>>> >>>>>>> On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> I’ve replicated this on the non-Portable FlinkRunner and >>>>>>>> DirectRunner. >>>>>>>> >>>>>>>> On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>> >>>>>>>> Which runner are you using? >>>>>>>> >>>>>>>> On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> The bytecode generated in DoFnInvoker >>>>>>>>> (ByteBuddyDoFnInvokerFactory) does generate casts to make sure that >>>>>>>>> the >>>>>>>>> elements match. I'm not entirely sure offhand why the same >>>>>>>>> DoFnInvoker is >>>>>>>>> being used 0 seems like something might be going wrong with DoFn >>>>>>>>> caching. >>>>>>>>> >>>>>>>>> Reuven >>>>>>>>> >>>>>>>>> On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis <[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi all, >>>>>>>>>> >>>>>>>>>> I ran into sort of an interesting issue last night. Consider the >>>>>>>>>> code below. If you try to run it what will happen is you'll get a >>>>>>>>>> ClassCastException on the second Filter.by. What appears to be >>>>>>>>>> happening is >>>>>>>>>> that the Filter.by DoFnInvoker is being reused... which should be >>>>>>>>>> fine >>>>>>>>>> since that should be working with Object... but what I can't find is >>>>>>>>>> where >>>>>>>>>> the casting is happening because it seems like a) the cast isn't >>>>>>>>>> actually >>>>>>>>>> needed? and b) it's doing the wrong cast. Any clues? 
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> B
>>>>>>>>>>
>>>>>>>>>> @Test
>>>>>>>>>> public void testReusedLambda() {
>>>>>>>>>>   p.apply(Create.of(new SimpleElement1()))
>>>>>>>>>>       .apply("First", Filter.by(Objects::nonNull))
>>>>>>>>>>       .apply(ParDo.of(new VerySimpleDoFn<>()))
>>>>>>>>>>       .apply("Second", Filter.by(Objects::nonNull));
>>>>>>>>>>   p.run().waitUntilFinish();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> static class SimpleElement1 implements Serializable {}
>>>>>>>>>>
>>>>>>>>>> static class SimpleElement2 implements Serializable {}
>>>>>>>>>>
>>>>>>>>>> static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void processElement(ProcessContext c) {
>>>>>>>>>>     c.output(new SimpleElement2());
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Byron Ellis ([email protected])
>>>>>> "Oook" -- The Librarian
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Byron Ellis ([email protected])
>>>> "Oook" -- The Librarian
>>>>
>>>
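The fix Byron and Robert converge on, memoizing the generated invoker under every input that affects the generated code (the DoFn class plus the cast target) rather than the DoFn class alone, can be illustrated with a toy model in plain Java. This is a hedged sketch, not Beam's ByteBuddyDoFnInvokerFactory: `Invoker`, `Key`, `generate`, and `FilterFn` are hypothetical stand-ins, and the generated cast is modeled with `Class.cast`, on the assumption that the real bytecode bakes in a cast to the element type it was first generated for.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only: none of these types are Beam APIs.
public class InvokerCacheSketch {

  /** Hypothetical stand-in for a generated invoker with a cast baked in. */
  static final class Invoker {
    final Class<?> fnClass;
    final Class<?> castTarget;

    Invoker(Class<?> fnClass, Class<?> castTarget) {
      this.fnClass = fnClass;
      this.castTarget = castTarget;
    }

    /** Mimics the generated cast: throws ClassCastException on a mismatch. */
    Object invoke(Object element) {
      return castTarget.cast(element);
    }
  }

  /** Composite cache key: the DoFn class plus the type the invoker casts to. */
  static final class Key {
    final Class<?> fnClass;
    final Class<?> castTarget;

    Key(Class<?> fnClass, Class<?> castTarget) {
      this.fnClass = fnClass;
      this.castTarget = castTarget;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return fnClass.equals(other.fnClass) && castTarget.equals(other.castTarget);
    }

    @Override
    public int hashCode() {
      return Objects.hash(fnClass, castTarget);
    }
  }

  /** Counts calls to the (notionally expensive) generation step. */
  static final AtomicInteger generations = new AtomicInteger();

  static Invoker generate(Class<?> fnClass, Class<?> castTarget) {
    generations.incrementAndGet();
    return new Invoker(fnClass, castTarget);
  }

  // Keyed on the DoFn class alone: the first caller's cast target wins.
  static final Map<Class<?>, Invoker> byClass = new ConcurrentHashMap<>();

  static Invoker getByClass(Class<?> fnClass, Class<?> castTarget) {
    return byClass.computeIfAbsent(fnClass, c -> generate(c, castTarget));
  }

  // Keyed on every input that affects the generated code.
  static final Map<Key, Invoker> byKey = new ConcurrentHashMap<>();

  static Invoker getByKey(Class<?> fnClass, Class<?> castTarget) {
    return byKey.computeIfAbsent(
        new Key(fnClass, castTarget), k -> generate(k.fnClass, k.castTarget));
  }

  // Hypothetical stand-ins for the types in the reproduction above.
  static class FilterFn {}
  static class SimpleElement1 {}
  static class SimpleElement2 {}

  public static void main(String[] args) {
    getByClass(FilterFn.class, SimpleElement1.class);
    Invoker reused = getByClass(FilterFn.class, SimpleElement2.class);
    try {
      reused.invoke(new SimpleElement2());
      System.out.println("class-only key: no exception");
    } catch (ClassCastException e) {
      System.out.println("class-only key: ClassCastException");
    }

    getByKey(FilterFn.class, SimpleElement1.class);
    Invoker fresh = getByKey(FilterFn.class, SimpleElement2.class);
    fresh.invoke(new SimpleElement2());
    System.out.println("composite key: ok");
  }
}
```

With the class-only key, the second lookup returns the invoker built for `SimpleElement1`, and invoking it on a `SimpleElement2` throws, which mirrors the symptom in the reproduction. The composite key costs one extra generation per distinct cast target but is still memoized. Robert's fallback of keying on the stage name would slot into `Key` the same way as a different second component.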
