Dear, I am using pyflink to develop a stream processing system. In one of the processes I have processed a keyed_stream (which should be giving out a data_stream according to my knowledge). Later I am trying to add a sink to this data_stream. While this is erroring out, the same sink applied to another data stream works fine. I am not able to decode the error.
error while sinking: grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.CANCELLED details = "Multiplexer hanging up" debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:34683 {created_time:"2024-08-09T18:20:28.159147552+05:30", grpc_status:1, grpc_message:"Multiplexer hanging up"}" the process function on keyed stream: class process_load_data(KeyedProcessFunction): def open(self,runtime_context): self.state = runtime_context.get_map_state(MapStateDescriptor("my_state", Types.INT(), Types.FLOAT())) def process_element(self, value, ctx: ProcessFunction.Context): data = json.loads(value) previous_state = self.state.get(data[IO_ID_INDEX]) self.state.put(data[IO_ID_INDEX], data[VALUE_INDEX]) if not previous_state or ( previous_state and previous_state != data[value] ): yield value and the function call for sink and processing: kafka_sink = FlinkKafkaProducer( topic="load_data", serialization_schema=SimpleStringSchema(), producer_config=kafka_props ) raw_data_stream = env.from_source(kafka_source, WatermarkStrategy. for_monotonous_timestamps(),"Raw_Data") load_data = raw_data_stream.key_by(key_selector()).process(process_load_data ()) load_data.add_sink(kafka_sink)