Hi,

I'm trying to write a pipeline with the new Python streaming API that
reads from Kafka using FlinkKafkaConsumer010.

This works fine when I use an existing deserializer such as SimpleStringSchema,
but I need to define my own deserializer to process a custom format. I've
written a class that extends SimpleStringSchema, but I get an ImportError
when I try to use it.

The class is as follows:

from org.apache.flink.api.common.serialization import SimpleStringSchema

class MyCustomKafkaDeserializer(SimpleStringSchema):

    def __init__(self):
        SimpleStringSchema.__init__(self)
        print "created MyKafkaDeserializer"

    def deserialize(self, *args):
        # snip

I instantiate the Kafka consumer like this:

consumer = FlinkKafkaConsumer010([configs['kafkaTopic']],
                                 MyCustomKafkaDeserializer(), props)


When I start the pipeline I see the message printed in the constructor (so
the deserializer is being created), but as soon as env.execute() is called
I get this error:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: ImportError: No module named MyCustomKafkaDeserializer

        at org.python.core.Py.ImportError(Py.java:328)


The issue is the same whether MyCustomKafkaDeserializer is defined in
the same file as the pipeline or in a separate file that I import. It
seems that, once the job is running, Flink can't find the module that
defines the class for some reason.
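For what it's worth, the error resembles what happens in plain Python when an
object is serialized by class reference and then deserialized in a process
where the defining module can't be imported. The snippet below is only a
CPython analogy using the standard pickle module, not Flink's actual code
path; it assumes (unconfirmed) that Flink's Jython layer ships the
deserializer to the task manager in a similar way. The module and class names
just mirror the ones above, and the deserialize body is a placeholder.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the user's MyCustomKafkaDeserializer.py module,
# registered only in this process's sys.modules.
module = types.ModuleType("MyCustomKafkaDeserializer")
sys.modules["MyCustomKafkaDeserializer"] = module


class MyCustomKafkaDeserializer(object):
    def deserialize(self, message):
        # Placeholder logic; the real custom format is not shown in the post.
        return message.decode("utf-8")


# Make the class look as if it were defined at the top level of that module.
MyCustomKafkaDeserializer.__module__ = "MyCustomKafkaDeserializer"
MyCustomKafkaDeserializer.__qualname__ = "MyCustomKafkaDeserializer"
module.MyCustomKafkaDeserializer = MyCustomKafkaDeserializer

# "Client side": pickle records the class by module name + class name,
# not by shipping the class's code.
payload = pickle.dumps(MyCustomKafkaDeserializer())

# "Worker side": simulate a process where that module is not importable.
del sys.modules["MyCustomKafkaDeserializer"]

try:
    pickle.loads(payload)
    error = None
except ImportError as exc:
    error = exc

# on CPython 3: No module named 'MyCustomKafkaDeserializer'
print(error)
```

If this analogy holds, the task manager would need MyCustomKafkaDeserializer
to be importable on its own module path, not just in the client process that
built the job graph.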


The command I'm using to run the pipeline:

./pyflink-stream.sh /Users/jmalt/flink-python/KafkaRead.py /Users/jmalt/flink-python/MyCustomKafkaDeserializer.py - --local

How can I make Flink see the custom deserializer?

Thanks,

Joe Malt

Software Engineering Intern, Stream Processing
Yelp
