Thank you for the answer Shuiqiang!
Im using the last apache-flink version:

> Requirement already up-to-date: apache-flink in
> ./venv/lib/python3.7/site-packages (1.12.0)

however the method signature is using a collector:

[image: image.png]
 Im using the *setup-pyflink-virtual-env.sh* shell script from the
docs(which uses pip).

Regards

On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi meneldor,
>
> The main cause of the error is that there is a bug in
> `ctx.timer_service().current_watermark()`. At the beginning the stream,
> when the first record come into the KeyedProcessFunction.process_element()
> , the current_watermark will be the Long.MIN_VALUE at Java side, while at
> the Python side, it becomes LONG.MAX_VALUE which is 9223372036854775807.
>
> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>
> Here, 9223372036854775807 + 1500 is 9223372036854777307 which will be
> automatically converted to a long interger in python but will cause Long
> value overflow in Java when deserializing the registered timer value. I
> will craete a issue to fix the bug.
>
> Let’s return to your initial question, at PyFlink you could create a Row
> Type data as bellow:
>
> >>> row_data = Row(id=‘my id’, data=’some data’, timestamp=1111)
>
> And I wonder which release version of flink the code snippet you provided
> based on? The latest API for KeyedProcessFunction.process_element() and
> KeyedProcessFunction.on_timer() will not provid a `collector` to collect
> output data but use `yield` which is a more pythonic approach.
>
> Please refer to the following code:
>
> def keyed_process_function_example():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     env.get_config().set_auto_watermark_interval(2000)
>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     data_stream = env.from_collection([(1, 'hello', '1603708211000'),
>                                        (2, 'hi', '1603708224000'),
>                                        (3, 'hello', '1603708226000'),
>                                        (4, 'hi', '1603708289000')],
>                                       type_info=Types.ROW([Types.INT(), 
> Types.STRING(), Types.STRING()]))
>
>     class MyTimestampAssigner(TimestampAssigner):
>
>         def extract_timestamp(self, value, record_timestamp) -> int:
>             return int(value[2])
>
>     class MyProcessFunction(KeyedProcessFunction):
>
>         def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>             yield Row(id=ctx.get_current_key()[1], data='some_string', 
> timestamp=11111111)
>             # current_watermark = ctx.timer_service().current_watermark()
>             ctx.timer_service().register_event_time_timer(ctx.timestamp() + 
> 1500)
>
>         def on_timer(self, timestamp: int, ctx: 
> 'KeyedProcessFunction.OnTimerContext'):
>             yield Row(id=ctx.get_current_key()[1], data='current on timer 
> timestamp: ' + str(timestamp),
>                       timestamp=timestamp)
>
>     output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'], 
> [Types.STRING(), Types.STRING(), Types.INT()])
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(MyTimestampAssigner())
>     data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: (x[0], x[1]), 
> key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>         .process(MyProcessFunction(), output_type=output_type_info).print()
>     env.execute('test keyed process function')
>
>
> Best,
> Shuiqiang
>
>
>
>
>
> meneldor <menel...@gmail.com> 于2021年1月14日周四 下午10:45写道:
>
>> Hello,
>>
>> What is the correct way to use Python dict's as ROW type in pyflink? Im
>> trying this:
>>
>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
>>                                      [Types.STRING(), Types.STRING(), 
>> Types.LONG() ])
>>
>> class MyProcessFunction(KeyedProcessFunction):
>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context', 
>> out: Collector):
>>         result = {"id": ctx.get_current_key()[0], "data": "some_string", 
>> "timestamp": 111111111111}
>>         out.collect(result)
>>         current_watermark = ctx.timer_service().current_watermark()
>>         ctx.timer_service().register_event_time_timer(current_watermark + 
>> 1500)
>>
>>     def on_timer(self, timestamp, ctx: 
>> 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>>         logging.info(timestamp)
>>         out.collect("On timer timestamp: " + str(timestamp))
>>
>> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), 
>> Types.STRING()])) \
>>    .process(MyProcessFunction(), output_type=output_type_info)
>>
>>
>> I just hardcoded the values in MyProcessFunction to be sure that the
>> input data doesnt mess the fields. So the data is correct but PyFlink trews
>> an exception:
>>
>> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>> at
>>> org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>> at
>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>> at
>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>> at
>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>> at
>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>> ... 10 more
>>
>> However it works with primitive types like Types.STRING(). According to the 
>> documentation the ROW type corresponds to the python's dict type.
>>
>>
>> Regards
>>
>>

Reply via email to