Dear Team,
I am new to PyFlink and request your support with an issue I am facing. I am using PyFlink version 1.14.4 and reference code from the PyFlink GitHub repository. I am getting the following error:

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Multiplexer hanging up"
    debug_error_string = "{"created":"@1651051695.104000000","description":"Error received from peer ipv6:[::1]:64839","file":"src/core/lib/surface/call.cc","file_line":904,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.flink.types.Row

Below is my code for reference:

import json
import logging
import os
import sys
from pyflink.common import Types, JsonRowSerializationSchema, Row, CsvRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaProducer
import math


def show(ds, env):
    ds.print()
    env.execute()


def basic_operations():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.4.jar')
    env.add_jars("file:///{}".format(kafka_jar))

    ds = env.from_collection(
        collection=[
            ('user1', 1, 2000), ('user2', 2, 4000), ('user3', 3, 1000),
            ('user1', 4, 25000), ('user2', 5, 7000), ('user3', 8, 7000),
            ('user1', 12, 2000), ('user2', 13, 4000), ('user3', 15, 1000),
            ('user1', 17, 20000), ('user2', 18, 40000), ('user3', 20, 10000),
            ('user1', 21, 2000), ('user2', 22, 4000), ('user3', 33, 1000),
            ('user1', 34, 25000), ('user2', 35, 7000), ('user3', 38, 7000)
        ],
        type_info=Types.ROW_NAMED(["id", "info", "value"],
                                  [Types.STRING(), Types.INT(), Types.INT()])
    )

    ds1 = ds.map(lambda x: x)
    ds1.print()

    def update_tel(data):
        # parse the json
        test_data = data.info
        test_data += data.value
        res = Row('x', 'y')
        # return Types.ROW(data.id, test_data)
        return res(data.id, test_data)

    # show(ds.map(update_tel).key_by(lambda data: data[0]), env)
    ds = ds.map(update_tel)
    ds.print()

    # ds = ds.map(lambda x: type(x))
    # ds.print()
    # ds = ds.map(lambda x: Row([x]), output_type=Types.ROW([Types.STRING(), Types.INT()]))
    # ds.print()

    type_info = Types.ROW_NAMED(['x', 'y'], [Types.STRING(), Types.INT()])
    serialization_schema = CsvRowSerializationSchema.Builder(type_info).build()
    kafka_producer = FlinkKafkaProducer(
        topic='testing',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'localhost:9093', 'group.id': 'test_group'}
    )
    ds.add_sink(kafka_producer)

    env.execute('basic_operations')


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    basic_operations()
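
One thing I was unsure about: should the ds.map(update_tel) call declare its output type explicitly, along the lines of the commented-out output_type line in the code above? Something like the sketch below is what I had in mind (reusing the same Types.ROW_NAMED fields as the sink's type_info), but I have not confirmed whether this is related to the ClassCastException:

    # untested guess: declare the Row type produced by update_tel
    ds = ds.map(update_tel,
                output_type=Types.ROW_NAMED(['x', 'y'],
                                            [Types.STRING(), Types.INT()]))

Any pointers on what I am doing wrong would be much appreciated.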