Our best bet is probably to add the class being cast to as part of the cache key. Presumably you know that at the time of caching because you were able to generate that bytecode. Though… given that this is Java and this all happens at runtime, do we even need that cast?

On Jan 9, 2026, at 9:48 AM, Jan Lukavský <[email protected]> wrote:


On 1/9/26 16:46, Kenneth Knowles wrote:
On Fri, Jan 9, 2026 at 4:07 AM Jan Lukavský <[email protected]> wrote:

Is it possible to generate DoFnInvoker only during pipeline construction time? Flink generally uses the approach that all user code is Serializable, the serialized bytes are distributed to workers, and instances of UDFs are created only by deserialization. This would require DoFnInvoker(s) (and all related classes, like OnTimerInvoker) to be Serializable, but then this should work. Or is there anything special that I'm missing?

This seems like a generally good idea: The DoFn is a user-facing API while the DoFnInvoker is the worker/execution-facing API. So, aligning with this makes sense. That said, things largely work already and Byron's case just looks like a bug.
Agree.

The situation isn't comparable to the "old" DoFn or how Flink works because we are dynamically creating a class that is not present in the staged jars, not just shipping a serialized instance. So you'd be shipping up a Class object and loading it on the worker. This could be a fairly easy experiment to try (we may have tried it in the old days and decided it wasn't the easiest way).
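To make the trade-off above concrete, here is a minimal sketch of the "ship serialized instances" model using plain Java serialization. UpperCaseInvoker and SerializationRoundTrip are hypothetical stand-ins invented for illustration; they are not Beam or Flink types. The round trip works here only because the class is already on the classpath of the JVM that deserializes it, which is exactly what a dynamically generated invoker class would not have on a worker unless its class file were shipped too.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Locale;

// Hypothetical stand-in for a serializable invoker (not a Beam type).
final class UpperCaseInvoker implements Serializable {
  private static final long serialVersionUID = 1L;

  String invoke(String element) {
    return element.toUpperCase(Locale.ROOT);
  }
}

final class SerializationRoundTrip {
  // "Construction time": turn the instance into bytes that could be shipped.
  static byte[] toBytes(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // "Worker side": rebuild the instance. This fails with ClassNotFoundException
  // if the instance's class cannot be loaded by the deserializing JVM.
  static Object fromBytes(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("class not available on this JVM", e);
    }
  }
}
```
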

Yes, it will probably require assembling all the DoFnInvokers into a separate jar and shipping it with the user code. I use this approach to run pipelines from a Groovy REPL shell, and it seems to work fine.

 Jan


Kenn

 
On 1/7/26 02:17, Reuven Lax via dev wrote:
How would we do this? Caching a map of DoFn -> Invoker is effectively doing it once per DoFn, no?

On Tue, Jan 6, 2026 at 4:57 PM Byron Ellis <[email protected]> wrote:
I was thinking more in the sense of doing it once per DoFn rather than using a globally memoized cache

On Tue, Jan 6, 2026 at 3:42 PM Reuven Lax <[email protected]> wrote:
Given how expensive it was in the past (multiple seconds to generate the bytecode!), I suspect we still don't want to do this on every element processing.

On Tue, Jan 6, 2026 at 8:41 AM Byron Ellis <[email protected]> wrote:
I suspect the cache also needs to include the cast target (or perhaps a re-examination of "how expensive is this really in the year 2025?"). Fortunately there's a pretty easy workaround.
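As a rough illustration of the "include the cast target in the key" idea, here is a toy sketch. InvokerKey, TypedInvoker, and CompositeKeyCache are hypothetical names made up for this example; this is not the ByteBuddyDoFnInvokerFactory code, and it assumes the cast target is known when the invoker is generated.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical composite key: the function class plus the type the invoker casts to.
final class InvokerKey {
  final Class<?> fnClass;
  final Class<?> castTarget;

  InvokerKey(Class<?> fnClass, Class<?> castTarget) {
    this.fnClass = fnClass;
    this.castTarget = castTarget;
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof InvokerKey)) {
      return false;
    }
    InvokerKey that = (InvokerKey) other;
    return fnClass.equals(that.fnClass) && castTarget.equals(that.castTarget);
  }

  @Override
  public int hashCode() {
    return Objects.hash(fnClass, castTarget);
  }
}

// Hypothetical stand-in for a generated invoker that casts each element.
final class TypedInvoker {
  final Class<?> castTarget;

  TypedInvoker(Class<?> castTarget) {
    this.castTarget = castTarget;
  }

  Object invoke(Object element) {
    return castTarget.cast(element); // throws ClassCastException on a mismatch
  }
}

final class CompositeKeyCache {
  private final Map<InvokerKey, TypedInvoker> cache = new ConcurrentHashMap<>();
  final AtomicInteger generated = new AtomicInteger(); // counts "expensive" generations

  TypedInvoker invokerFor(Class<?> fnClass, Class<?> castTarget) {
    return cache.computeIfAbsent(
        new InvokerKey(fnClass, castTarget),
        key -> {
          generated.incrementAndGet();
          return new TypedInvoker(key.castTarget);
        });
  }
}
```

With this key, the same function class used with two element types gets two invokers, while repeated use with the same element type still hits the cache, so generation stays memoized.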

On Sun, Jan 4, 2026 at 5:27 PM Kenneth Knowles <[email protected]> wrote:
Generating the DoFnInvoker class takes enough time that it is important to memoize them. The cache is keyed on the DoFn class. See https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305
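For readers following along, here is a toy model of how a cache keyed only on the class could produce the ClassCastException reported in this thread, assuming the generated invoker bakes in a cast to the element type seen at first use. CastingInvoker and ClassKeyedCache are hypothetical names for illustration; this is a sketch of the suspected failure mode, not the actual factory code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a generated invoker: it casts every element
// to the type that was known when it was generated.
final class CastingInvoker {
  final Class<?> castTarget;

  CastingInvoker(Class<?> castTarget) {
    this.castTarget = castTarget;
  }

  Object invoke(Object element) {
    return castTarget.cast(element); // throws ClassCastException on a mismatch
  }
}

final class ClassKeyedCache {
  // Keyed on the function class only; the element type is not part of the key.
  private final Map<Class<?>, CastingInvoker> cache = new ConcurrentHashMap<>();
  final AtomicInteger generated = new AtomicInteger(); // counts "expensive" generations

  CastingInvoker invokerFor(Class<?> fnClass, Class<?> elementType) {
    return cache.computeIfAbsent(
        fnClass,
        key -> {
          generated.incrementAndGet();
          return new CastingInvoker(elementType);
        });
  }
}
```

In this model, a second lookup for the same function class with a different element type returns the invoker generated for the first type, so invoking it with an element of the second type fails on the stale cast.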

Kenn

On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis <[email protected]> wrote:
I’ve replicated this on the non-Portable FlinkRunner and DirectRunner. 

On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev <[email protected]> wrote:


Which runner are you using?

On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax <[email protected]> wrote:
The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory) does generate casts to make sure that the elements match. I'm not entirely sure offhand why the same DoFnInvoker is being used; it seems like something might be going wrong with DoFn caching.

Reuven

On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis <[email protected]> wrote:
Hi all,

I ran into sort of an interesting issue last night. Consider the code below. If you try to run it what will happen is you'll get a ClassCastException on the second Filter.by. What appears to be happening is that the Filter.by DoFnInvoker is being reused... which should be fine since that should be working with Object... but what I can't find is where the casting is happening because it seems like a) the cast isn't actually needed? and b) it's doing the wrong cast. Any clues?

Best,
B

@Test
public void testReusedLambda() {
  p.apply(Create.of(new SimpleElement1()))
      .apply("First", Filter.by(Objects::nonNull))
      .apply(ParDo.of(new VerySimpleDoFn<>()))
      .apply("Second", Filter.by(Objects::nonNull));
  p.run().waitUntilFinish();
}

static class SimpleElement1 implements Serializable {}

static class SimpleElement2 implements Serializable {}

static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(new SimpleElement2());
  }
}


--
Byron Ellis ([email protected])
"Oook" -- The Librarian


