Hi meneldor,

I guess Shuiqiang is not using pyflink 1.12.0 to develop the example. The
signature of the `process_element` method has been changed in the new
version [1]. In pyflink 1.12.0, you can use `collector.collect` to send out
your results.
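Roughly, the difference looks like this (a minimal sketch, not tested against
1.12.0; the class bodies are only for illustration):

    from pyflink.datastream.functions import KeyedProcessFunction

    # pyflink 1.12.0: results are emitted through the Collector argument
    class MyProcessFunction(KeyedProcessFunction):

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
                            out: 'Collector'):
            out.collect(value)  # send out the result

    # newer versions (after FLINK-20647): no collector, results are yielded
    class MyProcessFunction(KeyedProcessFunction):

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
            yield value  # send out the result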
[1] https://issues.apache.org/jira/browse/FLINK-20647

Best,
Xingbo

meneldor <menel...@gmail.com> wrote on Fri, Jan 15, 2021 at 1:20 AM:

> Thank you for the answer, Shuiqiang!
> I'm using the latest apache-flink version:
>
>> Requirement already up-to-date: apache-flink in
>> ./venv/lib/python3.7/site-packages (1.12.0)
>
> However, the method signature is using a collector:
>
> [image: screenshot of the process_element signature, which takes a Collector]
>
> I'm using the *setup-pyflink-virtual-env.sh* shell script from the
> docs (which uses pip).
>
> Regards
>
> On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen <acqua....@gmail.com> wrote:
>
>> Hi meneldor,
>>
>> The main cause of the error is that there is a bug in
>> `ctx.timer_service().current_watermark()`. At the beginning of the stream,
>> when the first record comes into KeyedProcessFunction.process_element(),
>> the current watermark will be Long.MIN_VALUE on the Java side, while on
>> the Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.
>>
>>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>
>> Here, 9223372036854775807 + 1500 is 9223372036854777307, which Python
>> happily handles as an arbitrary-precision integer, but which causes a
>> Long value overflow in Java when deserializing the registered timer
>> value. I will create an issue to fix the bug.
>>
>> Let's return to your initial question: in PyFlink you can create
>> Row-typed data as below:
>>
>>> row_data = Row(id='my id', data='some data', timestamp=1111)
>>
>> And I wonder which release version of Flink the code snippet you provided
>> is based on? The latest API for KeyedProcessFunction.process_element() and
>> KeyedProcessFunction.on_timer() does not provide a `collector` to collect
>> output data; instead you use `yield`, which is a more Pythonic approach.
>>
>> Please refer to the following code:
>>
>> from pyflink.common import Row
>> from pyflink.common.typeinfo import Types
>> from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.datastream.functions import KeyedProcessFunction
>>
>>
>> def keyed_process_function_example():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>     env.get_config().set_auto_watermark_interval(2000)
>>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>     data_stream = env.from_collection(
>>         [(1, 'hello', '1603708211000'),
>>          (2, 'hi', '1603708224000'),
>>          (3, 'hello', '1603708226000'),
>>          (4, 'hi', '1603708289000')],
>>         type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
>>
>>     class MyTimestampAssigner(TimestampAssigner):
>>
>>         def extract_timestamp(self, value, record_timestamp) -> int:
>>             # the third field carries the event time in epoch millis
>>             return int(value[2])
>>
>>     class MyProcessFunction(KeyedProcessFunction):
>>
>>         def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>             yield Row(id=ctx.get_current_key()[1], data='some_string',
>>                       timestamp=11111111)
>>             # current_watermark = ctx.timer_service().current_watermark()
>>             ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>>
>>         def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>             yield Row(id=ctx.get_current_key()[1],
>>                       data='current on timer timestamp: ' + str(timestamp),
>>                       timestamp=timestamp)
>>
>>     # LONG for the timestamp field: epoch-millis timer values overflow INT
>>     output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>                                        [Types.STRING(), Types.STRING(), Types.LONG()])
>>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>>         .with_timestamp_assigner(MyTimestampAssigner())
>>     data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>>         .key_by(lambda x: (x[0], x[1]),
>>                 key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>>         .process(MyProcessFunction(), output_type=output_type_info).print()
>>     env.execute('test keyed process function')
>>
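>> As a side note, the overflow above is easy to see in plain Python, where
>> integers are arbitrary-precision, unlike Java longs (a small sketch, just
>> illustrating the arithmetic):
>>
>> JAVA_LONG_MAX = 2 ** 63 - 1      # 9223372036854775807
>> timer = JAVA_LONG_MAX + 1500     # 9223372036854777307: fine in Python,
>>                                  # but no longer representable as a Java
>>                                  # long, hence the failure when the
>>                                  # registered timer is deserialized
>> assert timer > JAVA_LONG_MAX
>>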
>> Best,
>> Shuiqiang
>>
>> meneldor <menel...@gmail.com> wrote on Thu, Jan 14, 2021 at 10:45 PM:
>>
>>> Hello,
>>>
>>> What is the correct way to use Python dicts as the ROW type in PyFlink?
>>> I'm trying this:
>>>
>>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>                                    [Types.STRING(), Types.STRING(), Types.LONG()])
>>>
>>> class MyProcessFunction(KeyedProcessFunction):
>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
>>>                         out: Collector):
>>>         result = {"id": ctx.get_current_key()[0], "data": "some_string",
>>>                   "timestamp": 111111111111}
>>>         out.collect(result)
>>>         current_watermark = ctx.timer_service().current_watermark()
>>>         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>
>>>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext',
>>>                  out: 'Collector'):
>>>         logging.info(timestamp)
>>>         out.collect("On timer timestamp: " + str(timestamp))
>>>
>>> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
>>>     .process(MyProcessFunction(), output_type=output_type_info)
>>>
>>> I just hardcoded the values in MyProcessFunction to be sure that the
>>> input data doesn't mess up the fields. So the data is correct, but
>>> PyFlink throws an exception:
>>>
>>>> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>>>> at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>> at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>>> ... 10 more
>>>
>>> However, it works with primitive types like Types.STRING(). According to
>>> the documentation, the ROW type corresponds to Python's dict type.
>>>
>>> Regards
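To tie the thread together: on 1.12.0 (the collector signature), the fix for
the snippet above is to collect a Row matching output_type_info instead of a
dict, and to guard the timer against the broken initial watermark until the
bug Shuiqiang mentions is fixed. An untested sketch:

    from pyflink.common import Row
    from pyflink.datastream.functions import KeyedProcessFunction

    JAVA_LONG_MAX = 2 ** 63 - 1

    class MyProcessFunction(KeyedProcessFunction):

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
                            out: 'Collector'):
            # collect a Row, not a dict, so the Java RowSerializer can read it
            out.collect(Row(id=ctx.get_current_key()[0], data='some_string',
                            timestamp=111111111111))
            current_watermark = ctx.timer_service().current_watermark()
            # skip the timer while the watermark still carries the (buggy)
            # Long.MAX_VALUE initial value, to avoid the Java long overflow
            if current_watermark < JAVA_LONG_MAX - 1500:
                ctx.timer_service().register_event_time_timer(current_watermark + 1500)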