Dear Team,

I am new to PyFlink and would appreciate your help with an issue I am facing. I am using PyFlink 1.14.4 with the reference code from the PyFlink getting-started pages, and I get the following error:

    py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
    : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    Caused by: TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String

Below is my code for reference:

    import os

    from pyflink.common import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
    from pyflink.common import Types


    def main():
        env = StreamExecutionEnvironment.get_execution_environment()
        # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
        env.set_parallelism(1)
        kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                 'flink-sql-connector-kafka_2.11-1.14.4.jar')
        env.add_jars("file:///{}".format(kafka_jar))

        deserialization_schema = SimpleStringSchema()

        # Test for kafka consumer
        kafka_consumer = FlinkKafkaConsumer(
            topics='test',
            deserialization_schema=deserialization_schema,
            properties={'bootstrap.servers': 'localhost:9093'})

        ds = env.add_source(kafka_consumer)

        # Data in the Kafka topic looks like ('user1', 1, 2000)
        ds = ds.map(lambda x: eval(x))
        ds = ds.map(lambda x: ','.join([str(value) for value in x]))
        # ds.print()

        kafka_producer = FlinkKafkaProducer(
            topic='testresult',
            serialization_schema=SimpleStringSchema(),
            producer_config={'bootstrap.servers': 'localhost:9093', 'group.id': 'fraud_test'})

        ds.add_sink(kafka_producer)
        env.execute('main')


    if __name__ == '__main__':
        main()

Thanks and Regards,
Harshit
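
P.S. In case it helps to see what the two map steps are meant to do to each record, here is a minimal plain-Python sketch of the same transformation, without Flink or Kafka. The function name to_csv_line is only for illustration, and the sketch uses ast.literal_eval in place of the eval in my job, assuming every record is a Python tuple literal like the sample above.

```python
import ast


def to_csv_line(record: str) -> str:
    """Parse a tuple literal such as "('user1', 1, 2000)" and join its fields with commas."""
    # Assumption: the record is a valid Python literal; literal_eval rejects anything else.
    values = ast.literal_eval(record)
    return ','.join(str(value) for value in values)


if __name__ == '__main__':
    # Sample record in the same shape as the data in the 'test' topic.
    print(to_csv_line("('user1', 1, 2000)"))
```

Outside Flink this gives a plain Python str for each record, which is what I expected the Kafka sink to receive.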