Hi meneldor,

Yes. As the first release of the Python DataStream API, release-1.12 does not yet cover all scenarios. In release-1.13 we will extend the Python DataStream API to cover most scenarios, and CoProcessFunction will certainly be included.
Best,
Xingbo

On Tue, Jan 19, 2021 at 4:52 PM meneldor <menel...@gmail.com> wrote:

> Thank you Xingbo!
>
> Do you plan to implement CoProcess functions too? Right now I can't find a
> convenient way to connect and merge two streams.
>
> Regards
>
> On Tue, Jan 19, 2021 at 4:16 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi meneldor,
>>
>> 1. Yes. Although release 1.12.1 has not been officially released yet, it is
>> already available for download on PyPI.
>> In PyFlink 1.12.1, you only need to `yield` your output in `on_timer`.
>>
>> 2. Whenever an element arrives, your `process_element` method is invoked,
>> so you can read the `value` parameter directly in `process_element`.
>> When `on_timer` fires depends on the timer you register, as in your example
>> `ctx.timer_service().register_event_time_timer(current_watermark + 1500)`.
>> You may need state access[1], which will be supported in release-1.13.
>> Then you will be able to read your state in `on_timer` and control the
>> output conveniently.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>
>> Best,
>> Xingbo
>>
>> On Mon, Jan 18, 2021 at 10:44 PM meneldor <menel...@gmail.com> wrote:
>>
>>> Thank you Xingbo
>>>
>>> 1. I will try to use a normal list instead of a named Row. Thanks!
>>> 2. There is a new 1.12.1 version of pyflink which uses
>>> process_element(self, value, ctx: 'KeyedProcessFunction.Context')
>>>
>>> And what about on_timer(self, timestamp, ctx:
>>> 'KeyedProcessFunction.OnTimerContext')? Can I access the value through
>>> ctx, for example, as in process_element()?
>>>
>>> Thank you!
>>>
>>> On Mon, Jan 18, 2021 at 4:18 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Shuiqiang, meneldor,
>>>>
>>>> 1. In fact, there is a problem with using a Python `Named Row` as the
>>>> return value of a user-defined function in PyFlink.
>>>>
>>>> When Row data is serialized, the serializers are applied in the order of
>>>> the Row fields. However, the fields of a Python `Named Row` are sorted by
>>>> field name, a design meant to make comparing Named Rows and computing
>>>> their hash values easier. This can lead to serialization/deserialization
>>>> errors (the correspondence between serializer and field is wrong).
>>>> Serializers are not matched by field name for performance reasons, but
>>>> `Named Row` support could be added, trading a little performance for ease
>>>> of use. For the current example, I suggest returning a list or a normal
>>>> Row instead of a Named Row.
>>>>
>>>>
>>>> 2. In pyflink 1.12.0, the callbacks take an extra `out: Collector`
>>>> parameter: the signature of `process_element` is `def
>>>> process_element(self, value, ctx: 'KeyedProcessFunction.Context', out:
>>>> Collector)`[1], and `on_timer` likewise ends with `out: Collector`. If you
>>>> want to send data in `on_timer`, you can use `Collector.collect` just as
>>>> in `process_element`, e.g.
>>>> def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
>>>>                     out: Collector):
>>>>     out.collect(Row('a', 'b', 'c'))
>>>>
>>>> 3.
>>>> > I am not sure if the timestamp field should be included in
>>>> > output_type_info as I did now.
>>>>
>>>> If you return data with a timestamp field, `output_type_info` needs to
>>>> have that field as well. For example, the data returned in your example
>>>> contains a timestamp field, so your `output_type_info` needs to include
>>>> the information of this field.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Jan 18, 2021, at 9:21 PM, meneldor <menel...@gmail.com> wrote:
>>>>
>>>> Actually the *output_type_info* is OK; it was a copy/paste typo.
>>>> I changed the function to:
>>>>
>>>> class MyProcessFunction(KeyedProcessFunction):
>>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>>         yield types.Row(id=ctx.get_current_key()[0],
>>>>                         tp=ctx.get_current_key()[1], account="TEST",
>>>>                         device_ts=value[3][2], timestamp=ctx.timestamp())
>>>>         ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>>>>
>>>>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>>>         yield types.Row(id=ctx.get_current_key()[0],
>>>>                         tp=ctx.get_current_key()[1], account="TEST",
>>>>                         device_ts=1111111111, timestamp=timestamp)
>>>>
>>>> And the type to:
>>>>
>>>> output_type_info = Types.ROW_NAMED(['id', 'tp', 'account', 'device_ts', 'timestamp'],
>>>>                                    [Types.STRING(), Types.STRING(),
>>>>                                     Types.STRING(), Types.LONG(), Types.LONG()])
>>>>
>>>> I can't return the same data in *on_timer()* because there is no value
>>>> parameter. That's why I hardcoded *device_ts*. However, the exception
>>>> persists.
>>>>
>>>> I am not sure if the timestamp field should be included in
>>>> *output_type_info* as I did now.
>>>>
>>>> Regards
>>>>
>>>>
>>>> On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen <acqua....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi meneldor,
>>>>>
>>>>> Actually, the return type of on_timer() must be the same as that of
>>>>> process_element(). It seems that the value yielded by process_element()
>>>>> is missing the `timestamp` field. Also, the `output_type_info` has four
>>>>> field names but five field types. Could you align them?
>>>>>
>>>>> Best,
>>>>> Shuiqiang
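To make the timer discussion in this thread concrete, here is a toy, pure-Python model of the call pattern Xingbo describes for PyFlink 1.12.1: `process_element` runs once per element, `on_timer` runs only when a registered event-time timer fires, and both emit output with `yield`. It is not PyFlink code. `ToyTimerService`, `ToyKeyedProcessFunction` and `run` are hypothetical names, the watermark is simply the largest timestamp seen so far, and a plain dict stands in for the keyed state that FLIP-153 adds in release-1.13.

```python
import heapq


class ToyTimerService:
    """Hypothetical event-time timer service: a min-heap of (fire_time, key).

    Like Flink, it keeps at most one timer per (key, timestamp) and fires a
    timer once the watermark reaches its timestamp.
    """

    def __init__(self):
        self._heap = []
        self._pending = set()

    def register(self, key, fire_time):
        if (fire_time, key) not in self._pending:
            self._pending.add((fire_time, key))
            heapq.heappush(self._heap, (fire_time, key))

    def pop_due(self, watermark):
        """Yield (key, fire_time) for every timer with fire_time <= watermark."""
        while self._heap and self._heap[0][0] <= watermark:
            fire_time, key = heapq.heappop(self._heap)
            self._pending.discard((fire_time, key))
            yield key, fire_time


class ToyKeyedProcessFunction:
    """Mimics the 1.12.1-style callbacks: both are generators that `yield` output."""

    def __init__(self):
        self.last_value = {}  # stand-in for keyed state: last element per key

    def process_element(self, key, value, timestamp, timers):
        self.last_value[key] = value
        yield (key, value, timestamp)
        timers.register(key, timestamp + 1500)  # fire 1500 ms of event time later

    def on_timer(self, key, timestamp):
        # There is no `value` argument here; read it back from per-key state.
        yield (key, self.last_value[key], timestamp)


def run(fn, events):
    """Feed (key, value, timestamp) events to `fn`; watermark = max timestamp seen."""
    timers = ToyTimerService()
    out = []
    watermark = float("-inf")
    for key, value, ts in events:
        out.extend(fn.process_element(key, value, ts, timers))
        watermark = max(watermark, ts)
        for k, fire_time in timers.pop_due(watermark):
            out.extend(fn.on_timer(k, fire_time))
    # End of input: an infinite watermark flushes every remaining timer.
    for k, fire_time in timers.pop_due(float("inf")):
        out.extend(fn.on_timer(k, fire_time))
    return out


events = [("a", 1, 1000), ("b", 2, 2000), ("a", 3, 3000)]
for record in run(ToyKeyedProcessFunction(), events):
    print(record)
```

In this run the timer registered for key `"a"` at 2500 fires only after the element at 3000 has advanced the watermark, so `on_timer` reports the state value 3 rather than 1. A real watermark strategy usually lags behind the largest timestamp, so timers would fire later than in this toy.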
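Xingbo's point about `Named Row` can be reproduced without Flink. The sketch below assumes, as he describes, that a Row built from keyword arguments stores its fields sorted by name, while the serializers follow the declared order of `output_type_info`. The helper names are hypothetical, and Python types stand in for `Types.STRING()` / `Types.LONG()`.

```python
# Declared schema, mirroring meneldor's output_type_info (Python types stand in
# for Types.STRING() / Types.LONG()).
DECLARED_NAMES = ["id", "tp", "account", "device_ts", "timestamp"]
DECLARED_TYPES = [str, str, str, int, int]


def named_row_values(**fields):
    """Hypothetical model of a keyword-built Row: fields are stored sorted by name."""
    return [fields[name] for name in sorted(fields)]


def positional_mismatches(values, names=DECLARED_NAMES, types=DECLARED_TYPES):
    """Pair values with serializers purely by position and report type clashes.

    Returns (declared_field, actual_type, expected_type) for every slot whose
    value does not match the serializer assigned to that position.
    """
    return [
        (name, type(value).__name__, expected.__name__)
        for name, expected, value in zip(names, types, values)
        if not isinstance(value, expected)
    ]


named = named_row_values(id="k1", tp="t1", account="TEST",
                         device_ts=1111111111, timestamp=1610980000000)
plain = ["k1", "t1", "TEST", 1111111111, 1610980000000]  # declared order

print(positional_mismatches(named))  # [('tp', 'int', 'str'), ('timestamp', 'str', 'int')]
print(positional_mismatches(plain))  # []
```

Even the slots that pass the type check hold the wrong data (`id` receives `"TEST"`), which is why returning a plain list or a positional Row in the declared order is the safer choice in 1.12.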
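Shuiqiang's advice, that the field names and field types in `output_type_info` must line up with each other and with the fields the function actually emits (including the timestamp), can be checked mechanically before submitting a job. This is a hypothetical helper, not part of PyFlink; the type entries are plain strings standing in for `Types.STRING()` and `Types.LONG()`, and which name was missing from the original four is an assumption.

```python
def check_output_type_info(field_names, field_types, emitted_fields):
    """Hypothetical pre-flight check for a ROW_NAMED-style (names, types) pair.

    Returns a list of human-readable problems; an empty list means the declared
    names, the declared types and the emitted fields all line up.
    """
    problems = []
    if len(field_names) != len(field_types):
        problems.append(
            f"{len(field_names)} field names but {len(field_types)} field types")
    undeclared = [f for f in emitted_fields if f not in field_names]
    if undeclared:
        problems.append(f"emitted but not declared: {undeclared}")
    unused = [f for f in field_names if f not in emitted_fields]
    if unused:
        problems.append(f"declared but never emitted: {unused}")
    return problems


emitted = ["id", "tp", "account", "device_ts", "timestamp"]

# Four names but five types; assuming 'timestamp' was the missing name.
broken = check_output_type_info(
    ["id", "tp", "account", "device_ts"],
    ["STRING", "STRING", "STRING", "LONG", "LONG"],
    emitted)

# Names, types and emitted fields aligned, including 'timestamp'.
fixed = check_output_type_info(
    ["id", "tp", "account", "device_ts", "timestamp"],
    ["STRING", "STRING", "STRING", "LONG", "LONG"],
    emitted)

print(broken)
print(fixed)  # []
```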