As seen in the stack trace, every sink added via StreamExEnv#add_sink is wrapped in a PythonSinkFunction, which internally converts records to PyObjects; that's why the mapper had no effect. Currently we don't differentiate between Java/Python sinks, contrary to sources, where we have an explicit StreamExEnv#add_java_source method.

There are 2 ways to approach this issue:
* As alluded to in a previous mail, create a Python wrapper around the Kafka consumer class.
* Extend the PythonDataStream class with a separate method for Kafka.

Unfortunately I don't think we can solve this in a generic manner (i.e. an add_java_sink analogous to add_java_source), since the Java types wouldn't match at compile time.
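For illustration, the conversion step that either option would need can be sketched in plain Python. None of the class names below exist in Flink; under Jython the real wrapper would call `value.__tojava__(java.lang.String)` rather than `str()`. This is only a sketch of the idea: convert each record back to the type the Java serialization schema expects before it reaches the sink.

```python
# Hedged sketch (hypothetical names, not Flink API): option 1's wrapper
# would convert each PyObject to the sink's expected Java type up front.

class RecordingStringSink:
    """Stand-in for a Java sink whose SimpleStringSchema accepts only strings."""
    def __init__(self):
        self.records = []

    def invoke(self, value):
        if not isinstance(value, str):
            # Mirrors the ClassCastException from the stack trace below
            raise TypeError("cannot serialize %s, expected str" % type(value).__name__)
        self.records.append(value)


class StringConvertingSinkWrapper:
    """Converts each record to the sink's expected type before delegating."""
    def __init__(self, delegate):
        self.delegate = delegate

    def invoke(self, value):
        # In Jython: value.__tojava__(java.lang.String) instead of str(value)
        self.delegate.invoke(str(value))


sink = RecordingStringSink()
wrapper = StringConvertingSinkWrapper(sink)
wrapper.invoke(42)        # converted to "42" before reaching the sink
wrapper.invoke(u"hello")  # unicode passes through as a plain string
```

Without the wrapper, `sink.invoke(42)` would raise, which is essentially what happens when a PyUnicode reaches SimpleStringSchema.serialize unconverted.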

On 15.08.2018 04:15, vino yang wrote:
Hi Joe,

ping Chesnay for you, please wait for the reply.

Thanks, vino.

Joe Malt <jm...@yelp.com> wrote on Wed, Aug 15, 2018 at 7:16 AM:

    Hi,

    I'm trying to write to a Kafka stream in a Flink job using the new
    Python streaming API.

    My program looks like this:

    def main(factory):

        props = Properties()
        props.setProperty("bootstrap.servers", configs['kafkaBroker'])

        consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']],
                                         SimpleStringSchema(), props)
        producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'],
                                         SimpleStringSchema(), props)

        env = factory.get_execution_environment()

        stream = env.add_java_source(consumer)

        stream.output()            # this works (prints to a .out file)
        stream.add_sink(producer)  # producing to this causes the exception
        env.execute()

    I'm getting a ClassCastException when trying to output to the
    FlinkKafkaProducer:

    java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast to java.lang.String
        at org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
        at org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
        at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
        at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)


    It seems that the Python string isn't getting converted to a
    java.lang.String, which should happen automatically in Jython.

    I've tried adding a MapFunction that maps each input to
    String(input), where String is the constructor for java.lang.String.
    This made no difference; I get the same error.

    Any ideas?

    Thanks,

    Joe Malt

    Software Engineering Intern
    Yelp

