Dear,

I am using PyFlink to develop a stream processing system. In one step I
apply a process function to a keyed stream, which, as far as I know, should
produce a DataStream. I then try to add a sink to this DataStream. This
fails with the error below, although the same sink applied to another data
stream works fine. I am not able to decode the error.

error while sinking:

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.CANCELLED
        details = "Multiplexer hanging up"
        debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:34683 {created_time:"2024-08-09T18:20:28.159147552+05:30", grpc_status:1, grpc_message:"Multiplexer hanging up"}"

the process function on keyed stream:
class process_load_data(KeyedProcessFunction):

    def open(self, runtime_context):
        self.state = runtime_context.get_map_state(
            MapStateDescriptor("my_state", Types.INT(), Types.FLOAT()))

    def process_element(self, value, ctx: ProcessFunction.Context):
        data = json.loads(value)
        previous_state = self.state.get(data[IO_ID_INDEX])
        self.state.put(data[IO_ID_INDEX], data[VALUE_INDEX])
        if not previous_state or (
                previous_state and previous_state != data[value]):
            yield value
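
For reference, the change-detection logic that process_element is meant to
implement can be exercised outside Flink. The following is only a minimal
sketch in plain Python, with a dict standing in for the MapState. It assumes
that each record is a JSON list, that IO_ID_INDEX and VALUE_INDEX are integer
positions in it (the values 0 and 1 are hypothetical, the post does not show
them), and that the comparison was meant to use data[VALUE_INDEX]; as posted,
the condition indexes data with value, the raw JSON string. The helper name
changed_records is also made up for illustration.

```python
import json

# Hypothetical positions; the original post does not show these constants.
IO_ID_INDEX = 0
VALUE_INDEX = 1


def changed_records(records, state=None):
    """Yield each raw JSON record whose value differs from the last one
    seen for the same id."""
    # A plain dict stands in for Flink's MapState in this sketch.
    state = {} if state is None else state
    for raw in records:
        data = json.loads(raw)
        io_id = data[IO_ID_INDEX]
        new_value = data[VALUE_INDEX]
        previous = state.get(io_id)
        state[io_id] = new_value
        # Assumption: compare against None explicitly, so that a stored
        # value of 0.0 is treated as "seen" rather than as "no state".
        if previous is None or previous != new_value:
            yield raw
```

Feeding it the records '[1, 2.5]', '[1, 2.5]', '[1, 3.0]' yields the first
and third record and drops the repeated second one.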

and the function call for sink and processing:
kafka_sink = FlinkKafkaProducer(
    topic="load_data",
    serialization_schema=SimpleStringSchema(),
    producer_config=kafka_props
)

raw_data_stream = env.from_source(
    kafka_source, WatermarkStrategy.for_monotonous_timestamps(), "Raw_Data")
load_data = raw_data_stream.key_by(key_selector()).process(process_load_data())
load_data.add_sink(kafka_sink)
