Hi meneldor,
The main cause of the error is a bug in
`ctx.timer_service().current_watermark()`. At the beginning of the stream,
when the first record comes into KeyedProcessFunction.process_element(),
the current watermark is Long.MIN_VALUE on the Java side, while on the
Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.
>>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
Here, 9223372036854775807 + 1500 is 9223372036854777307. Python handles
this as an ordinary arbitrary-precision integer, but the value overflows a
Java long when the registered timer value is deserialized on the Java side.
I will create an issue to fix the bug.
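To make the overflow concrete, here is a small plain-Python sketch (no PyFlink needed) that checks a value against the Java long range. The helper safe_timer_time is hypothetical and not part of PyFlink: it assumes that falling back to the element's own timestamp is an acceptable workaround while the watermark is still at a sentinel value, similar to how the example further down uses ctx.timestamp().

```python
# Bounds of a Java `long` (signed 64-bit).
JAVA_LONG_MIN = -(2 ** 63)
JAVA_LONG_MAX = 2 ** 63 - 1


def fits_in_java_long(value: int) -> bool:
    """Return True if `value` can be represented as a Java long."""
    return JAVA_LONG_MIN <= value <= JAVA_LONG_MAX


def safe_timer_time(current_watermark: int, element_timestamp: int,
                    delay_ms: int) -> int:
    """Hypothetical helper: choose a timer time that cannot overflow.

    If the watermark is still one of the long sentinels (no real watermark
    has arrived yet), use the element's own timestamp as the base instead.
    """
    base = current_watermark
    if current_watermark in (JAVA_LONG_MIN, JAVA_LONG_MAX):
        base = element_timestamp
    target = base + delay_ms
    if not fits_in_java_long(target):
        raise OverflowError(f"timer time {target} does not fit in a Java long")
    return target


overflowed = JAVA_LONG_MAX + 1500
print(overflowed)                     # 9223372036854777307
print(fits_in_java_long(overflowed))  # False
print(safe_timer_time(JAVA_LONG_MAX, 1603708211000, 1500))  # 1603708212500
```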
Returning to your initial question: in PyFlink you can create Row-typed
data as below:
>>> row_data = Row(id='my id', data='some data', timestamp=1111)
Also, which Flink release is your code snippet based on? In the latest
API, KeyedProcessFunction.process_element() and
KeyedProcessFunction.on_timer() no longer take a `collector` for emitting
output data; they use `yield` instead, which is a more Pythonic approach.
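The difference between the two styles can be illustrated without Flink at all. This is a plain-Python simulation, not the PyFlink runtime: the class and runner names are hypothetical, and a list stands in for the collector. The point is only that with a generator the caller simply iterates over however many records the function yields.

```python
from typing import Any, Iterable, Iterator, List


class CollectorStyleFunction:
    """Older style: the caller hands in a collector (here, a plain list)."""

    def process_element(self, value: Any, out: List[Any]) -> None:
        out.append(value)
        out.append(value * 2)


class GeneratorStyleFunction:
    """Newer style: the function just yields each output record."""

    def process_element(self, value: Any) -> Iterator[Any]:
        yield value
        yield value * 2


def run_collector_style(fn: CollectorStyleFunction,
                        inputs: Iterable[Any]) -> List[Any]:
    # Stand-in for a runtime that passes a collector into every call.
    results: List[Any] = []
    for value in inputs:
        fn.process_element(value, results)
    return results


def run_generator_style(fn: GeneratorStyleFunction,
                        inputs: Iterable[Any]) -> List[Any]:
    # Stand-in for a runtime that consumes whatever the generator yields.
    results: List[Any] = []
    for value in inputs:
        results.extend(fn.process_element(value))
    return results


print(run_collector_style(CollectorStyleFunction(), [1, 2]))  # [1, 2, 2, 4]
print(run_generator_style(GeneratorStyleFunction(), [1, 2]))  # [1, 2, 2, 4]
```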
Please refer to the following code:
from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.functions import KeyedProcessFunction


def keyed_process_function_example():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    data_stream = env.from_collection([(1, 'hello', '1603708211000'),
                                       (2, 'hi', '1603708224000'),
                                       (3, 'hello', '1603708226000'),
                                       (4, 'hi', '1603708289000')],
                                      type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

    class MyTimestampAssigner(TimestampAssigner):
        # Use the third field (epoch milliseconds as a string) as the event time.
        def extract_timestamp(self, value, record_timestamp) -> int:
            return int(value[2])

    class MyProcessFunction(KeyedProcessFunction):
        def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
            # Emit output with `yield` instead of a collector.
            yield Row(id=ctx.get_current_key()[1], data='some_string',
                      timestamp=11111111)
            # current_watermark = ctx.timer_service().current_watermark()
            # Register the timer relative to the element's own timestamp rather
            # than the current watermark, avoiding the overflow described above.
            ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)

        def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
            yield Row(id=ctx.get_current_key()[1],
                      data='current on timer timestamp: ' + str(timestamp),
                      timestamp=timestamp)

    # 'timestamp' carries epoch milliseconds, which need a 64-bit LONG field.
    output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                       [Types.STRING(), Types.STRING(), Types.LONG()])
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())
    data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: (x[0], x[1]),
                key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
        .process(MyProcessFunction(), output_type=output_type_info).print()
    env.execute('test keyed process function')


if __name__ == '__main__':
    keyed_process_function_example()
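One thing worth checking in the example is the type of the 'timestamp' output field: on_timer() writes the timer's timestamp into it, and epoch milliseconds do not fit in a 32-bit INT. The plain-Python check below uses the event times from the from_collection() input and the 1500 ms delay; INT_MAX and LONG_MAX are the signed bounds of Flink's INT and LONG types (Java int and long). It suggests the field should be declared as Types.LONG().

```python
# Event timestamps (ms) taken from the example's from_collection() input.
EVENT_TIMESTAMPS = [1603708211000, 1603708224000, 1603708226000, 1603708289000]
TIMER_DELAY_MS = 1500

# Signed upper bounds of Flink's INT (32-bit) and LONG (64-bit) types.
INT_MAX = 2 ** 31 - 1
LONG_MAX = 2 ** 63 - 1

# Each element registers a timer at its own timestamp plus the delay.
timer_timestamps = [ts + TIMER_DELAY_MS for ts in EVENT_TIMESTAMPS]
print(timer_timestamps)
# [1603708212500, 1603708225500, 1603708227500, 1603708290500]

print(all(ts > INT_MAX for ts in timer_timestamps))    # True: too big for INT
print(all(ts <= LONG_MAX for ts in timer_timestamps))  # True: fits in LONG
```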
Best,
Shuiqiang
On Thu, Jan 14, 2021 at 10:45 PM, meneldor wrote:
> Hello,
>
> What is the correct way to use Python dicts as the ROW type in PyFlink? I'm
> trying this:
>
> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>                                    [Types.STRING(), Types.STRING(), Types.LONG()])
>
> class MyProcessFunction(KeyedProcessFunction):
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
>         result = {"id": ctx.get_current_key()[0], "data": "some_string", "timestamp": 111111111111}
>         out.collect(result)
>         current_watermark = ctx.timer_service().current_watermark()
>         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>
>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>         logging.info(timestamp)
>         out.collect("On timer timestamp: " + str(timestamp))
>
> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
>     .process(MyProcessFunction(), output_type=output_type_info)
>
>
> I just hardcoded the values in MyProcessFunction to be sure that the input
> data doesn't mess up the fields. So the data is correct, but PyFlink throws an
> exception:
>
>     at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>     at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>     ... 10 more
>
> However, it works with primitive types like Types.STRING(). According to the
> documentation, the ROW type corresponds to Python's dict type.
>
>
> Regards
>
>