I can't really help you here.
Digging into the backing Java internals isn't supported, and neither is
registering a Kryo serializer (which is why it isn't exposed in the
Python environment).
The Jython-related serialization logic ignores Flink's usual
type serialization mechanism, so using Avro will simply not work. It
assumes that all data is either created on the Python side or can be
mapped automatically to a Python type by Jython.
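To make that last point concrete, here is a minimal sketch in plain Python. It is not a supported Flink API and has not been tried inside a Flink job. It assumes a hypothetical Message class with two fields (msg_id and payload) and two hypothetical helpers, to_plain and from_plain. The idea is to hand only built-in Python types between operators so that nothing depends on a custom serializer; whether that is sufficient for a given job is an assumption.

```python
# Hypothetical stand-in for the custom type; not part of Flink.
class Message(object):
    def __init__(self, msg_id, payload):
        self.msg_id = msg_id
        self.payload = payload


def to_plain(message):
    """Flatten a Message into a tuple of built-in types."""
    return (message.msg_id, message.payload)


def from_plain(record):
    """Rebuild a Message from a tuple produced by to_plain."""
    msg_id, payload = record
    return Message(msg_id, payload)


# Round trip: only the plain tuple would need to cross operator boundaries.
original = Message(42, "hello")
record = to_plain(original)
restored = from_plain(record)
```

Each user function would call from_plain on its input and to_plain on its output, so the custom class only ever exists inside a single function call.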
On 18.09.2018 20:05, Joe Malt wrote:
Bumping this (I hope that's OK!) - I've been trying to fix this for a
week and got nowhere.
On Mon, Sep 17, 2018 at 8:40 AM, Kostas Kloudas
<k.klou...@data-artisans.com> wrote:
Hi Joe,
Chesnay (cc'ed) may have a better idea of why this is
happening.
Cheers,
Kostas
On Sep 14, 2018, at 7:30 PM, Joe Malt <jm...@yelp.com> wrote:
Hi,
I'm trying to write a Flink job (with the Python streaming API)
that handles a custom type that needs a custom Kryo serializer.
When we implemented a similar job in Scala we used
addDefaultKryoSerializer, similar to the instructions in
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/custom_serializers.html
In Python, the PythonStreamExecutionEnvironment doesn't have this
method, but it does have an ordinary StreamExecutionEnvironment as
a private field ("env"). I'm using reflection to get this
StreamExecutionEnvironment and calling addDefaultKryoSerializer on
it, but that doesn't seem to work:
f = python_execution_env.getClass().getDeclaredField("env")
f.setAccessible(True)
java_execution_env = f.get(python_execution_env)
java_execution_env.addDefaultKryoSerializer(Message, MessageKryoSerializer)
With or without these lines, the job crashes with a KryoException
(full stack trace at https://pastebin.com/zxxzCqH0), so it doesn't
appear that addDefaultKryoSerializer is doing anything.
Is there an officially supported way to set custom serializers in
Python?
Thanks,
Joe Malt
Engineering Intern, Stream Processing
Yelp