I can't really help you here.

Digging into the backing Java internals isn't supported, and neither is registering a Kryo serializer (which is why it isn't exposed in the Python environment). The Jython-related serialization logic doesn't care about Flink's usual type serialization mechanism, so using Avro will simply not work. It assumes that all data is either created on the Python side or can be mapped automatically to a Python type by Jython.
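
As a rough illustration of that last point (a sketch only, not something verified against the Python streaming API), one option may be to keep only built-in Python values in the stream and convert the custom type at the edges of the job. Message below is a hypothetical pure-Python stand-in, and its field names and the two helper functions are assumptions made for the example:

```python
# Hypothetical stand-in for the custom type; the field names are
# illustrative assumptions, not taken from the original job.
class Message(object):
    def __init__(self, key, payload, timestamp):
        self.key = key
        self.payload = payload
        self.timestamp = timestamp


def message_to_tuple(msg):
    """Flatten a Message into a tuple of built-in Python values."""
    return (msg.key, msg.payload, msg.timestamp)


def tuple_to_message(fields):
    """Rebuild a Message from the tuple produced by message_to_tuple."""
    key, payload, timestamp = fields
    return Message(key, payload, timestamp)


# Round trip: only the plain tuple would travel between operators.
original = Message("user-1", "hello", 1537300000)
flat = message_to_tuple(original)
restored = tuple_to_message(flat)
```

The idea is that only strings and integers would cross operator boundaries, so no custom serializer would need to be registered. Whether that is enough to avoid the KryoException depends on where the custom type enters the job.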

On 18.09.2018 20:05, Joe Malt wrote:
Bumping this (I hope that's OK!) - I've been trying to fix this for a week and got nowhere.

On Mon, Sep 17, 2018 at 8:40 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

    Hi Joe,

    Probably Chesnay (cc’ed) may have a better idea on why this is
    happening.

    Cheers,
    Kostas

    On Sep 14, 2018, at 7:30 PM, Joe Malt <jm...@yelp.com> wrote:

    Hi,

    I'm trying to write a Flink job (with the Python streaming API)
    that handles a custom type that needs a custom Kryo serializer.

    When we implemented a similar job in Scala we used
    addDefaultKryoSerializer, similar to the instructions in
    
    https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/custom_serializers.html

    In Python, the PythonStreamExecutionEnvironment doesn't have this
    method, but it does have an ordinary StreamExecutionEnvironment as
    a private field ("env"). I'm using reflection to get this
    StreamExecutionEnvironment and calling addDefaultKryoSerializer on
    it, but that doesn't seem to work:

    f = python_execution_env.getClass().getDeclaredField("env")
    f.setAccessible(True)
    java_execution_env = f.get(python_execution_env)
    java_execution_env.addDefaultKryoSerializer(Message, MessageKryoSerializer)

    With or without these lines, the job crashes with a KryoException
    (full stack trace at https://pastebin.com/zxxzCqH0), so it doesn't
    appear that addDefaultKryoSerializer is doing anything.

    Is there an officially supported way to set custom serializers in
    Python?

    Thanks,

    Joe Malt
    Engineering Intern, Stream Processing
    Yelp




