On Fri, Jan 9, 2026 at 4:07 AM Jan Lukavský <[email protected]> wrote:

> Is it possible to generate DoFnInvoker only during pipeline construction
> time? Flink generally uses approach that all user code is Serializable, the
> serialized bytes are distributed to workers and instances of UDFs are
> created only by deserialization. This would require DoFnInvoker(s) (and all
> related classes, like OnTimerInvoker) to be Serializable, but then this
> should work or is there anything special that I'm missing?
>
This seems like a generally good idea: The DoFn is a user-facing API while
the DoFnInvoker is the worker/execution-facing API. So, aligning with this
makes sense. That said, things largely work already and Byron's case just
looks like a bug.
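
To make the suspected bug concrete, here is a minimal sketch in plain Java
(not Beam code). It assumes, as Byron suggests further down the thread, that
the generated invoker bakes in a cast to the element type seen when it was
first created. CastingInvoker, Key, and both lookup methods are hypothetical
names used only for illustration.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

final class InvokerCacheSketch {
  // Hypothetical stand-in for a generated invoker: it casts every element to
  // one fixed type, chosen when the invoker is first created.
  static final class CastingInvoker {
    final Class<?> castTarget;

    CastingInvoker(Class<?> castTarget) {
      this.castTarget = castTarget;
    }

    Object invoke(Object element) {
      return castTarget.cast(element); // ClassCastException on a mismatch
    }
  }

  // Hypothetical composite key: the fn class plus the cast target.
  static final class Key {
    final Class<?> fnClass;
    final Class<?> castTarget;

    Key(Class<?> fnClass, Class<?> castTarget) {
      this.fnClass = fnClass;
      this.castTarget = castTarget;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Key
          && ((Key) o).fnClass == fnClass
          && ((Key) o).castTarget == castTarget;
    }

    @Override
    public int hashCode() {
      return Objects.hash(fnClass, castTarget);
    }
  }

  static final Map<Class<?>, CastingInvoker> BY_CLASS = new ConcurrentHashMap<>();
  static final Map<Key, CastingInvoker> BY_CLASS_AND_TARGET = new ConcurrentHashMap<>();

  // Keyed on the fn class only: the first element type seen wins.
  static CastingInvoker byClassOnly(Class<?> fnClass, Class<?> elementType) {
    return BY_CLASS.computeIfAbsent(fnClass, k -> new CastingInvoker(elementType));
  }

  // Keyed on (fn class, cast target): one invoker per distinct element type.
  static CastingInvoker byClassAndTarget(Class<?> fnClass, Class<?> elementType) {
    return BY_CLASS_AND_TARGET.computeIfAbsent(
        new Key(fnClass, elementType), k -> new CastingInvoker(k.castTarget));
  }

  public static void main(String[] args) {
    Class<?> sharedFnClass = Runnable.class; // any class works as a stand-in
    byClassOnly(sharedFnClass, String.class);
    CastingInvoker reused = byClassOnly(sharedFnClass, Integer.class);
    try {
      reused.invoke(42);
    } catch (ClassCastException e) {
      System.out.println("class-only key reused the String cast");
    }
    byClassAndTarget(sharedFnClass, String.class);
    System.out.println(byClassAndTarget(sharedFnClass, Integer.class).invoke(42));
  }
}
```

With the class-only key, the second lookup returns the invoker built for the
first element type, which matches the ClassCastException on the second
Filter.by. Whether the real cache behaves this way is exactly what would need
checking.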

The situation isn't comparable to the "old" DoFn or how Flink works because
we are dynamically creating a class that is not present in the staged jars,
not just shipping a serialized instance. So you'd be shipping up a Class
object and loading it on the worker. This could be a fairly easy experiment
to try (we may have tried it in the old days and decided it wasn't the
easiest way).
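
As a rough illustration of the difference (again plain Java, not Beam code):
Java serialization ships instance state plus a class name, so the receiving
side must be able to resolve that class itself. In the sketch below,
FakeInvoker is a hypothetical stand-in for an instance of a generated class,
and the "worker that lacks the class" is simulated by overriding
resolveClass.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class GeneratedClassShippingSketch {
  // Hypothetical stand-in for an instance of a dynamically generated class.
  static final class FakeInvoker implements Serializable {
    private static final long serialVersionUID = 1L;
  }

  static byte[] serialize(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Returns true if the bytes deserialize on a simulated "worker" that cannot
  // load the class named missingClassName, false if that class was needed.
  static boolean deserializesWithout(byte[] data, String missingClassName) {
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(data)) {
          @Override
          protected Class<?> resolveClass(ObjectStreamClass desc)
              throws IOException, ClassNotFoundException {
            if (desc.getName().equals(missingClassName)) {
              throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
          }
        }) {
      in.readObject();
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] data = serialize(new FakeInvoker());
    System.out.println(deserializesWithout(data, "no.such.Class"));
    System.out.println(deserializesWithout(data, FakeInvoker.class.getName()));
  }
}
```

The serialized bytes alone are not enough once the class cannot be resolved,
which is why the class definition itself would have to be shipped and loaded
on the worker.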

Kenn



> On 1/7/26 02:17, Reuven Lax via dev wrote:
>
> How would we do this? Caching a map of DoFn -> Invoker is effectively
> doing it once per DoFn, no?
>
> On Tue, Jan 6, 2026 at 4:57 PM Byron Ellis <[email protected]> wrote:
>
>> I was thinking more in the sense of doing it once per DoFn rather than
>> using a globally memoized cache.
>>
>> On Tue, Jan 6, 2026 at 3:42 PM Reuven Lax <[email protected]> wrote:
>>
>>> Given how expensive it was in the past (multiple seconds to generate the
>>> bytecode!), I suspect we still don't want to do this on every
>>> element processing.
>>>
>>> On Tue, Jan 6, 2026 at 8:41 AM Byron Ellis <[email protected]>
>>> wrote:
>>>
>>>> I suspect the cache also needs to include the cast target (or perhaps a
>>>> re-examination of "how expensive is this really in the year 2025?").
>>>> Fortunately there's a pretty easy workaround.
>>>>
>>>> On Sun, Jan 4, 2026 at 5:27 PM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Generating the DoFnInvoker class takes enough time that it is
>>>>> important to memoize them. The cache is keyed on the DoFn class. See
>>>>> https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I’ve replicated this on the non-Portable FlinkRunner and
>>>>>> DirectRunner.
>>>>>>
>>>>>> On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>
>>>>>> Which runner are you using?
>>>>>>
>>>>>> On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory)
>>>>>>> does generate casts to make sure that the elements match. I'm not
>>>>>>> entirely sure offhand why the same DoFnInvoker is being used; it seems
>>>>>>> like something might be going wrong with DoFn caching.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I ran into sort of an interesting issue last night. Consider the
>>>>>>>> code below. If you try to run it, you'll get a ClassCastException on
>>>>>>>> the second Filter.by. What appears to be happening is that the
>>>>>>>> Filter.by DoFnInvoker is being reused... which should be fine since
>>>>>>>> that should be working with Object... but what I can't find is where
>>>>>>>> the casting is happening, because it seems like a) the cast isn't
>>>>>>>> actually needed? and b) it's doing the wrong cast. Any clues?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> B
>>>>>>>>
>>>>>>>> @Test
>>>>>>>> public void testReusedLambda() {
>>>>>>>>   p.apply(Create.of(new SimpleElement1()))
>>>>>>>>       .apply("First", Filter.by(Objects::nonNull))
>>>>>>>>       .apply(ParDo.of(new VerySimpleDoFn<>()))
>>>>>>>>       .apply("Second", Filter.by(Objects::nonNull));
>>>>>>>>   p.run().waitUntilFinish();
>>>>>>>> }
>>>>>>>> static class SimpleElement1 implements Serializable {}
>>>>>>>> static class SimpleElement2 implements Serializable {}
>>>>>>>> static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(ProcessContext c) {
>>>>>>>>     c.output(new SimpleElement2());
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>
>>>> --
>>>> Byron Ellis ([email protected])
>>>> "Oook" -- The Librarian
>>>>
>>>
>>
>> --
>> Byron Ellis ([email protected])
>> "Oook" -- The Librarian
>>
>
