Hi,

Here is the code. The input is JSON data coming from Maxwell CDC (sample
further below):

from typing import Any

from pyflink.common import types
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction, KeySelector

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.get_config().set_auto_watermark_interval(2000)
env.set_parallelism(1)

device_type_info = Types.ROW_NAMED(
    ['commit', 'ts', 'type', 'data', 'old'],
    [Types.BOOLEAN(), Types.LONG(), Types.STRING(),
     Types.ROW_NAMED(['id', 'tp', 'device_ts', 'account'],
                     [Types.STRING(), Types.STRING(), Types.LONG(), Types.STRING()]),
     Types.ROW_NAMED(['id', 'tp', 'device_ts', 'account'],
                     [Types.STRING(), Types.STRING(), Types.LONG(), Types.STRING()])])

output_type_info = Types.ROW_NAMED(
    ['id', 'tp', 'account', 'device_ts'],
    [Types.STRING(), Types.STRING(), Types.INT(), Types.STRING(), Types.LONG()])
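# NB: the two lists above have 4 names but 5 types; Types.ROW_NAMED pairs
# names with types one-to-one, so a length mismatch like this can by itself
# break the Row output type (see question 1 below).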
device_row_schema = JsonRowDeserializationSchema.builder() \
    .type_info(device_type_info) \
    .build()


class KafkaRowTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value: Any, record_timestamp: int):
        return int(value[3][2])


class MyKeySelector(KeySelector):

    def get_key(self, value):
        return (str(value[3][0]), str(value[3][1]))


class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
                        account="TEST", device_ts=value[3][2])
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
                        account="TEST", device_ts=1111111111, timestamp=timestamp)


device_consumer = FlinkKafkaConsumer("device", device_row_schema,
                                     {'bootstrap.servers': 'localhost:9092'})
device_consumer.set_start_from_earliest()

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
    .with_timestamp_assigner(KafkaRowTimestampAssigner())

device_ds = env.add_source(device_consumer)
device_ds.assign_timestamps_and_watermarks(watermark_strategy) \
    .key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
    .process(MyProcessFunction(), output_type=output_type_info)

job_client = env.execute_async('Device enrichment Job')
job_client.get_job_execution_result().result()

This is the input data:

{"commit": true, "ts": 1610546861, "type": "update", "data": {"id": "id2", "tp": "B", "device_ts": 1610546861, "account": "279"}, "old": {}}

1) If I change the output type to STRING() and return a str from
*process_element*, everything is OK, but I need to use
*JsonRowSerializationSchema* later on that data (a sketch of what I mean is
shown below).
2) I'm not sure what to return in *on_timer*, as it is missing the value
argument which *process_element* has.
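Here is roughly how I plan to use the serialization schema later, re 1)
(just a sketch; the sink topic name is a placeholder):

from pyflink.common.serialization import JsonRowSerializationSchema
from pyflink.datastream.connectors import FlinkKafkaProducer

# serialize rows matching output_type_info back to JSON for Kafka
serialization_schema = JsonRowSerializationSchema.builder() \
    .with_type_info(output_type_info) \
    .build()
producer = FlinkKafkaProducer('device_enriched', serialization_schema,
                              {'bootstrap.servers': 'localhost:9092'})
# ...and attach it to the processed stream with .add_sink(producer)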
Regards

On Mon, Jan 18, 2021 at 4:47 AM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi meneldor, Xingbo,
>
> Sorry for the late reply.
>
> Thanks a lot for Xingbo's clarification.
>
> And according to the stacktrace of the exception, could you check whether
> the result data match the specified return type? BTW, please share your
> code if it's ok, it will be of help to debug.
>
> Best,
> Shuiqiang
>
> On Fri, Jan 15, 2021 at 4:59 PM meneldor <menel...@gmail.com> wrote:
>
>> I imported pyflink.common.types.Row and used it as Shuiqiang suggested,
>> but now Java throws a memory exception:
>>
>>> Caused by: TimerException{java.lang.OutOfMemoryError: Java heap space}
>>> ... 11 more
>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>> at org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseChars(SegmentsUtil.java:91)
>>> at org.apache.flink.table.runtime.util.StringUtf8Utils.decodeUTF8(StringUtf8Utils.java:127)
>>> at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:90)
>>> at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:41)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>> at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator$$Lambda$670/579781231.onProcessingTime(Unknown Source)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$844/2129217743.run(Unknown Source)
>>
>> Regards
>>
>> On Fri, Jan 15, 2021 at 4:00 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>>
>>> Hi meneldor,
>>>
>>> I guess Shuiqiang is not using pyflink 1.12.0 to develop the example.
>>> The signature of the `process_element` method has been changed in the new
>>> version [1]. In pyflink 1.12.0, you can use `collector.collect` to send
>>> out your results.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-20647
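>>>
>>> For example, roughly (a sketch; the class names are only for
>>> illustration):
>>>
>>> # pyflink 1.12.0: output goes through the Collector argument
>>> class MyFunc1120(KeyedProcessFunction):
>>>     def process_element(self, value, ctx, out):
>>>         out.collect(value)
>>>
>>> # after FLINK-20647: no Collector argument, output is yielded
>>> class MyFuncNew(KeyedProcessFunction):
>>>     def process_element(self, value, ctx):
>>>         yield value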
>>>
>>> Best,
>>> Xingbo
>>>
>>> On Fri, Jan 15, 2021 at 1:20 AM meneldor <menel...@gmail.com> wrote:
>>>
>>>> Thank you for the answer, Shuiqiang!
>>>> I'm using the latest apache-flink version:
>>>>
>>>>> Requirement already up-to-date: apache-flink in
>>>>> ./venv/lib/python3.7/site-packages (1.12.0)
>>>>
>>>> however the method signature is using a collector:
>>>>
>>>> [image: image.png]
>>>>
>>>> I'm using the *setup-pyflink-virtual-env.sh* shell script from the
>>>> docs (which uses pip).
>>>>
>>>> Regards
>>>>
>>>> On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen <acqua....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi meneldor,
>>>>>
>>>>> The main cause of the error is that there is a bug in
>>>>> `ctx.timer_service().current_watermark()`. At the beginning of the
>>>>> stream, when the first record comes into
>>>>> KeyedProcessFunction.process_element(), the current watermark will be
>>>>> Long.MIN_VALUE on the Java side, while on the Python side it becomes
>>>>> LONG.MAX_VALUE, which is 9223372036854775807.
>>>>>
>>>>> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>>>
>>>>> Here, 9223372036854775807 + 1500 is 9223372036854777307, which will
>>>>> be automatically converted to a long integer in Python but will cause
>>>>> Long value overflow in Java when deserializing the registered timer
>>>>> value. I will create an issue to fix the bug.
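>>>>>
>>>>> To see the overflow concretely (plain Python arithmetic, only for
>>>>> illustration):
>>>>>
>>>>> >>> java_long_max = 2**63 - 1  # 9223372036854775807
>>>>> >>> java_long_max + 1500  # fine in Python, ints are arbitrary precision
>>>>> 9223372036854777307
>>>>> >>> (java_long_max + 1500) > 2**63 - 1  # too large for a signed 64-bit Java long
>>>>> True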
>>>>>
>>>>> Let's return to your initial question: in PyFlink you could create
>>>>> Row type data as below:
>>>>>
>>>>> >>> row_data = Row(id='my id', data='some data', timestamp=1111)
>>>>>
>>>>> And I wonder which release version of Flink the code snippet you
>>>>> provided is based on? The latest API for
>>>>> KeyedProcessFunction.process_element() and
>>>>> KeyedProcessFunction.on_timer() will not provide a `collector` to
>>>>> collect output data but uses `yield`, which is a more Pythonic
>>>>> approach.
>>>>>
>>>>> Please refer to the following code:
>>>>>
>>>>> def keyed_process_function_example():
>>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>>     env.set_parallelism(1)
>>>>>     env.get_config().set_auto_watermark_interval(2000)
>>>>>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>>     data_stream = env.from_collection(
>>>>>         [(1, 'hello', '1603708211000'),
>>>>>          (2, 'hi', '1603708224000'),
>>>>>          (3, 'hello', '1603708226000'),
>>>>>          (4, 'hi', '1603708289000')],
>>>>>         type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
>>>>>
>>>>>     class MyTimestampAssigner(TimestampAssigner):
>>>>>
>>>>>         def extract_timestamp(self, value, record_timestamp) -> int:
>>>>>             return int(value[2])
>>>>>
>>>>>     class MyProcessFunction(KeyedProcessFunction):
>>>>>
>>>>>         def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>>>             yield Row(id=ctx.get_current_key()[1], data='some_string',
>>>>>                       timestamp=11111111)
>>>>>             # current_watermark = ctx.timer_service().current_watermark()
>>>>>             ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>>>>>
>>>>>         def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>>>>             yield Row(id=ctx.get_current_key()[1],
>>>>>                       data='current on timer timestamp: ' + str(timestamp),
>>>>>                       timestamp=timestamp)
>>>>>
>>>>>     output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>>>                                        [Types.STRING(), Types.STRING(), Types.INT()])
>>>>>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>>>>>         .with_timestamp_assigner(MyTimestampAssigner())
>>>>>     data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>>>>>         .key_by(lambda x: (x[0], x[1]),
>>>>>                 key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>>>>>         .process(MyProcessFunction(), output_type=output_type_info).print()
>>>>>     env.execute('test keyed process function')
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>>> On Thu, Jan 14, 2021 at 10:45 PM meneldor <menel...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> What is the correct way to use Python dicts as a ROW type in
>>>>>> PyFlink? I'm trying this:
>>>>>>
>>>>>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>>>>                                    [Types.STRING(), Types.STRING(), Types.LONG()])
>>>>>>
>>>>>> class MyProcessFunction(KeyedProcessFunction):
>>>>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
>>>>>>         result = {"id": ctx.get_current_key()[0], "data": "some_string",
>>>>>>                   "timestamp": 111111111111}
>>>>>>         out.collect(result)
>>>>>>         current_watermark = ctx.timer_service().current_watermark()
>>>>>>         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>>>>
>>>>>>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>>>>>>         logging.info(timestamp)
>>>>>>         out.collect("On timer timestamp: " + str(timestamp))
>>>>>>
>>>>>> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
>>>>>>     .process(MyProcessFunction(), output_type=output_type_info)
>>>>>>
>>>>>> I just hardcoded the values in MyProcessFunction to be sure that the
>>>>>> input data doesn't mess up the fields. So the data is correct, but
>>>>>> PyFlink throws an exception:
>>>>>>
>>>>>>> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>>>>> at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>>>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>>>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>>>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>>>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>>>>>> ... 10 more
>>>>>>
>>>>>> However, it works with primitive types like Types.STRING(). According
>>>>>> to the documentation, the ROW type corresponds to Python's dict type.
>>>>>>
>>>>>> Regards
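>>>>>>
>>>>>> P.S. The mapping I was hoping for, spelled out (just a sketch; it
>>>>>> assumes Row accepts keyword arguments, and "id1" is a made-up value):
>>>>>>
>>>>>> from pyflink.common.types import Row
>>>>>>
>>>>>> result = {"id": "id1", "data": "some_string", "timestamp": 111111111111}
>>>>>> row = Row(**result)  # same as Row(id='id1', data='some_string', timestamp=111111111111)
>>>>>> out.collect(row)     # collect the Row instead of the raw dict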