Hi, here is the code. It consumes JSON change records from Maxwell CDC:

from typing import Any

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.common.types import Row
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction, KeySelector

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.get_config().set_auto_watermark_interval(2000)
env.set_parallelism(1)

# Type info for the Maxwell CDC envelope; 'data' and 'old' hold the row images.
device_type_info = Types.ROW_NAMED(
    ['commit', 'ts', 'type', 'data', 'old'],
    [Types.BOOLEAN(),
     Types.LONG(),
     Types.STRING(),
     Types.ROW_NAMED(['id', 'tp', 'device_ts', 'account'],
                     [Types.STRING(), Types.STRING(), Types.LONG(), Types.STRING()]),
     Types.ROW_NAMED(['id', 'tp', 'device_ts', 'account'],
                     [Types.STRING(), Types.STRING(), Types.LONG(), Types.STRING()])])

# Field names and types must line up one-to-one.
output_type_info = Types.ROW_NAMED(
    ['id', 'tp', 'account', 'device_ts'],
    [Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG()])

device_row_schema = JsonRowDeserializationSchema.builder() \
    .type_info(device_type_info).build()

class KafkaRowTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
        # value[3] is the 'data' row; index 2 is its 'device_ts' field
        return int(value[3][2])


class MyKeySelector(KeySelector):
    def get_key(self, value):
        # key by the ('id', 'tp') pair from the 'data' row
        return (str(value[3][0]), str(value[3][1]))


class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        yield Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
                  account="TEST", device_ts=value[3][2])
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # rows emitted here must match output_type_info as well
        yield Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
                  account="TEST", device_ts=1111111111)

device_consumer = FlinkKafkaConsumer("device", device_row_schema,
                                     {'bootstrap.servers': 'localhost:9092'})
device_consumer.set_start_from_earliest()
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
    .with_timestamp_assigner(KafkaRowTimestampAssigner())

device_ds = env.add_source(device_consumer)
device_ds.assign_timestamps_and_watermarks(watermark_strategy) \
    .key_by(MyKeySelector(),
            key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
    .process(MyProcessFunction(), output_type=output_type_info)

job_client = env.execute_async('Device enrichment Job')
job_client.get_job_execution_result().result()


This is the input data: {"commit": true, "ts": 1610546861, "type": "update", "data": {"id": "id2", "tp": "B", "device_ts": 1610546861, "account": "279"}, "old": {}}

1) If I change the output type to Types.STRING() and return a str from
*process_element*, everything is OK, but I need to run that data through
*JsonRowSerializationSchema* later on (see the first sketch below).

2) I'm not sure what to return in *on_timer*, as it doesn't receive the
value argument which *process_element* has (see the second sketch below).
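
For question 1, this is roughly how I plan to serialize the output rows
later (just a sketch; the "device_out" topic name and the producer config
are placeholders):

from pyflink.common.serialization import JsonRowSerializationSchema
from pyflink.datastream.connectors import FlinkKafkaProducer

# a Row output type is required here, which is why returning plain
# strings from process_element is not enough
serialization_schema = JsonRowSerializationSchema.builder() \
    .with_type_info(output_type_info).build()
device_producer = FlinkKafkaProducer(
    topic='device_out',  # placeholder topic name
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092'})
# and then: .process(MyProcessFunction(), output_type=output_type_info).add_sink(device_producer)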
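
For question 2, the only idea I have is to rebuild the row in *on_timer*
from the current key and the firing timestamp alone (a sketch; using the
timer timestamp as device_ts is my assumption):

def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
    # no input value is available here, so the row is built from the
    # current key plus the timer's timestamp; it must still match
    # output_type_info
    yield Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1],
              account="TEST", device_ts=timestamp)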


Regards


On Mon, Jan 18, 2021 at 4:47 AM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi meneldor, Xingbo,
>
> Sorry for the late reply.
>
> Thanks a lot for Xingbo’s clarification.
>
> And according to the stacktrace of the exception, could you check
> whether the result data matches the specified return type? BTW, please
> share your code if that's OK; it will help with debugging.
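>
> For example, the kind of mismatch to look for (just an illustration; the
> field names here are made up):
>
> from pyflink.common.typeinfo import Types
> from pyflink.common.types import Row
>
> # the declared output type has three named fields...
> output_type = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>                               [Types.STRING(), Types.STRING(), Types.LONG()])
> # ...so every emitted Row must carry exactly those fields with matching types
> row = Row(id='id1', data='some data', timestamp=1610546861000)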
>
> Best,
> Shuiqiang
>
>
>
>
> meneldor <menel...@gmail.com> wrote on Fri, Jan 15, 2021 at 4:59 PM:
>
>> I imported pyflink.common.types.Row and used it as Shuiqiang suggested,
>> but now Java throws a memory exception:
>>
>>> Caused by: TimerException{java.lang.OutOfMemoryError: Java heap space}
>>> ... 11 more
>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>> at org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseChars(SegmentsUtil.java:91)
>>> at org.apache.flink.table.runtime.util.StringUtf8Utils.decodeUTF8(StringUtf8Utils.java:127)
>>> at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:90)
>>> at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:41)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>> at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator$$Lambda$670/579781231.onProcessingTime(Unknown Source)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$844/2129217743.run(Unknown Source)
>>>
>>
>> Regards
>>
>> On Fri, Jan 15, 2021 at 4:00 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>>
>>> Hi meneldor,
>>>
>>> I guess Shuiqiang is not using PyFlink 1.12.0 to develop the
>>> example. The signature of the `process_element` method has been changed in
>>> the newer version [1]. In PyFlink 1.12.0, you can use `collector.collect` to
>>> send out your results.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-20647
>>>
>>> Best,
>>> Xingbo
>>>
>>> meneldor <menel...@gmail.com> wrote on Fri, Jan 15, 2021 at 1:20 AM:
>>>
>>>> Thank you for the answer Shuiqiang!
>>>> I'm using the latest apache-flink version:
>>>>
>>>>> Requirement already up-to-date: apache-flink in
>>>>> ./venv/lib/python3.7/site-packages (1.12.0)
>>>>
>>>> however the method signature is using a collector:
>>>>
>>>> [image: image.png]
>>>>  Im using the *setup-pyflink-virtual-env.sh* shell script from the
>>>> docs(which uses pip).
>>>>
>>>> Regards
>>>>
>>>> On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen <acqua....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi meneldor,
>>>>>
>>>>> The main cause of the error is that there is a bug in
>>>>> `ctx.timer_service().current_watermark()`. At the beginning of the stream,
>>>>> when the first record comes into KeyedProcessFunction.process_element(),
>>>>> the current_watermark will be Long.MIN_VALUE on the Java side, while on the
>>>>> Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.
>>>>>
>>>>> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>>>
>>>>> Here, 9223372036854775807 + 1500 is 9223372036854777307, which is
>>>>> automatically converted to a long integer in Python but causes a Long
>>>>> value overflow in Java when deserializing the registered timer value. I
>>>>> will create an issue to fix the bug.
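>>>>>
>>>>> Until the fix lands, a possible workaround inside process_element (just
>>>>> a sketch, assuming the uninitialized watermark always shows up as
>>>>> 2**63 - 1 on the Python side) is to skip the registration while the
>>>>> watermark is still the sentinel value:
>>>>>
>>>>> UNINITIALIZED_WATERMARK = 2 ** 63 - 1  # assumed Python-side sentinel
>>>>> current_watermark = ctx.timer_service().current_watermark()
>>>>> if current_watermark < UNINITIALIZED_WATERMARK:
>>>>>     ctx.timer_service().register_event_time_timer(current_watermark + 1500)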
>>>>>
>>>>> Let's return to your initial question: in PyFlink you can create
>>>>> Row-typed data as below:
>>>>>
>>>>> >>> row_data = Row(id='my id', data='some data', timestamp=1111)
>>>>>
>>>>> And I wonder which release version of Flink the code snippet you
>>>>> provided is based on? The latest API for
>>>>> KeyedProcessFunction.process_element() and KeyedProcessFunction.on_timer()
>>>>> does not provide a `collector` to collect output data but uses `yield`,
>>>>> which is a more Pythonic approach.
>>>>>
>>>>> Please refer to the following code:
>>>>>
>>>>> def keyed_process_function_example():
>>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>>     env.set_parallelism(1)
>>>>>     env.get_config().set_auto_watermark_interval(2000)
>>>>>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>>     data_stream = env.from_collection(
>>>>>         [(1, 'hello', '1603708211000'),
>>>>>          (2, 'hi', '1603708224000'),
>>>>>          (3, 'hello', '1603708226000'),
>>>>>          (4, 'hi', '1603708289000')],
>>>>>         type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
>>>>>
>>>>>     class MyTimestampAssigner(TimestampAssigner):
>>>>>
>>>>>         def extract_timestamp(self, value, record_timestamp) -> int:
>>>>>             return int(value[2])
>>>>>
>>>>>     class MyProcessFunction(KeyedProcessFunction):
>>>>>
>>>>>         def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>>>             yield Row(id=ctx.get_current_key()[1], data='some_string', timestamp=11111111)
>>>>>             # current_watermark = ctx.timer_service().current_watermark()
>>>>>             ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>>>>>
>>>>>         def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>>>>             yield Row(id=ctx.get_current_key()[1],
>>>>>                       data='current on timer timestamp: ' + str(timestamp),
>>>>>                       timestamp=timestamp)
>>>>>
>>>>>     output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>>>                                        [Types.STRING(), Types.STRING(), Types.INT()])
>>>>>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>>>>>         .with_timestamp_assigner(MyTimestampAssigner())
>>>>>     data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>>>>>         .key_by(lambda x: (x[0], x[1]),
>>>>>                 key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>>>>>         .process(MyProcessFunction(), output_type=output_type_info).print()
>>>>>     env.execute('test keyed process function')
>>>>>
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> meneldor <menel...@gmail.com> wrote on Thu, Jan 14, 2021 at 10:45 PM:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> What is the correct way to use a Python dict as the ROW type in PyFlink?
>>>>>> I'm trying this:
>>>>>>
>>>>>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>>>>                                    [Types.STRING(), Types.STRING(), Types.LONG()])
>>>>>>
>>>>>> class MyProcessFunction(KeyedProcessFunction):
>>>>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
>>>>>>         result = {"id": ctx.get_current_key()[0], "data": "some_string",
>>>>>>                   "timestamp": 111111111111}
>>>>>>         out.collect(result)
>>>>>>         current_watermark = ctx.timer_service().current_watermark()
>>>>>>         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>>>>
>>>>>>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>>>>>>         logging.info(timestamp)
>>>>>>         out.collect("On timer timestamp: " + str(timestamp))
>>>>>>
>>>>>> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
>>>>>>    .process(MyProcessFunction(), output_type=output_type_info)
>>>>>>
>>>>>>
>>>>>> I just hardcoded the values in MyProcessFunction to be sure that the
>>>>>> input data doesn't mess up the fields. So the data is correct, but PyFlink
>>>>>> throws an exception:
>>>>>>
>>>>>>> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>>>>> at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>>>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>>>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>>>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>>>>>> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>>>>>> ... 10 more
>>>>>>
>>>>>> However, it works with primitive types like Types.STRING(). According to
>>>>>> the documentation, the ROW type corresponds to Python's dict type.
>>>>>>
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>>
