Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-19 Thread Xingbo Huang
Hi meneldor, Yes. As the first version of the Python DataStream API, release 1.12 does not yet cover all scenarios. In release 1.13, we will extend the Python DataStream API to cover most scenarios, and CoProcessFunction will certainly be included. Best, Xingbo On Tue, Jan 19, 2021 at 4:52 PM, meneldor wrote:

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-19 Thread meneldor
Thank you, Xingbo! Do you plan to implement CoProcess functions too? Right now I can't find a convenient way to connect and merge two streams. Regards On Tue, Jan 19, 2021 at 4:16 AM Xingbo Huang wrote: > Hi meneldor, > > 1. Yes. Although release 1.12.1 has not been officially released, it is

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread Xingbo Huang
Hi meneldor, 1. Yes. Although release 1.12.1 has not been officially released, it is indeed available for download on PyPI. In PyFlink 1.12.1, you only need to `yield` your output in `on_timer`. 2. Whenever an element comes, your `process_element` method will be invoked, so you can directly get t
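A rough pure-Python sketch of the generator style Xingbo describes for 1.12.1: `process_element` runs once per element, `on_timer` runs when a timer fires, and whatever each one yields is forwarded downstream. `EchoWithTimer`, `FakeContext`, `run` and the `(key, event_time_ms)` input shape are hypothetical stand-ins for the PyFlink runtime, not its API.

```python
class EchoWithTimer:
    """Hypothetical stand-in for a 1.12.1-style KeyedProcessFunction (plain Python, no PyFlink)."""

    def process_element(self, value, ctx):
        # value is assumed to be (key, event_time_ms); called once per incoming element.
        key, event_time = value
        ctx.pending_timers.append((key, event_time + 1000))  # hypothetical 1 s timer
        yield (key, "element", event_time)

    def on_timer(self, timestamp, ctx):
        # Same row shape as process_element: (key, kind, time).
        yield (ctx.current_key, "timer", timestamp)


class FakeContext:
    """Hypothetical context: tracks the current key and the timers requested so far."""

    def __init__(self):
        self.current_key = None
        self.pending_timers = []


def run(fn, elements):
    """Feed elements to process_element, then fire timers in time order; return all yielded rows."""
    ctx = FakeContext()
    outputs = []
    for value in elements:
        ctx.current_key = value[0]
        outputs.extend(fn.process_element(value, ctx))
    for key, ts in sorted(ctx.pending_timers, key=lambda t: t[1]):
        ctx.current_key = key
        outputs.extend(fn.on_timer(ts, ctx))
    return outputs


print(run(EchoWithTimer(), [("a", 1000), ("b", 1500)]))
```

Because both methods are generators, neither needs a collector argument; the driver simply drains them.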

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread meneldor
Thank you, Xingbo. 1. I will try to use a normal list instead of a named one. Thanks! 2. There is a new 1.12.1 version of PyFlink which uses process_element(self, value, ctx: 'KeyedProcessFunction.Context'). And what about on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext')? Can i

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread Xingbo Huang
Hi Shuiqiang, meneldor, 1. In fact, there is a problem with using a Python `Named Row` as the return value of a user-defined function in PyFlink. When serializing Row data, the serializer of each field follows the order of the Row fields. But the field order of Python `Named Ro
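To illustrate why the field order matters, here is a pure-Python sketch of a positional serializer. It assumes, hypothetically, a named row that stores its values sorted by field name rather than in declaration order; Xingbo's note is cut off before saying what order PyFlink actually uses, so `named_row_sorted` is an illustration, not PyFlink's `Row`.

```python
# Declared output schema, in order, mirroring
# Types.ROW_NAMED(['id', 'data', 'timestamp'], [Types.STRING(), Types.STRING(), Types.LONG()]).
SCHEMA = [("id", str), ("data", str), ("timestamp", int)]


def named_row_sorted(**fields):
    """Hypothetical named row that stores its values sorted by field name."""
    return tuple(fields[name] for name in sorted(fields))


def positional_row(*values):
    """Plain positional row: values keep the order they were given in."""
    return tuple(values)


def serialize(row):
    """Positional serializer: value i is handled by the serializer for SCHEMA[i]."""
    out = []
    for (name, expected_type), value in zip(SCHEMA, row):
        if not isinstance(value, expected_type):
            raise TypeError(f"field {name!r} expects {expected_type.__name__}, got {value!r}")
        out.append(value)
    return out


# Sorted storage puts 'data' before 'id', so the two strings are silently swapped.
print(serialize(named_row_sorted(id="dev-1", data="payload", timestamp=1)))
# A positional row built in schema order lines up with the serializers.
print(serialize(positional_row("dev-1", "payload", 1)))
```

When the swapped fields share a type, nothing fails and the data is quietly wrong; when the types differ, the serializer errors out instead. A plain positional row in schema order avoids both.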

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread meneldor
Actually the *output_type_info* is OK; it was a copy/paste typo. I changed the function to: class MyProcessFunction(KeyedProcessFunction): def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): yield types.Row(id=ctx.get_current_key()[0], tp=ctx.get_current_key()[1], acco

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread Shuiqiang Chen
Hi meneldor, Actually, the return type of on_timer() must be the same as that of process_element(). It seems that the value yielded by process_element() is missing the `timestamp` field. And `output_type_info` has four field names but five field types. Could you align them? Best, Shuiqiang
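Both problems Shuiqiang points out can be caught before submitting the job. The sketch below is a hypothetical pure-Python pre-flight check: plain strings stand in for PyFlink type objects, and `find_mismatches` and its sample-row argument are illustrative names, not a PyFlink API.

```python
def find_mismatches(field_names, field_types, sample_outputs):
    """Return human-readable problems; an empty list means everything lines up.

    field_names / field_types mirror the two lists passed to Types.ROW_NAMED.
    sample_outputs maps a method name (e.g. "process_element", "on_timer")
    to one sample row that method yields.
    """
    problems = []
    if len(field_names) != len(field_types):
        problems.append(f"{len(field_names)} field names but {len(field_types)} field types")
    for source, row in sample_outputs.items():
        if len(row) != len(field_names):
            problems.append(f"{source} yields {len(row)} fields, schema declares {len(field_names)}")
    return problems


names = ["id", "data", "timestamp"]
types = ["STRING", "STRING", "LONG"]
# process_element forgot the timestamp; on_timer is complete.
print(find_mismatches(names, types, {"process_element": ("a", "x"), "on_timer": ("a", "x", 1)}))
```

Checking one sample row from each method also enforces the rule that on_timer() and process_element() share a return type.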

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-18 Thread meneldor
Hi, here is the code. This is JSON data from Maxwell CDC: env = StreamExecutionEnvironment.get_execution_environment() env.set_stream_time_characteristic(TimeCharacteristic.EventTime) env.get_config().set_auto_watermark_interval(2000) env.set_parallelism(1) device_type_info = Types.ROW_NAMED(['c

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-17 Thread Shuiqiang Chen
Hi meneldor, Xingbo, Sorry for the late reply, and thanks a lot for Xingbo’s clarification. According to the stack trace of the exception, could you check whether the result data matches the specified return type? BTW, please share your code if that’s OK; it will help with debugging. Best, Sh

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-15 Thread meneldor
I imported pyflink.common.types.Row and used it as Shuiqiang suggested, but now Java throws a memory exception: Caused by: TimerException{java.lang.OutOfMemoryError: Java heap space} ... 11 more Caused by: java.lang.OutOfMemoryError: Java heap space at org.apache.flink.table.runtime.util.Se

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread Xingbo Huang
Hi meneldor, I guess Shuiqiang did not use PyFlink 1.12.0 to develop the example. The signature of the `process_element` method has changed in the new version[1]. In PyFlink 1.12.0, you can use `collector.collect` to send out your results. [1] https://issues.apache.org/jira/browse/FLINK
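A pure-Python sketch contrasting the two styles discussed here, assuming the 1.12.0 collector is passed as the last argument of `process_element`. `ListCollector` is a hypothetical stand-in whose `collect` method mirrors the `collector.collect` call Xingbo mentions, and the doubling logic is only a placeholder.

```python
class ListCollector:
    """Hypothetical stand-in for the collector handed to process_element in 1.12.0."""

    def __init__(self):
        self.items = []

    def collect(self, value):
        self.items.append(value)


def process_element_collector_style(value, ctx, out):
    # 1.12.0 style: results are pushed into the collector.
    out.collect((value[0], value[1] * 2))


def process_element_yield_style(value, ctx):
    # 1.12.1 style: results are yielded.
    yield (value[0], value[1] * 2)


def outputs_via_collector(fn, value, ctx=None):
    """Run a collector-style function and return what it collected."""
    out = ListCollector()
    fn(value, ctx, out)
    return out.items


print(outputs_via_collector(process_element_collector_style, ("a", 21)))
print(list(process_element_yield_style(("a", 21), None)))
```

Both produce the same rows; porting between versions mostly means replacing `out.collect(x)` with `yield x` and dropping the `out` parameter.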

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread meneldor
Thank you for the answer, Shuiqiang! I'm using the latest apache-flink version: Requirement already up-to-date: apache-flink in ./venv/lib/python3.7/site-packages (1.12.0). However, the method signature uses a collector. I'm using the *setup-pyflink-virtual-env.sh* shell scr
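Since the signature depends on the installed release (per this thread, 1.12.0 passes a collector and 1.12.1 switched to `yield`), a small check of the installed `apache-flink` version tells you which style applies. This is a sketch: `parse_version` and `uses_yield_api` are hypothetical helpers, it assumes later releases keep the `yield` style, and `importlib.metadata` requires Python 3.8+, so on Python 3.7 the lookup just returns `None`.

```python
def parse_version(version):
    """'1.12.1' -> (1, 12, 1); stops at the first non-numeric piece such as 'dev0'."""
    parts = []
    for piece in version.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


def uses_yield_api(version):
    """True if process_element/on_timer should yield results (1.12.1 and, assumed, later)."""
    return parse_version(version) >= (1, 12, 1)


def installed_pyflink_version():
    """Installed apache-flink version string, or None if unavailable."""
    try:
        from importlib.metadata import PackageNotFoundError, version
    except ImportError:  # Python 3.7 has no importlib.metadata
        return None
    try:
        return version("apache-flink")
    except PackageNotFoundError:
        return None


v = installed_pyflink_version()
print(v, "-> yield style" if v and uses_yield_api(v) else "-> collector style or unknown")
```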

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread Shuiqiang Chen
Hi meneldor, The main cause of the error is a bug in `ctx.timer_service().current_watermark()`. At the beginning of the stream, when the first record comes into KeyedProcessFunction.process_element(), the current_watermark will be Long.MIN_VALUE on the Java side, while on the Python
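Until a fix is available, one defensive pattern is to avoid deriving timer timestamps from a watermark that is still at its initial Long.MIN_VALUE. The sketch below is a hypothetical workaround, not the fix Shuiqiang describes (his explanation is cut off): it falls back to the element's own timestamp and checks that the result still fits in a Java `long`, since Python integers are unbounded and arithmetic such as `Long.MIN_VALUE - 1` yields a value no Java `long` can hold.

```python
JAVA_LONG_MIN = -(2 ** 63)  # Long.MIN_VALUE on the Java side
JAVA_LONG_MAX = 2 ** 63 - 1  # Long.MAX_VALUE


def fits_in_java_long(x):
    return JAVA_LONG_MIN <= x <= JAVA_LONG_MAX


def timer_timestamp(current_watermark, element_timestamp, delay_ms):
    """Hypothetical helper: timer time = base + delay, where base skips the initial watermark."""
    # Before the first watermark arrives the watermark sits at Long.MIN_VALUE,
    # so use the element's own event time as the base instead.
    base = element_timestamp if current_watermark <= JAVA_LONG_MIN else current_watermark
    ts = base + delay_ms
    if not fits_in_java_long(ts):
        raise OverflowError(f"timer timestamp {ts} does not fit in a Java long")
    return ts


print(timer_timestamp(JAVA_LONG_MIN, 1000, 500))  # first record: watermark not yet set
print(timer_timestamp(4000, 1000, 500))           # later records: watermark available
```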

Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread meneldor
Hello, What is the correct way to use Python dicts as the ROW type in PyFlink? I'm trying this: output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'], [Types.STRING(), Types.STRING(), Types.LONG()]) class MyProcessFunction(KeyedProcessFunction): de
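Following the advice elsewhere in this thread to emit a plain positional list rather than a named row, one way to bridge from a dict is to read its values out in the order declared in `Types.ROW_NAMED`. This is a pure-Python sketch; `dict_to_row` and `OUTPUT_FIELDS` are hypothetical names, and wrapping the result with `Row(*values)` from `pyflink.common.types` before yielding is an assumption to verify against your PyFlink version.

```python
# Same order as Types.ROW_NAMED(['id', 'data', 'timestamp'], [...]) in the question.
OUTPUT_FIELDS = ["id", "data", "timestamp"]


def dict_to_row(record, field_names=OUTPUT_FIELDS):
    """Turn a dict into a positional list ordered like the declared ROW type.

    Extra keys are ignored; missing keys raise KeyError so schema drift is caught early.
    """
    missing = [name for name in field_names if name not in record]
    if missing:
        raise KeyError(f"record is missing fields: {missing}")
    return [record[name] for name in field_names]


print(dict_to_row({"timestamp": 5, "id": "dev-1", "data": "payload"}))
```

Because the dict's own key order never reaches the serializer, it no longer matters how the record was built.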