Our best bet is probably to add the class being cast to as part of the cache key. Presumably you know that class at the time of caching, because you were able to generate that bytecode. Though… given that this is Java and generic types are erased at runtime, do we even need that cast?
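A minimal sketch of that suggestion in plain Java. This is not Beam's actual ByteBuddyDoFnInvokerFactory code: the cache key combines the function's class with the element class the generated code casts to, so one function reused with two element types gets two cache entries. `InvokerCacheSketch`, `Key`, `Invoker` and `generate` are hypothetical names, the "invoker" is a stand-in that only performs the cast, and records assume Java 16+.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class InvokerCacheSketch {
  // Hypothetical cache key: the function's class plus the class the generated
  // code casts elements to.
  record Key(Class<?> fnClass, Class<?> elementClass) {}

  // Hypothetical stand-in for a generated invoker: it casts to a fixed class.
  record Invoker(Class<?> castTarget) {
    Object invoke(Object element) {
      return castTarget.cast(element); // throws ClassCastException on a mismatch
    }
  }

  static final Map<Key, Invoker> CACHE = new ConcurrentHashMap<>();
  static final AtomicInteger GENERATIONS = new AtomicInteger();

  // Stand-in for the expensive bytecode generation step; counts how often it runs.
  static Invoker generate(Key key) {
    GENERATIONS.incrementAndGet();
    return new Invoker(key.elementClass());
  }

  // Generates at most once per (function class, element class) pair.
  static Invoker invokerFor(Object fn, Class<?> elementClass) {
    return CACHE.computeIfAbsent(new Key(fn.getClass(), elementClass), InvokerCacheSketch::generate);
  }

  public static void main(String[] args) {
    Function<Object, Boolean> nonNull = Objects::nonNull;
    invokerFor(nonNull, String.class).invoke("a");
    invokerFor(nonNull, Integer.class).invoke(42); // separate entry, no ClassCastException
    invokerFor(nonNull, String.class).invoke("b"); // cache hit
    System.out.println(GENERATIONS.get()); // 2
  }
}
```

The cost of generation is still paid only once per distinct pair, so the extra key component does not reintroduce per-element work.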
On 1/9/26 16:46, Kenneth Knowles wrote:
Is it possible to generate DoFnInvoker only during pipeline construction time? Flink generally uses the approach that all user code is Serializable: the serialized bytes are distributed to workers, and instances of UDFs are created only by deserialization. This would require DoFnInvoker(s) (and all related classes, like OnTimerInvoker) to be Serializable, but then this should work. Or is there anything special that I'm missing?
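A minimal sketch of the pattern described above, using plain Java serialization (an assumption; Flink and Beam each have their own serialization utilities): the configured instance is serialized once, and every worker-side instance is produced by deserializing those bytes. `DeserializeOnWorkerSketch` and `Threshold` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeserializeOnWorkerSketch {
  // Hypothetical user function: Serializable, with all of its state in fields.
  static class Threshold implements Serializable {
    private static final long serialVersionUID = 1L;
    final int limit;

    Threshold(int limit) {
      this.limit = limit;
    }

    boolean test(int x) {
      return x > limit;
    }
  }

  // "Construction side": serialize the configured instance to bytes.
  static byte[] serialize(Serializable obj) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    return bytes.toByteArray();
  }

  // "Worker side": every instance is created by deserializing those bytes.
  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    byte[] payload = serialize(new Threshold(10));
    Threshold copy = (Threshold) deserialize(payload);
    System.out.println(copy.test(11)); // true
  }
}
```

Note that `deserialize` only succeeds if the receiving side can load the `Threshold` class itself: Java serialization ships an instance's state, not its bytecode, which is exactly the difficulty with dynamically generated classes.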
This seems like a generally good idea: the DoFn is a user-facing API while the DoFnInvoker is the worker/execution-facing API. So, aligning with this makes sense. That said, things largely work already and Byron's case just looks like a bug.
Agree.
The situation isn't comparable to the "old" DoFn or how Flink works, because we are dynamically creating a class that is not present in the staged jars, not just shipping a serialized instance. So you'd be shipping a Class object and loading it on the worker. This could be a fairly easy experiment to try (we may have tried it in the old days and decided it wasn't the easiest way).
Yes, it will probably require assembling all the DoFnInvokers into a separate jar and shipping it with the user code. I use this approach to run pipelines from a Groovy REPL shell, and it seems to work fine.
Jan
Kenn
On 1/7/26 02:17, Reuven Lax via dev wrote:
How would we do this? Caching a map of DoFn -> Invoker is effectively doing it once per DoFn, no?
I was thinking more in the sense of doing it once per DoFn rather than using a globally memoized cache.
Given how expensive it was in the past (multiple seconds to generate the bytecode!), I suspect we still don't want to do this for every element processed.
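A minimal sketch of "once per DoFn" as opposed to a global cache, assuming each instance is used by one thread at a time (the lazy initialization below is not thread-safe): the invoker lives in a transient field, so it is never serialized, is built on first use by each instance, and is then reused for every element. `PerInstanceInvokerSketch`, `FnWrapper` and the counter are hypothetical, and the lambda stands in for generated bytecode.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class PerInstanceInvokerSketch {
  // Counts how many times the (stand-in) expensive generation step runs.
  static final AtomicInteger GENERATIONS = new AtomicInteger();

  // Hypothetical wrapper around a user function. The transient field is
  // skipped by serialization, so each (deserialized) instance builds its own
  // invoker lazily, once, instead of consulting a global cache.
  static class FnWrapper implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient Predicate<Object> invoker;

    private Predicate<Object> getInvoker() {
      if (invoker == null) {
        GENERATIONS.incrementAndGet(); // stand-in for expensive code generation
        invoker = x -> x != null;
      }
      return invoker;
    }

    boolean process(Object element) {
      return getInvoker().test(element);
    }
  }

  public static void main(String[] args) {
    FnWrapper first = new FnWrapper();
    FnWrapper second = new FnWrapper();
    for (int i = 0; i < 1000; i++) {
      first.process(i);
      second.process(i);
    }
    System.out.println(GENERATIONS.get()); // 2: one per instance, none per element
  }
}
```

The trade-off is that every instance pays the generation cost once, whereas a global cache pays it once per key for the whole JVM.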
I suspect the cache also needs to include the cast target (or perhaps a re-examination of "how expensive is this really in the year 2025?"). Fortunately there's a pretty easy workaround.
I’ve replicated this on the non-portable FlinkRunner and DirectRunner.
Which runner are you using?
The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory) does generate casts to make sure that the elements match. I'm not entirely sure offhand why the same DoFnInvoker is being used; it seems like something might be going wrong with DoFn caching.
Reuven
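A pure-Java simulation of the suspected caching problem (not Beam's code; all names are hypothetical): the invoker casts each element to the type it was specialized for, but the cache is keyed only by the function's class, so the second use inherits the first use's cast target and throws ClassCastException.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class StaleCastSketch {
  static class SimpleElement1 {}
  static class SimpleElement2 {}

  // Hypothetical invoker that, like the generated bytecode, casts each element
  // to the type it was specialized for before calling the user function.
  record Invoker(Class<?> castTarget, Predicate<Object> fn) {
    boolean invoke(Object element) {
      return fn.test(castTarget.cast(element));
    }
  }

  // Cache keyed only by the function's class: the element type is not part of the key.
  static final Map<Class<?>, Invoker> CACHE = new ConcurrentHashMap<>();

  static Invoker invokerFor(Predicate<Object> fn, Class<?> elementClass) {
    return CACHE.computeIfAbsent(fn.getClass(), k -> new Invoker(elementClass, fn));
  }

  public static void main(String[] args) {
    Predicate<Object> nonNull = Objects::nonNull; // one function used at two sites
    invokerFor(nonNull, SimpleElement1.class).invoke(new SimpleElement1()); // fine
    try {
      // Second site: the cached invoker still casts to SimpleElement1.
      invokerFor(nonNull, SimpleElement2.class).invoke(new SimpleElement2());
    } catch (ClassCastException e) {
      System.out.println("ClassCastException: " + e.getMessage());
    }
  }
}
```

Keying the cache by both the function class and the element class, as suggested at the top of the thread, would give the second site its own invoker.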
Hi all,
I ran into sort of an interesting issue last night. Consider the code below. If you try to run it, you'll get a ClassCastException on the second Filter.by. What appears to be happening is that the Filter.by DoFnInvoker is being reused... which should be fine, since it should be working with Object... but what I can't find is where the casting is happening, because it seems like a) the cast isn't actually needed, and b) it's doing the wrong cast. Any clues?
Best,
B
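import java.io.Serializable;
import java.util.Objects;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.junit.Rule;
import org.junit.Test;

public class ReusedLambdaTest { // class name is illustrative
  @Rule public final transient TestPipeline p = TestPipeline.create();
  // Two Filter.by steps with the same predicate; the second throws ClassCastException.
  @Test
  public void testReusedLambda() {
    p.apply(Create.of(new SimpleElement1()))
        .apply("First", Filter.by(Objects::nonNull))
        .apply(ParDo.of(new VerySimpleDoFn<>()))
        .apply("Second", Filter.by(Objects::nonNull));
    p.run().waitUntilFinish();
  }
  static class SimpleElement1 implements Serializable {}
  static class SimpleElement2 implements Serializable {}
  // Replaces each input element with a new SimpleElement2, changing the element type.
  static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(new SimpleElement2());
    }
  }
}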
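On the question of whether the cast is needed at all, here is a small plain-Java illustration of type erasure, independent of Beam: generic code that only treats elements as `Object` compiles without a cast, and a ClassCastException surfaces only where code demands a concrete type. `ErasureSketch` and `keep` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ErasureSketch {
  static class SimpleElement1 {}
  static class SimpleElement2 {}

  // After erasure this method takes and tests a plain Object; the compiler
  // inserts no cast, so it accepts any element type.
  static <T> boolean keep(T element) {
    return Objects.nonNull(element);
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    System.out.println(keep(new SimpleElement1())); // true
    System.out.println(keep(new SimpleElement2())); // true

    // A cast appears only where code needs a concrete type. The unchecked cast
    // lets a SimpleElement2 into a List<SimpleElement1>; nothing fails until
    // the element is read back as a SimpleElement1.
    List<Object> raw = new ArrayList<>();
    raw.add(new SimpleElement2());
    List<SimpleElement1> typed = (List<SimpleElement1>) (List<?>) raw;
    try {
      SimpleElement1 first = typed.get(0); // compiler-inserted cast fails here
      System.out.println(first);
    } catch (ClassCastException e) {
      System.out.println("ClassCastException at the read site");
    }
  }
}
```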