Hi Joe,

Chesnay (cc'ed) may have a better idea of why this is happening.

Cheers,
Kostas

> On Sep 14, 2018, at 7:30 PM, Joe Malt <jm...@yelp.com> wrote:
> 
> Hi,
> 
> I'm trying to write a Flink job (with the Python streaming API) that handles 
> a custom type that needs a custom Kryo serializer.
> 
> When we implemented a similar job in Scala, we used addDefaultKryoSerializer, 
> following the instructions at 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/custom_serializers.html
> 
> In Python, the PythonStreamExecutionEnvironment doesn't have this method, but 
> it does have an ordinary StreamExecutionEnvironment as a private field 
> ("env"). I'm using reflection to get this StreamExecutionEnvironment and 
> calling addDefaultKryoSerializer on it, but that doesn't seem to work:
> 
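> # Read the private "env" field to get the wrapped StreamExecutionEnvironment
> f = python_execution_env.getClass().getDeclaredField("env")
> f.setAccessible(True)
> java_execution_env = f.get(python_execution_env)
> # Register the custom Kryo serializer for the Message type
> java_execution_env.addDefaultKryoSerializer(Message, MessageKryoSerializer)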
> 
> With or without these lines, the job crashes with a KryoException (full stack 
> trace at https://pastebin.com/zxxzCqH0), so addDefaultKryoSerializer does not 
> appear to be doing anything.
> 
> Is there an officially supported way to set custom serializers in Python? 
> 
> Thanks,
> 
> Joe Malt
> Engineering Intern, Stream Processing
> Yelp
> 
> 
