I imported pyflink.common.types.Row and used it as Shuiqiang suggested, but now Java throws a memory exception:
    Caused by: TimerException{java.lang.OutOfMemoryError: Java heap space}
        ... 11 more
    Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseChars(SegmentsUtil.java:91)
        at org.apache.flink.table.runtime.util.StringUtf8Utils.decodeUTF8(StringUtf8Utils.java:127)
        at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:90)
        at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:41)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
        at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator$$Lambda$670/579781231.onProcessingTime(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$844/2129217743.run(Unknown Source)

Regards

On Fri, Jan 15, 2021 at 4:00 AM Xingbo Huang <hxbks...@gmail.com> wrote:

Hi meneldor,

I guess Shuiqiang was not using pyflink 1.12.0 to develop the example. The signature of the `process_element` method has been changed in the new version [1]. In pyflink 1.12.0, you can use `collector.collect` to send out your results.

[1] https://issues.apache.org/jira/browse/FLINK-20647

Best,
Xingbo

On Fri, Jan 15, 2021 at 1:20 AM meneldor <menel...@gmail.com> wrote:

Thank you for the answer, Shuiqiang!
I'm using the latest apache-flink version:

    Requirement already up-to-date: apache-flink in ./venv/lib/python3.7/site-packages (1.12.0)

However, the method signature is using a collector:

[image: image.png]

I'm using the *setup-pyflink-virtual-env.sh* shell script from the docs (which uses pip).

Regards

On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen <acqua....@gmail.com> wrote:

Hi meneldor,

The main cause of the error is that there is a bug in `ctx.timer_service().current_watermark()`. At the beginning of the stream, when the first record comes into KeyedProcessFunction.process_element(), the current watermark is Long.MIN_VALUE on the Java side, while on the Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.

    ctx.timer_service().register_event_time_timer(current_watermark + 1500)

Here, 9223372036854775807 + 1500 is 9223372036854777307, which is handled as an ordinary long integer in Python but causes a Long value overflow in Java when the registered timer value is deserialized. I will create an issue to fix the bug.
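In other words, Python's arbitrary-precision integers happily accept an addition that Java's fixed-width long cannot represent. A minimal pure-Python sketch of the mismatch (the wrap-around helper below only mimics 64-bit two's-complement arithmetic; it is not Flink code, and the constants are just the standard 64-bit bounds):

    # Python ints are arbitrary precision; Java longs are 64-bit two's complement.
    JAVA_LONG_MAX = 2**63 - 1   # 9223372036854775807, what the Python side sees
    JAVA_LONG_MIN = -2**63      # what the Java side actually holds at stream start

    timer = JAVA_LONG_MAX + 1500      # fine in Python: 9223372036854777307
    print(timer > JAVA_LONG_MAX)      # True -> no longer representable as a Java long

    def to_java_long(n: int) -> int:
        # Simulate the wrap-around a Java long would produce for this value.
        return (n + 2**63) % 2**64 - 2**63

    print(to_java_long(timer))        # -9223372036854774309

The wrapped value lands far in the past, so the timer registered on the Java side is nonsense.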
Let's return to your initial question: in PyFlink you can create Row-typed data as below:

    row_data = Row(id='my id', data='some data', timestamp=1111)

And I wonder which release version of Flink the code snippet you provided is based on. The latest API for KeyedProcessFunction.process_element() and KeyedProcessFunction.on_timer() does not provide a `collector` to collect output data; instead you `yield` the results, which is a more Pythonic approach.

Please refer to the following code:

    def keyed_process_function_example():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        env.get_config().set_auto_watermark_interval(2000)
        env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        data_stream = env.from_collection([(1, 'hello', '1603708211000'),
                                           (2, 'hi', '1603708224000'),
                                           (3, 'hello', '1603708226000'),
                                           (4, 'hi', '1603708289000')],
                                          type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

        class MyTimestampAssigner(TimestampAssigner):

            def extract_timestamp(self, value, record_timestamp) -> int:
                return int(value[2])

        class MyProcessFunction(KeyedProcessFunction):

            def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
                yield Row(id=ctx.get_current_key()[1], data='some_string', timestamp=11111111)
                # current_watermark = ctx.timer_service().current_watermark()
                ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

            def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
                yield Row(id=ctx.get_current_key()[1],
                          data='current on timer timestamp: ' + str(timestamp),
                          timestamp=timestamp)

        output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                           [Types.STRING(), Types.STRING(), Types.INT()])
        watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
            .with_timestamp_assigner(MyTimestampAssigner())
        data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
            .key_by(lambda x: (x[0], x[1]), key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
            .process(MyProcessFunction(), output_type=output_type_info).print()
        env.execute('test keyed process function')

Best,
Shuiqiang
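Row in the example above is built with keyword arguments; to connect this back to the original dict question, an existing dict can simply be unpacked into the constructor. A minimal sketch (assuming the pyflink.common.types.Row mentioned at the top of the thread, with the field names used throughout this thread):

    from pyflink.common.types import Row

    # Keyword construction: field names should line up with the names
    # declared in Types.ROW_NAMED(['id', 'data', 'timestamp'], ...).
    row = Row(id='my id', data='some data', timestamp=1111)
    assert row.id == 'my id'        # fields are reachable by name
    assert row.timestamp == 1111

    # A dict that already holds the output can be unpacked into the
    # constructor instead of being collected as a bare dict.
    result = {'id': 'my id', 'data': 'some data', 'timestamp': 1111}
    row_from_dict = Row(**result)

Unpacking keeps existing dict-building code intact while giving the serializer the Row it expects.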
On Thu, Jan 14, 2021 at 10:45 PM meneldor <menel...@gmail.com> wrote:

Hello,

What is the correct way to use a Python dict as the ROW type in PyFlink? I'm trying this:

    output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                       [Types.STRING(), Types.STRING(), Types.LONG()])

    class MyProcessFunction(KeyedProcessFunction):
        def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
            result = {"id": ctx.get_current_key()[0], "data": "some_string", "timestamp": 111111111111}
            out.collect(result)
            current_watermark = ctx.timer_service().current_watermark()
            ctx.timer_service().register_event_time_timer(current_watermark + 1500)

        def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
            logging.info(timestamp)
            out.collect("On timer timestamp: " + str(timestamp))

    ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
        .process(MyProcessFunction(), output_type=output_type_info)

I just hardcoded the values in MyProcessFunction to be sure that the input data doesn't mess up the fields. So the data is correct, but PyFlink throws an exception:

    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
        at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
        at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
        ... 10 more

However, it works with primitive types like Types.STRING(). According to the documentation, the ROW type corresponds to Python's dict type.

Regards
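Pulling the thread's advice together for pyflink 1.12.0, where process_element still receives a collector per Xingbo's note: collect a Row instead of a dict so the output matches Types.ROW_NAMED, and register the timer from the record timestamp rather than the initial watermark. This is a hedged sketch, not a verified fix; `ds` and `MyKeySelector` are assumed to exist as in the snippet above, and the import paths follow the 1.12-era package layout:

    import logging

    from pyflink.common.typeinfo import Types
    from pyflink.common.types import Row
    from pyflink.datastream.functions import KeyedProcessFunction

    output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                       [Types.STRING(), Types.STRING(), Types.LONG()])

    class MyProcessFunction(KeyedProcessFunction):

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out):
            # Collect a Row, not a dict, so the record matches output_type_info.
            out.collect(Row(id=ctx.get_current_key()[0], data='some_string',
                            timestamp=111111111111))
            # Register relative to the record timestamp; current_watermark() is
            # Long.MIN_VALUE at stream start, which caused the overflow above.
            ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

        def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out):
            logging.info(timestamp)
            # Timer output must also be a Row of the declared type, not a string.
            out.collect(Row(id=ctx.get_current_key()[0],
                            data='on timer timestamp: ' + str(timestamp),
                            timestamp=timestamp))

    # Wiring as in the original snippet:
    ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
        .process(MyProcessFunction(), output_type=output_type_info)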