Hello,

What is the correct way to use Python dicts as the ROW type in PyFlink? I'm trying this:

    output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                       [Types.STRING(), Types.STRING(), Types.LONG()])


    class MyProcessFunction(KeyedProcessFunction):

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
            result = {"id": ctx.get_current_key()[0], "data": "some_string", "timestamp": 111111111111}
            out.collect(result)
            current_watermark = ctx.timer_service().current_watermark()
            ctx.timer_service().register_event_time_timer(current_watermark + 1500)

        def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
            logging.info(timestamp)
            out.collect("On timer timestamp: " + str(timestamp))


    ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
        .process(MyProcessFunction(), output_type=output_type_info)

I hardcoded the values in MyProcessFunction to be sure that the input data doesn't mess up the fields. So the data is correct, but PyFlink throws an exception:

    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
    ... 10 more

However, it works with primitive types like Types.STRING(). According to the documentation, the ROW type corresponds to Python's dict type.

Regards