Hi,

I'm trying to write a Flink job (with the Python streaming API) that
handles a custom type that needs a custom Kryo serializer.

When we implemented a similar job in Scala, we used addDefaultKryoSerializer,
as described in
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/custom_serializers.html

In Python, the PythonStreamExecutionEnvironment doesn't have this method,
but it does have an ordinary StreamExecutionEnvironment as a private field
("env"). I'm using reflection to get this StreamExecutionEnvironment and
calling addDefaultKryoSerializer on it, but that doesn't seem to work:

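# Get the private "env" field (the wrapped StreamExecutionEnvironment) via reflection
f = python_execution_env.getClass().getDeclaredField("env")
f.setAccessible(True)
java_execution_env = f.get(python_execution_env)
# Register the custom Kryo serializer for the Message type
java_execution_env.addDefaultKryoSerializer(Message, MessageKryoSerializer)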

With or without these lines, the job crashes with a KryoException (full
stack trace at https://pastebin.com/zxxzCqH0), so it doesn't appear that
addDefaultKryoSerializer is doing anything.

Is there an officially supported way to set custom serializers in Python?

Thanks,

Joe Malt
Engineering Intern, Stream Processing
Yelp
