Hi meneldor,

Yes. As the first version of Python DataStream, release-1.12 has not yet
covered all scenarios. In release-1.13, we will extend the function of
Python DataStream to cover most scenarios, and CoProcessFunction will
obviously be in it.

Best,
Xingbo

meneldor <menel...@gmail.com> 于2021年1月19日周二 下午4:52写道:

> Thank you Xingbo!
>
> Do you plan to implement CoProcess functions too? Right now i cant find a
> convenient method to connect and merge two streams?
>
> Regards
>
> On Tue, Jan 19, 2021 at 4:16 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi meneldor,
>>
>> 1. Yes. Although release 1.12.1 has not been officially released, it is
>> indeed available for download on PyPI.
>> In PyFlink 1.12.1, you only need to `yield` your output in `on_timer`.
>>
>> 2. Whenever an element comes, your `process_element` method will be
>> invoked, so you can directly get the `value` parameter in
>> `process_element`. The firing of the `on_timer` method depends on your
>> registering timer, as you wrote in the example
>> `ctx.timer_service().register_event_time_timer(current_watermark + 1500)`.
>> You might need state access[1] which will be supported in release-1.13. At
>> that time, you can get your state in `on_timer`, so as to conveniently
>> control the output.
>>
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>> .
>>
>> Best,
>> Xingbo
>>
>> meneldor <menel...@gmail.com> 于2021年1月18日周一 下午10:44写道:
>>
>>> Thank you Xingbo
>>>
>>> 1. I will try to use normal list instead of named. Thanks!
>>> 2. There is a new 1.12.1 version of pyflink which is using
>>> process_element(self, value, ctx: 'KeyedProcessFunction.Context')
>>>
>>> And what about the on_timer(self, timestamp, ctx:
>>> 'KeyedProcessFunction.OnTimerContext')? Can i access the value as in
>>> process_element() in the ctx for example?
>>>
>>> Thank you!
>>>
>>> On Mon, Jan 18, 2021 at 4:18 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Shuiqiang, meneldor,
>>>>
>>>> 1. In fact, there is a problem with using Python `Named Row` as the
>>>> return value of user-defined function in PyFlink.
>>>>
>>>> When serializing a Row data, the serializer of each field is consistent
>>>> with the order of the Row fields. But the field order of Python `Named Row`
>>>> has been sorted by field, and it was designed to better compare Named Row
>>>> and calculate hash values. So this can lead to
>>>> serialization/deserialization errors(The correspondence between serializer
>>>> and field is wrong). It is for performance considerations that serializers
>>>> are not specified according to file name, but `Named Row` support can be
>>>> achieved at the expense of a little performance for ease of use. For the
>>>> current example, I suggest returning a list or a normal Row, instead of a
>>>> Named Row.
>>>>
>>>>
>>>> 2. In pyflink 1.12.0, the method signature of `on_timer` should be `def
>>>> process_element(self, value, ctx: 'KeyedProcessFunction.Context', out:
>>>> Collector)`[1].  If you want to send data in `on_timer`, you can use
>>>> `Collector.collect`. e.g.
>>>> def process_element(self, value, ctx: 'KeyedProcessFunction.Context',
>>>> out: Collector):
>>>> out.collect(Row('a', 'b', 'c'))
>>>>
>>>> 3. >>> I am not sure if the timestamp field should be included in
>>>> output_type_info as i did now.
>>>>
>>>> If you return data with a time_stamp field, `output_type_info` needs to
>>>> have `time_stamp` field. For example, the data returned in your example
>>>> contains `time_stamp`, so your `output_type_info` needs to have the
>>>> information of this field.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.12.0/flink-python/pyflink/datastream/functions.py#L759
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> 2021年1月18日 下午9:21,meneldor <menel...@gmail.com> 写道:
>>>>
>>>> Actually the *output_type_info* is ok, it was copy/paste typo. I
>>>> changed the function to:
>>>>
>>>> class MyProcessFunction(KeyedProcessFunction):
>>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>>         yield types.Row(id=ctx.get_current_key()[0], 
>>>> tp=ctx.get_current_key()[1], account="TEST", device_ts=value[3][2], 
>>>> timestamp=ctx.timestamp())
>>>>         ctx.timer_service().register_event_time_timer(ctx.timestamp() + 
>>>> 1500)
>>>>
>>>>     def on_timer(self, timestamp, ctx: 
>>>> 'KeyedProcessFunction.OnTimerContext'):
>>>>         yield types.Row(id=ctx.get_current_key()[0], 
>>>> tp=ctx.get_current_key()[1], account="TEST", device_ts=1111111111, 
>>>> timestamp=timestamp)
>>>>
>>>> And the type to:
>>>>
>>>> output_type_info = Types.ROW_NAMED(['id', 'tp', 'account', 'device_ts', 
>>>> 'timestamp'],
>>>>                                      [Types.STRING(), Types.STRING(), 
>>>> Types.STRING(), Types.LONG(), Types.LONG()])
>>>>
>>>> I cant return the same data in *on_timer()* because there is no value
>>>> parameter. Thats why i hardcoded *device_ts*. However the exception
>>>> persists.
>>>>
>>>> I am not sure if the timestamp field should be included in 
>>>> *output_type_info* as i did now.
>>>>
>>>> Regards
>>>>
>>>>
>>>> On Mon, Jan 18, 2021 at 2:57 PM Shuiqiang Chen <acqua....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi meneldor,
>>>>>
>>>>> Actually, the return type of the on_timer() must be the same as
>>>>> process_element(). It seems that the yield value of process_element() is
>>>>> missing the `timestamp` field.  And the `output_type_info` has four field
>>>>> names but with 5 field types. Could you align them?
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>>
>>>>

Reply via email to