Bumping this (I hope that's OK!) - I've been trying to fix this for a week
and got nowhere

On Mon, Sep 17, 2018 at 8:40 AM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Hi Joe,
>
> Probably Chesnay (cc’ed) may have a better idea on why this is happening.
>
> Cheers,
> Kostas
>
> On Sep 14, 2018, at 7:30 PM, Joe Malt <jm...@yelp.com> wrote:
>
> Hi,
>
> I'm trying to write a Flink job (with the Python streaming API) that
> handles a custom type that needs a custom Kryo serializer.
>
> When we implemented a similar job in Scala we used
> addDefaultKryoSerializer, similar to the instructions in
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/custom_
> serializers.html
>
> In Python, the PythonStreamExecutionEnvironment doesn't have this method,
> but it does have an ordinary StreamExecutionEnvironment as a private
> field ("env"). I'm using reflection to get this StreamExecutionEnvironment
> and calling addDefaultKryoSerializer on it, but that doesn't seem to work:
>
> f = python_execution_env.getClass().getDeclaredField("env")
> f.setAccessible(True)
> java_execution_env = f.get(python_execution_env)
> java_execution_env.addDefaultKryoSerializer(Message,
> MessageKryoSerializer)
>
> With or without these lines, the job crashes with a KryoException (full
> stack trace at https://pastebin.com/zxxzCqH0), it doesn't appear that
> addDefaultKryoSerializer is doing anything.
>
> Is there an officially supported way to set custom serializers in Python?
>
> Thanks,
>
> Joe Malt
> Engineering Intern, Stream Processing
> Yelp
>
>
>
>

Reply via email to