Would it be possible to generate the DoFnInvoker only at pipeline
construction time? Flink generally takes the approach that all user code
is Serializable: the serialized bytes are distributed to the workers, and
instances of UDFs are created only by deserialization. This would require
the DoFnInvoker(s) (and all related classes, like OnTimerInvoker) to be
Serializable, but then it should work. Or is there anything special that
I'm missing?
On 1/7/26 02:17, Reuven Lax via dev wrote:
How would we do this? Caching a map of DoFn -> Invoker is effectively
doing it once per DoFn, no?
On Tue, Jan 6, 2026 at 4:57 PM Byron Ellis <[email protected]> wrote:
I was thinking more in the sense of doing it once per DoFn rather
than using a globally memoized cache.
On Tue, Jan 6, 2026 at 3:42 PM Reuven Lax <[email protected]> wrote:
Given how expensive it was in the past (multiple seconds to
generate the bytecode!), I suspect we still don't want to do
this on every element processing.
On Tue, Jan 6, 2026 at 8:41 AM Byron Ellis
<[email protected]> wrote:
I suspect the cache also needs to include the cast target
(or perhaps a re-examination of "how expensive is this
really in the year 2025?"). Fortunately there's a pretty
easy workaround.
On Sun, Jan 4, 2026 at 5:27 PM Kenneth Knowles
<[email protected]> wrote:
Generating the DoFnInvoker class takes enough time
that it is important to memoize them. The cache is
keyed on the DoFn class. See
https://github.com/apache/beam/blob/a3af5d54e257fc5da8e923916d8956ef1f31f1b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L305
Kenn
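To make the two caching strategies discussed above concrete (a cache keyed only on the DoFn class versus one that also includes the cast target), here is a minimal sketch. It is not Beam's actual code: Invoker, lookupByClass, and lookupByClassAndType are hypothetical names, and the "generated" invoker is just an object that casts to a fixed element type. It only illustrates how a class-keyed cache could hand back an invoker whose cast was chosen for a different element type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class InvokerCacheSketch {
  /** Hypothetical stand-in for a generated invoker: casts the element, then calls the fn. */
  static final class Invoker {
    private final Class<?> elementType;

    Invoker(Class<?> elementType) {
      this.elementType = elementType;
    }

    Object invoke(Function<Object, Object> fn, Object element) {
      // Class.cast throws ClassCastException if the element has the wrong type.
      return fn.apply(elementType.cast(element));
    }
  }

  // Strategy 1 (assumption): cache keyed on the fn class only.
  static final Map<Class<?>, Invoker> BY_FN_CLASS = new ConcurrentHashMap<>();

  // Strategy 2 (assumption): cache keyed on the fn class plus the cast target.
  static final Map<List<Class<?>>, Invoker> BY_FN_CLASS_AND_TYPE = new ConcurrentHashMap<>();

  static Invoker lookupByClass(Class<?> fnClass, Class<?> elementType) {
    // The element type of the first caller is baked in; later callers reuse it.
    return BY_FN_CLASS.computeIfAbsent(fnClass, k -> new Invoker(elementType));
  }

  static Invoker lookupByClassAndType(Class<?> fnClass, Class<?> elementType) {
    List<Class<?>> key = Arrays.asList(fnClass, elementType);
    return BY_FN_CLASS_AND_TYPE.computeIfAbsent(key, k -> new Invoker(elementType));
  }

  public static void main(String[] args) {
    Function<Object, Object> identity = x -> x;

    // Same "fn class" (Runnable is an arbitrary placeholder), two element types.
    lookupByClass(Runnable.class, String.class);
    Invoker reused = lookupByClass(Runnable.class, Integer.class);
    try {
      reused.invoke(identity, 42);
    } catch (ClassCastException e) {
      System.out.println("class-keyed cache reused the wrong cast");
    }

    Invoker fresh = lookupByClassAndType(Runnable.class, Integer.class);
    System.out.println("composite key result: " + fresh.invoke(identity, 42));
  }
}
```

The composite key still memoizes the expensive generation step, just once per (fn class, element type) pair rather than once per fn class.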
On Thu, Dec 18, 2025 at 5:33 PM Byron Ellis
<[email protected]> wrote:
I’ve replicated this on the non-Portable
FlinkRunner and DirectRunner.
On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev
<[email protected]> wrote:
Which runner are you using?
On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax
<[email protected]> wrote:
The bytecode generated in DoFnInvoker
(ByteBuddyDoFnInvokerFactory) does generate
casts to make sure that the elements match.
I'm not entirely sure offhand why the same
DoFnInvoker is being used - seems like
something might be going wrong with DoFn
caching.
Reuven
On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis
<[email protected]> wrote:
Hi all,
I ran into sort of an interesting issue
last night. Consider the code below. If
you try to run it what will happen is
you'll get a ClassCastException on the
second Filter.by. What appears to be
happening is that the Filter.by
DoFnInvoker is being reused... which
should be fine since that should be
working with Object... but what I can't
find is where the casting is happening
because it seems like a) the cast isn't
actually needed? and b) it's doing the
wrong cast. Any clues?
Best,
B
    @Test
    public void testReusedLambda() {
      p.apply(Create.of(new SimpleElement1()))
          .apply("First", Filter.by(Objects::nonNull))
          .apply(ParDo.of(new VerySimpleDoFn<>()))
          .apply("Second", Filter.by(Objects::nonNull));
      p.run().waitUntilFinish();
    }

    static class SimpleElement1 implements Serializable {}

    static class SimpleElement2 implements Serializable {}

    static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(new SimpleElement2());
      }
    }
--
Byron Ellis ([email protected])
"Oook" -- The Librarian