Re: ClassCastException when writing to a Kafka stream with Flink + Python

2018-08-14 Thread Chesnay Schepler
As seen in the stack trace, every sink added via StreamExEnv#add_source is wrapped in a PythonSinkFunction, which internally converts things to PyObjects; that's why the mapper had no effect. Currently we don't differentiate between Java and Python sinks, contrary to sources, where we have an explicit
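
The wrapping described above can be illustrated with a small toy model in plain Python. This is only a sketch and not Flink's actual code: PyObjectModel, StringOnlySinkModel, PythonSinkFunctionModel and run_pipeline are hypothetical stand-ins invented for illustration, and a TypeError plays the role of the ClassCastException. It shows why a mapper placed before the sink cannot help if the wrapper re-wraps every record before handing it to the inner sink.

```python
class PyObjectModel:
    """Hypothetical stand-in for a Jython PyObject wrapping a value."""

    def __init__(self, value):
        self.value = value


class StringOnlySinkModel:
    """Hypothetical stand-in for a Java sink whose serializer expects strings."""

    def __init__(self):
        self.written = []

    def invoke(self, record):
        if not isinstance(record, str):
            # Plays the role of the ClassCastException in the real job.
            raise TypeError("expected str, got " + type(record).__name__)
        self.written.append(record)


class PythonSinkFunctionModel:
    """Toy model of the wrapper: re-wraps every record before delegating."""

    def __init__(self, inner):
        self.inner = inner

    def invoke(self, record):
        # Whatever the upstream mapper produced is wrapped again here.
        self.inner.invoke(PyObjectModel(record))


def run_pipeline(records, mapper, sink):
    """Apply the mapper to each record, then hand the result to the sink."""
    for record in records:
        sink.invoke(mapper(record))


def demo():
    # Without the wrapper, mapping to str is enough for the sink.
    direct = StringOnlySinkModel()
    run_pipeline([1, 2], str, direct)

    # With the wrapper, the same mapper has no effect on what the sink sees.
    wrapped = PythonSinkFunctionModel(StringOnlySinkModel())
    error = None
    try:
        run_pipeline([1, 2], str, wrapped)
    except TypeError as exc:
        error = str(exc)
    return direct.written, error
```

In this model the only ways out are to unwrap inside the wrapper or to bypass the wrapper for sinks that are not Python sinks; which of these applies to Flink itself is not something the sketch can answer.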

Re: ClassCastException when writing to a Kafka stream with Flink + Python

2018-08-14 Thread vino yang
Hi Joe,

I've pinged Chesnay for you; please wait for his reply.

Thanks, vino.

Joe Malt wrote on Wed, Aug 15, 2018 at 7:16 AM:
> Hi,
>
> I'm trying to write to a Kafka stream in a Flink job using the new Python
> streaming API.
>
> My program looks like this:
>
> def main(factory):
>
>     props = Properties()
>

ClassCastException when writing to a Kafka stream with Flink + Python

2018-08-14 Thread Joe Malt
Hi,

I'm trying to write to a Kafka stream in a Flink job using the new Python streaming API.

My program looks like this:

    def main(factory):
        props = Properties()
        props.setProperty("bootstrap.servers", configs['kafkaBroker'])
        consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']