On Fri, Jan 9, 2026 at 4:07 AM Jan Lukavský <[email protected]> wrote:
> Is it possible to generate DoFnInvoker only during pipeline construction
> time? Flink generally uses approach that all user code is Serializable, the
> serialized bytes are distributed to workers and instances of UDFs are
> created only by deserialization. This would require DoFnInvoker(s) (and all
> related classes, like OnTimerInvoker) to be Serializable, but then this
> should work or is there anything special that I'm missing?
>

This seems like a generally good idea: The DoFn is a user-facing API
while the DoFnInvoker is the worker/execution-facing API. So, aligning
with this makes sense. That said, things largely work already and
Byron's case just looks like a bug.

The situation isn't comparable to the "old" DoFn or how Flink works
because we are dynamically creating a class that is not present in the
staged jars, not just shipping a serialized instance. So you'd be
shipping up a Class object and loading it on the worker. This could be a
fairly easy experiment to try (we may have tried it in the old days and
decided it wasn't the easiest way).

Kenn

> On 1/7/26 02:17, Reuven Lax via dev wrote:
>
> How would we do this? Caching a map of DoFn -> Invoker is effectively
> doing it once per DoFn, no?
>
> On Tue, Jan 6, 2026 at 4:57 PM Byron Ellis <[email protected]> wrote:
>
>> I was thinking more in the sense of doing once per dofn rather than
>> using a globally memoized cache
>>
>> On Tue, Jan 6, 2026 at 3:42 PM Reuven Lax <[email protected]> wrote:
>>
>>> Given how expensive it was in the past (multiple seconds to generate the
>>> bytecode!), I suspect we still don't want to do this on every
>>> element processing.
>>>
>>> On Tue, Jan 6, 2026 at 8:41 AM Byron Ellis <[email protected]> wrote:
>>>
>>>> I suspect the cache also needs to include the cast target (or perhaps a
>>>> re-examination of "how expensive is this really in the year 2025?").
>>>> Fortunately there's a pretty easy workaround.
>>>>
>>>> On Sun, Jan 4, 2026 at 5:27 PM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Generating the DoFnInvoker class takes enough time that it is
>>>>> important to memoize them. The cache is keyed on the DoFn class. See
>>>>> https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis <[email protected]> wrote:
>>>>>
>>>>>> I’ve replicated this on the non-Portable FlinkRunner and
>>>>>> DirectRunner.
>>>>>>
>>>>>> On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev <[email protected]> wrote:
>>>>>>
>>>>>> Which runner are you using?
>>>>>>
>>>>>> On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory)
>>>>>>> does generate casts to make sure that the elements match. I'm not
>>>>>>> entirely sure offhand why the same DoFnInvoker is being used - seems
>>>>>>> like something might be going wrong with DoFn caching.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I ran into sort of an interesting issue last night. Consider the
>>>>>>>> code below. If you try to run it what will happen is you'll get a
>>>>>>>> ClassCastException on the second Filter.by. What appears to be
>>>>>>>> happening is that the Filter.by DoFnInvoker is being reused... which
>>>>>>>> should be fine since that should be working with Object... but what
>>>>>>>> I can't find is where the casting is happening because it seems like
>>>>>>>> a) the cast isn't actually needed? and b) it's doing the wrong cast.
>>>>>>>> Any clues?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> B
>>>>>>>>
>>>>>>>> @Test
>>>>>>>> public void testReusedLambda() {
>>>>>>>>   p.apply(Create.of(new SimpleElement1()))
>>>>>>>>       .apply("First", Filter.by(Objects::nonNull))
>>>>>>>>       .apply(ParDo.of(new VerySimpleDoFn<>()))
>>>>>>>>       .apply("Second", Filter.by(Objects::nonNull));
>>>>>>>>   p.run().waitUntilFinish();
>>>>>>>> }
>>>>>>>>
>>>>>>>> static class SimpleElement1 implements Serializable {}
>>>>>>>>
>>>>>>>> static class SimpleElement2 implements Serializable {}
>>>>>>>>
>>>>>>>> static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(ProcessContext c) {
>>>>>>>>     c.output(new SimpleElement2());
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>
>>>> --
>>>> Byron Ellis ([email protected])
>>>> "Oook" -- The Librarian
>>>>
>>
>> --
>> Byron Ellis ([email protected])
>> "Oook" -- The Librarian
>>
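
To make the suspicion discussed in this thread concrete, here is a minimal, self-contained sketch of how a memoized cache keyed only on the fn class could hand back an invoker with the wrong cast baked in, and how adding the cast target to the key would avoid that. This is not Beam's code: Invoker, FilterFn, ElementA, ElementB, generate, and both lookup methods are hypothetical stand-ins, and whether ByteBuddyDoFnInvokerFactory actually fails this way is an assumption based on Byron's guess that "the cache also needs to include the cast target".

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

class InvokerCacheSketch {
  /** Hypothetical stand-in for a generated invoker. */
  interface Invoker {
    boolean invoke(Object element);
  }

  /** Hypothetical stand-in for the single fn class shared by both filters. */
  static final class FilterFn {}

  static final class ElementA {}

  static final class ElementB {}

  /** "Generates" an invoker whose cast target is fixed at creation time. */
  static Invoker generate(Class<?> castTarget) {
    // Class.cast throws ClassCastException for a non-null element of another type.
    return element -> Objects.nonNull(castTarget.cast(element));
  }

  // Cache keyed on the fn class only (the suspected problem).
  static final Map<Class<?>, Invoker> BY_CLASS = new ConcurrentHashMap<>();

  // Cache keyed on (fn class, cast target) (one possible fix, an assumption).
  static final Map<List<Class<?>>, Invoker> BY_CLASS_AND_TARGET = new ConcurrentHashMap<>();

  static Invoker lookupByClass(Class<?> fnClass, Class<?> castTarget) {
    // The second call for the same fnClass ignores castTarget and reuses the first invoker.
    return BY_CLASS.computeIfAbsent(fnClass, k -> generate(castTarget));
  }

  static Invoker lookupByClassAndTarget(Class<?> fnClass, Class<?> castTarget) {
    List<Class<?>> key = List.<Class<?>>of(fnClass, castTarget);
    return BY_CLASS_AND_TARGET.computeIfAbsent(key, k -> generate(castTarget));
  }

  /** Returns true if reusing the class-keyed invoker throws on the second element type. */
  static boolean classOnlyKeyFails() {
    lookupByClass(FilterFn.class, ElementA.class).invoke(new ElementA());
    try {
      lookupByClass(FilterFn.class, ElementB.class).invoke(new ElementB());
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  /** Returns true if both element types pass through their own invoker. */
  static boolean compositeKeySucceeds() {
    boolean first = lookupByClassAndTarget(FilterFn.class, ElementA.class).invoke(new ElementA());
    boolean second = lookupByClassAndTarget(FilterFn.class, ElementB.class).invoke(new ElementB());
    return first && second;
  }

  public static void main(String[] args) {
    System.out.println("class-only key fails: " + classOnlyKeyFails());
    System.out.println("composite key succeeds: " + compositeKeySucceeds());
  }
}
```

The composite key costs one extra generation per distinct cast target, which matters given how expensive generation is said to be in this thread. Generating once per DoFn instance, or at construction time and shipping the class to workers, are the alternatives raised above and are not shown here.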
