Thank you for the answer, Shuiqiang! I'm using the latest apache-flink version:
> Requirement already up-to-date: apache-flink in ./venv/lib/python3.7/site-packages (1.12.0)

However, the method signature still uses a collector:

[image: screenshot of the method signature]

I'm using the *setup-pyflink-virtual-env.sh* shell script from the docs (which uses pip).

Regards

On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen <acqua....@gmail.com> wrote:
> Hi meneldor,
>
> The main cause of the error is a bug in
> `ctx.timer_service().current_watermark()`. At the beginning of the stream,
> when the first record arrives in KeyedProcessFunction.process_element(),
> the current watermark is Long.MIN_VALUE on the Java side, while on the
> Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.
>
> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>
> Here, 9223372036854775807 + 1500 is 9223372036854777307. Python handles
> this transparently because its ints have arbitrary precision, but the value
> overflows a Java long when the registered timer value is deserialized. I
> will create an issue to fix the bug.
>
> Let's return to your initial question. In PyFlink you can create Row-typed
> data as below:
>
> >>> row_data = Row(id='my id', data='some data', timestamp=1111)
>
> Also, which Flink release is the code snippet you provided based on? The
> latest API for KeyedProcessFunction.process_element() and
> KeyedProcessFunction.on_timer() does not provide a `collector` to collect
> output data; it uses `yield` instead, which is a more pythonic approach.
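To make the overflow above concrete, here is a minimal pure-Python sketch (no PyFlink needed) of the arithmetic, plus one possible guard: while the watermark still holds a sentinel value, fall back to the element's own timestamp. The constants mirror Java's `Long.MIN_VALUE`/`Long.MAX_VALUE`; the helpers `fits_in_java_long` and `safe_timer_timestamp` are hypothetical names for illustration, not part of the PyFlink API.

```python
# Java's 64-bit signed long range (Long.MIN_VALUE .. Long.MAX_VALUE).
JAVA_LONG_MIN = -(2 ** 63)
JAVA_LONG_MAX = 2 ** 63 - 1


def fits_in_java_long(value: int) -> bool:
    """Return True if a Python int can be serialized as a Java long."""
    return JAVA_LONG_MIN <= value <= JAVA_LONG_MAX


def safe_timer_timestamp(current_watermark: int, element_timestamp: int, delay_ms: int) -> int:
    """Hypothetical helper: compute an event-time timer without overflowing.

    Before the first watermark arrives, the watermark is a sentinel
    (Long.MIN_VALUE in Java, seen as Long.MAX_VALUE in Python because of
    the bug described above), so use the element's timestamp instead.
    """
    if current_watermark in (JAVA_LONG_MIN, JAVA_LONG_MAX):
        base = element_timestamp
    else:
        base = current_watermark
    candidate = base + delay_ms
    if not fits_in_java_long(candidate):
        raise OverflowError(f"timer timestamp {candidate} does not fit in a Java long")
    return candidate


overflowing = JAVA_LONG_MAX + 1500
print(overflowing)                     # 9223372036854777307
print(fits_in_java_long(overflowing))  # False
print(safe_timer_timestamp(JAVA_LONG_MAX, 1603708211000, 1500))  # 1603708212500
```

Inside a job, the first two arguments would come from `ctx.timer_service().current_watermark()` and `ctx.timestamp()`. Registering the timer relative to `ctx.timestamp()`, as Shuiqiang's example does, sidesteps the sentinel entirely.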
>
> Please refer to the following code:
>
> # Imports added for completeness (module paths as in recent PyFlink releases).
> from pyflink.common import Row
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream.functions import KeyedProcessFunction
>
> def keyed_process_function_example():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     env.get_config().set_auto_watermark_interval(2000)
>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     data_stream = env.from_collection([(1, 'hello', '1603708211000'),
>                                        (2, 'hi', '1603708224000'),
>                                        (3, 'hello', '1603708226000'),
>                                        (4, 'hi', '1603708289000')],
>                                       type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
>
>     class MyTimestampAssigner(TimestampAssigner):
>
>         def extract_timestamp(self, value, record_timestamp) -> int:
>             # The third field holds the event time in epoch milliseconds.
>             return int(value[2])
>
>     class MyProcessFunction(KeyedProcessFunction):
>
>         def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>             yield Row(id=ctx.get_current_key()[1], data='some_string', timestamp=11111111)
>             # current_watermark = ctx.timer_service().current_watermark()
>             # Register relative to the element's timestamp, not the watermark.
>             ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>
>         def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
>             yield Row(id=ctx.get_current_key()[1],
>                       data='current on timer timestamp: ' + str(timestamp),
>                       timestamp=timestamp)
>
>     # Types.LONG() rather than Types.INT(): the on_timer() timestamps are
>     # epoch milliseconds (e.g. 1603708212500), which overflow a 32-bit int.
>     output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>                                        [Types.STRING(), Types.STRING(), Types.LONG()])
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(MyTimestampAssigner())
>     data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: (x[0], x[1]), key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>         .process(MyProcessFunction(), output_type=output_type_info).print()
>     env.execute('test keyed process function')
>
> Best,
> Shuiqiang
>
> On Thu, Jan 14, 2021 at 10:45 PM meneldor <menel...@gmail.com> wrote:
>
>> Hello,
>>
>> What is the correct way to use Python dicts as a ROW type in PyFlink?
>> I'm trying this:
>>
>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>                                    [Types.STRING(), Types.STRING(), Types.LONG()])
>>
>> class MyProcessFunction(KeyedProcessFunction):
>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
>>         result = {"id": ctx.get_current_key()[0], "data": "some_string",
>>                   "timestamp": 111111111111}
>>         out.collect(result)
>>         current_watermark = ctx.timer_service().current_watermark()
>>         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>
>>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>>         logging.info(timestamp)
>>         out.collect("On timer timestamp: " + str(timestamp))
>>
>> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
>>     .process(MyProcessFunction(), output_type=output_type_info)
>>
>> I just hardcoded the values in MyProcessFunction to make sure that the
>> input data doesn't mess up the fields.
>> So the data is correct, but PyFlink throws an exception:
>>
>>>     at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>>>     at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>>     ... 10 more
>>
>> However, it works with primitive types like Types.STRING(). According to the
>> documentation, the ROW type corresponds to Python's dict type.
>>
>> Regards
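One way to bridge a Python dict and a `ROW_NAMED` output type is to emit the values positionally, in the same order as the declared field names, instead of depending on how dict keys or keyword arguments happen to be ordered. The pure-Python sketch below does that conversion and also checks that integer fields fit in a Java long. `OUTPUT_FIELDS` and `dict_to_row_values` are hypothetical names; the assumption (not verified against every PyFlink release) is that passing the result as `Row(*values)` yields a row matching `Types.ROW_NAMED(['id', 'data', 'timestamp'], [Types.STRING(), Types.STRING(), Types.LONG()])`.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical mirror of the declared output type:
# Types.ROW_NAMED(['id', 'data', 'timestamp'],
#                 [Types.STRING(), Types.STRING(), Types.LONG()])
OUTPUT_FIELDS: List[Tuple[str, type]] = [
    ("id", str),
    ("data", str),
    ("timestamp", int),
]

JAVA_LONG_MIN = -(2 ** 63)
JAVA_LONG_MAX = 2 ** 63 - 1


def dict_to_row_values(record: Dict[str, Any]) -> Tuple[Any, ...]:
    """Return the record's values in declared field order.

    Extra keys are ignored. Raises KeyError for a missing field, TypeError
    for an unexpected Python type, and ValueError for an int that does not
    fit in a Java long.
    """
    values = []
    for name, expected_type in OUTPUT_FIELDS:
        value = record[name]  # KeyError if the dict lacks a declared field
        if not isinstance(value, expected_type):
            raise TypeError(
                f"field {name!r}: expected {expected_type.__name__}, got {type(value).__name__}"
            )
        if expected_type is int and not (JAVA_LONG_MIN <= value <= JAVA_LONG_MAX):
            raise ValueError(f"field {name!r}: {value} does not fit in a Java long")
        values.append(value)
    return tuple(values)


# Keys deliberately out of order: the output follows OUTPUT_FIELDS.
result = {"timestamp": 111111111111, "id": "key-1", "data": "some_string"}
print(dict_to_row_values(result))  # ('key-1', 'some_string', 111111111111)
# Inside process_element (assumption): yield Row(*dict_to_row_values(result))
```

Keeping the field list next to the `ROW_NAMED` declaration makes a mismatch between the dict and the declared schema fail in Python with a readable error, rather than surfacing later as a Java deserialization exception.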