Hi meneldor, Xingbo,

Sorry for the late reply.
Thanks a lot to Xingbo for the clarification. Judging from the stack trace of the exception, could you check whether the result data matches the specified output type? BTW, please share your code if that's OK; it would help with debugging.

Best,
Shuiqiang

meneldor <menel...@gmail.com> wrote on Fri, Jan 15, 2021 at 4:59 PM:

> I imported pyflink.common.types.Row and used it as Shuiqiang suggested, but
> now Java throws a memory exception:
>
>> Caused by: TimerException{java.lang.OutOfMemoryError: Java heap space}
>>     ... 11 more
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>     at org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseChars(SegmentsUtil.java:91)
>>     at org.apache.flink.table.runtime.util.StringUtf8Utils.decodeUTF8(StringUtf8Utils.java:127)
>>     at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:90)
>>     at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:41)
>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator$$Lambda$670/579781231.onProcessingTime(Unknown Source)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$844/2129217743.run(Unknown Source)
>
> Regards
>
> On Fri, Jan 15, 2021 at 4:00 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi meneldor,
>>
>> I guess Shuiqiang did not use PyFlink 1.12.0 to develop the example.
>> The signature of the `process_element` method has been changed in the new
>> version [1]. In PyFlink 1.12.0, you can use `collector.collect` to send out
>> your results.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20647
>>
>> Best,
>> Xingbo
>>
>> meneldor <menel...@gmail.com> wrote on Fri, Jan 15, 2021 at 1:20 AM:
>>
>>> Thank you for the answer, Shuiqiang!
>>> I'm using the latest apache-flink version:
>>>
>>>> Requirement already up-to-date: apache-flink in
>>>> ./venv/lib/python3.7/site-packages (1.12.0)
>>>
>>> However, the method signature is using a collector:
>>>
>>> [image: image.png]
>>> I'm using the *setup-pyflink-virtual-env.sh* shell script from the
>>> docs (which uses pip).
>>>
>>> Regards
>>>
>>> On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen <acqua....@gmail.com>
>>> wrote:
>>>
>>>> Hi meneldor,
>>>>
>>>> The main cause of the error is that there is a bug in
>>>> `ctx.timer_service().current_watermark()`.
>>>> At the beginning of the stream, when the first record comes into
>>>> KeyedProcessFunction.process_element(), the current watermark is
>>>> Long.MIN_VALUE on the Java side, while on the Python side it becomes
>>>> Long.MAX_VALUE, which is 9223372036854775807.
>>>>
>>>>     ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>>
>>>> Here, 9223372036854775807 + 1500 is 9223372036854777307. Python handles
>>>> this as an ordinary arbitrary-precision int, but it overflows a Java long
>>>> when the registered timer value is deserialized. I will create an issue to
>>>> fix the bug.
>>>>
>>>> Let's return to your initial question. In PyFlink you can create Row-type
>>>> data as below:
>>>>
>>>>     row_data = Row(id='my id', data='some data', timestamp=1111)
>>>>
>>>> Also, which Flink release is the code snippet you provided based on? In the
>>>> latest API, KeyedProcessFunction.process_element() and
>>>> KeyedProcessFunction.on_timer() no longer provide a `collector` to collect
>>>> output data; they use `yield` instead, which is a more Pythonic approach.
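The overflow described above can be reproduced with plain Python integers. This is a minimal sketch, assuming the only goal is to avoid registering a timer value that does not fit into a Java long: `JAVA_LONG_MIN`/`JAVA_LONG_MAX` mirror Java's signed 64-bit range, and `safe_timer_time` is a hypothetical helper (not part of PyFlink) that treats either extreme as "no watermark yet" and then falls back to the record's own timestamp, the same workaround as registering `ctx.timestamp() + 1500`.

```python
# Java's long is a signed 64-bit integer; Python ints are unbounded.
JAVA_LONG_MIN = -(2 ** 63)
JAVA_LONG_MAX = 2 ** 63 - 1


def fits_java_long(value: int) -> bool:
    """Return True if value can be stored in a Java long."""
    return JAVA_LONG_MIN <= value <= JAVA_LONG_MAX


def safe_timer_time(current_watermark: int, record_timestamp: int, delay_ms: int) -> int:
    """Hypothetical helper: choose an event-time timer value Java can deserialize.

    Before the first watermark arrives, the watermark is a sentinel
    (Long.MIN_VALUE in Java, Long.MAX_VALUE as seen from Python under the bug
    described above). In that case, or whenever watermark + delay would leave
    the Java long range, the timer is based on the record's timestamp instead.
    """
    if current_watermark in (JAVA_LONG_MIN, JAVA_LONG_MAX) or not fits_java_long(
        current_watermark + delay_ms
    ):
        return record_timestamp + delay_ms
    return current_watermark + delay_ms


if __name__ == "__main__":
    overflowing = JAVA_LONG_MAX + 1500
    print(overflowing)                  # 9223372036854777307
    print(fits_java_long(overflowing))  # False
    print(safe_timer_time(JAVA_LONG_MAX, 1603708211000, 1500))  # 1603708212500
```

Inside a collector-style `process_element` it would be called as `ctx.timer_service().register_event_time_timer(safe_timer_time(ctx.timer_service().current_watermark(), ctx.timestamp(), 1500))`.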
>>>>
>>>> Please refer to the following code:
>>>>
>>>> from pyflink.common.typeinfo import Types
>>>> from pyflink.common.types import Row
>>>> from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
>>>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>>>> from pyflink.datastream.functions import KeyedProcessFunction
>>>>
>>>>
>>>> def keyed_process_function_example():
>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>     env.set_parallelism(1)
>>>>     env.get_config().set_auto_watermark_interval(2000)
>>>>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>     data_stream = env.from_collection([(1, 'hello', '1603708211000'),
>>>>                                        (2, 'hi', '1603708224000'),
>>>>                                        (3, 'hello', '1603708226000'),
>>>>                                        (4, 'hi', '1603708289000')],
>>>>                                       type_info=Types.ROW([Types.INT(),
>>>>                                                            Types.STRING(),
>>>>                                                            Types.STRING()]))
>>>>
>>>>     class MyTimestampAssigner(TimestampAssigner):
>>>>
>>>>         def extract_timestamp(self, value, record_timestamp) -> int:
>>>>             # The third field holds the event time in epoch milliseconds.
>>>>             return int(value[2])
>>>>
>>>>     class MyProcessFunction(KeyedProcessFunction):
>>>>
>>>>         def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>>             yield Row(id=ctx.get_current_key()[1], data='some_string',
>>>>                       timestamp=11111111)
>>>>             # Base the timer on the record's timestamp instead of the
>>>>             # buggy current_watermark():
>>>>             # current_watermark = ctx.timer_service().current_watermark()
>>>>             ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>>>>
>>>>         def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>>>             yield Row(id=ctx.get_current_key()[1],
>>>>                       data='current on timer timestamp: ' + str(timestamp),
>>>>                       timestamp=timestamp)
>>>>
>>>>     # 'timestamp' is declared as LONG: timer timestamps are epoch
>>>>     # milliseconds and do not fit into a 32-bit Types.INT().
>>>>     output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>>                                        [Types.STRING(), Types.STRING(), Types.LONG()])
>>>>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>>>>         .with_timestamp_assigner(MyTimestampAssigner())
>>>>     data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>>>>         .key_by(lambda x: (x[0], x[1]),
>>>>                 key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>>>>         .process(MyProcessFunction(), output_type=output_type_info).print()
>>>>     env.execute('test keyed process function')
>>>>
>>>>
>>>> if __name__ == '__main__':
>>>>     keyed_process_function_example()
>>>>
>>>>
>>>> Best,
>>>> Shuiqiang
>>>>
>>>> meneldor <menel...@gmail.com> wrote on Thu, Jan 14, 2021 at 10:45 PM:
>>>>
>>>>> Hello,
>>>>>
>>>>> What is the correct way to use Python dicts as the ROW type in PyFlink?
>>>>> I'm trying this:
>>>>>
>>>>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>>>                                    [Types.STRING(), Types.STRING(), Types.LONG()])
>>>>>
>>>>> class MyProcessFunction(KeyedProcessFunction):
>>>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
>>>>>         result = {"id": ctx.get_current_key()[0], "data": "some_string",
>>>>>                   "timestamp": 111111111111}
>>>>>         out.collect(result)
>>>>>         current_watermark = ctx.timer_service().current_watermark()
>>>>>         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>>>
>>>>>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>>>>>         logging.info(timestamp)
>>>>>         out.collect("On timer timestamp: " + str(timestamp))
>>>>>
>>>>> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
>>>>>     .process(MyProcessFunction(), output_type=output_type_info)
>>>>>
>>>>>
>>>>> I just hardcoded the values in MyProcessFunction to be sure that the
>>>>> input data doesn't mess up the fields.
>>>>> So the data is correct, but PyFlink throws an exception:
>>>>>
>>>>>>     at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>>>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>>>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>>>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>>>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>>>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>>>>>     ... 10 more
>>>>>
>>>>> However, it works with primitive types like Types.STRING(). According to
>>>>> the documentation, the ROW type corresponds to Python's dict type.
>>>>>
>>>>>
>>>>> Regards
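Picking up the original question and Shuiqiang's suggestion to check the result data against the declared output type, here is a minimal plain-Python sketch (no PyFlink needed) that (1) turns a dict into values ordered by the declared field names and (2) checks those values against the schema before they reach the Java serializer. Everything here is hypothetical: `SCHEMA` is a hand-written mirror of the `Types.ROW_NAMED` call, `dict_to_row_values` and `check_row` are not PyFlink APIs, and emitting the values as `Row(*values)` is an assumption shown only in a comment. The integer ranges are those of Java's int and long, which back Flink's INT and LONG types.

```python
from typing import Any, Dict, List, Sequence, Tuple

# Hypothetical mirror of:
#   Types.ROW_NAMED(['id', 'data', 'timestamp'],
#                   [Types.STRING(), Types.STRING(), Types.LONG()])
SCHEMA: List[Tuple[str, str]] = [("id", "STRING"), ("data", "STRING"), ("timestamp", "LONG")]

# Value ranges of Flink's INT (Java int) and LONG (Java long).
INTEGER_RANGES = {"INT": (-(2 ** 31), 2 ** 31 - 1), "LONG": (-(2 ** 63), 2 ** 63 - 1)}


def dict_to_row_values(record: Dict[str, Any], schema: Sequence[Tuple[str, str]]) -> List[Any]:
    """Order a dict's values by the declared field names (positional ROW layout)."""
    names = [name for name, _ in schema]
    missing = [name for name in names if name not in record]
    extra = sorted(set(record) - set(names))
    if missing or extra:
        raise ValueError(f"missing fields: {missing}, unexpected fields: {extra}")
    return [record[name] for name in names]


def check_row(values: Sequence[Any], schema: Sequence[Tuple[str, str]]) -> List[str]:
    """Return a list of problems; an empty list means the values fit the schema."""
    if isinstance(values, (str, bytes, dict)):
        return [f"expected a positional row of {len(schema)} fields, got {type(values).__name__}"]
    if len(values) != len(schema):
        return [f"expected {len(schema)} fields, got {len(values)}"]
    problems = []
    for value, (name, type_name) in zip(values, schema):
        if type_name == "STRING":
            if not isinstance(value, str):
                problems.append(f"{name}: expected str, got {type(value).__name__}")
        elif type_name in INTEGER_RANGES:
            low, high = INTEGER_RANGES[type_name]
            if isinstance(value, bool) or not isinstance(value, int):
                problems.append(f"{name}: expected int, got {type(value).__name__}")
            elif not low <= value <= high:
                problems.append(f"{name}: {value} does not fit into {type_name}")
        else:
            problems.append(f"{name}: unsupported type {type_name}")
    return problems


if __name__ == "__main__":
    result = {"id": "my id", "data": "some_string", "timestamp": 111111111111}
    values = dict_to_row_values(result, SCHEMA)
    print(values)                     # ['my id', 'some_string', 111111111111]
    print(check_row(values, SCHEMA))  # []
    print(check_row(result, SCHEMA))  # ['expected a positional row of 3 fields, got dict']
    # Assumption: inside process_element the values would then be emitted as
    #     out.collect(Row(*values))   # PyFlink 1.12.0, collector-based
    #     yield Row(*values)          # newer, yield-based API
```

Building the row from the declared field order keeps the emitted values aligned with the ROW_NAMED declaration regardless of how the dict's keys happen to be ordered; the same check also flags the string emitted from `on_timer` and an epoch-millisecond timestamp declared as `Types.INT()`.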