Bumping this (I hope that's OK!). I've been trying to fix this for a week and have gotten nowhere.
On Mon, Sep 17, 2018 at 8:40 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

> Hi Joe,
>
> Chesnay (cc'ed) may have a better idea of why this is happening.
>
> Cheers,
> Kostas
>
> On Sep 14, 2018, at 7:30 PM, Joe Malt <jm...@yelp.com> wrote:
>
> Hi,
>
> I'm trying to write a Flink job (with the Python streaming API) that handles a custom type that needs a custom Kryo serializer.
>
> When we implemented a similar job in Scala, we used addDefaultKryoSerializer, similar to the instructions in https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/custom_serializers.html
>
> In Python, the PythonStreamExecutionEnvironment doesn't have this method, but it does have an ordinary StreamExecutionEnvironment as a private field ("env"). I'm using reflection to get this StreamExecutionEnvironment and calling addDefaultKryoSerializer on it, but that doesn't seem to work:
>
>     f = python_execution_env.getClass().getDeclaredField("env")
>     f.setAccessible(True)
>     java_execution_env = f.get(python_execution_env)
>     java_execution_env.addDefaultKryoSerializer(Message, MessageKryoSerializer)
>
> With or without these lines, the job crashes with a KryoException (full stack trace at https://pastebin.com/zxxzCqH0); addDefaultKryoSerializer doesn't appear to be doing anything.
>
> Is there an officially supported way to set custom serializers in Python?
>
> Thanks,
>
> Joe Malt
> Engineering Intern, Stream Processing
> Yelp
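For anyone less familiar with Java reflection, the four Jython lines in the quoted message boil down to roughly the plain-Java sketch below. It is only an illustration under stated assumptions: `Wrapper` and `InnerEnv` are hypothetical stand-ins (not Flink classes), and `addDefaultSerializer` is a made-up method standing in for Flink's `addDefaultKryoSerializer`. It does not use or reproduce any Flink API.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class PrivateFieldAccess {

    // Hypothetical stand-in for the Java-side environment; NOT a Flink class.
    static class InnerEnv {
        private final List<String> registered = new ArrayList<>();

        // Hypothetical method standing in for addDefaultKryoSerializer.
        void addDefaultSerializer(String typeName) {
            registered.add(typeName);
        }

        List<String> registered() {
            return registered;
        }
    }

    // Hypothetical wrapper that keeps its environment in a private field named "env".
    static class Wrapper {
        private final InnerEnv env = new InnerEnv();
    }

    // Reads a private instance field by name: getDeclaredField, setAccessible(true), get.
    static Object readPrivateField(Object target, String name) {
        try {
            Field f = target.getClass().getDeclaredField(name);
            f.setAccessible(true);
            return f.get(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot read field " + name, e);
        }
    }

    public static void main(String[] args) {
        Wrapper wrapper = new Wrapper();
        InnerEnv env = (InnerEnv) readPrivateField(wrapper, "env");
        env.addDefaultSerializer("Message");
        // Reflection hands back the wrapper's own instance, not a copy.
        System.out.println(env == wrapper.env);       // prints "true"
        System.out.println(wrapper.env.registered()); // prints "[Message]"
    }
}
```

Two properties of the reflection calls are worth keeping in mind: `getDeclaredField` only searches fields declared on the runtime class itself (not its superclasses), and `Field.get` returns the very object the wrapper holds, so anything registered through it is visible via the wrapper too.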