Hi meneldor,
Yes. As the first version of the Python DataStream API, release 1.12 does not yet
cover all scenarios. In release 1.13, we will extend the Python DataStream API
to cover most scenarios, and CoProcessFunction will certainly be included.
Best,
Xingbo
On Tue, Jan 19, 2021 at 4:52 PM meneldor wrote:
Thank you, Xingbo!
Do you plan to implement CoProcess functions too? Right now I can't find a
convenient way to connect and merge two streams.
Regards
On Tue, Jan 19, 2021 at 4:16 AM Xingbo Huang wrote:
Hi meneldor,
1. Yes. Although release 1.12.1 has not been officially released, it is
indeed available for download on PyPI.
In PyFlink 1.12.1, you only need to `yield` your output in `on_timer`.
2. Whenever an element arrives, your `process_element` method is
invoked, so you can directly get t
Thank you, Xingbo.
1. I will try to use a normal list instead of a named Row. Thanks!
2. There is a new 1.12.1 version of PyFlink that uses
process_element(self, value, ctx: 'KeyedProcessFunction.Context')
And what about on_timer(self, timestamp, ctx:
'KeyedProcessFunction.OnTimerContext')? Can I
Hi Shuiqiang, meneldor,
1. In fact, there is a problem with using a Python `Named Row` as the return
value of a user-defined function in PyFlink.
When serializing Row data, the serializer of each field follows the order
of the Row fields. But the field order of Python `Named Ro
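To make the ordering problem concrete, here is a minimal plain-Python sketch. It assumes, as PyFlink 1.12's `Row` appears to do, that a Row built from keyword arguments stores its fields sorted by name rather than in the order they were written. `KwargsRow` and `misaligned` are hypothetical stand-ins, not PyFlink classes, and the schema is made up for illustration.

```python
from typing import Any, List, Sequence, Tuple


class KwargsRow(tuple):
    """Hypothetical stand-in for a named Row whose keyword fields are sorted by name."""

    def __new__(cls, **kwargs: Any) -> "KwargsRow":
        names = sorted(kwargs)  # alphabetical order, not the order the caller wrote
        row = super().__new__(cls, [kwargs[n] for n in names])
        row._fields = names
        return row


def misaligned(row: Sequence[Any], field_types: Sequence[type]) -> List[Tuple[str, Any]]:
    """Pair values with declared types by position, as a positional serializer
    would, and report every value that does not fit its slot."""
    return [(t.__name__, v) for t, v in zip(field_types, row) if not isinstance(v, t)]


# Illustrative schema, declared as: id STRING, count LONG, timestamp LONG
declared_types = [str, int, int]

row = KwargsRow(id="a1", count=5, timestamp=1610000000000)
print(row._fields)                      # ['count', 'id', 'timestamp']
print(misaligned(row, declared_types))  # [('str', 5), ('int', 'a1')]

# A plain positional row written in declared order stays aligned.
positional = ("a1", 5, 1610000000000)
print(misaligned(positional, declared_types))  # []
```

The positional tuple is the same idea as using a normal list instead of a named Row: values line up with the declared types by construction.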
Actually, the *output_type_info* is OK; it was a copy/paste typo. I changed
the function to:
class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        yield types.Row(id=ctx.get_current_key()[0],
                        tp=ctx.get_current_key()[1], acco
Hi meneldor,
Actually, the return type of on_timer() must be the same as that of
process_element(). It seems that the values yielded by process_element() are
missing the `timestamp` field. Also, the `output_type_info` has four field
names but five field types. Could you align them?
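Both mismatches can be caught in plain Python before the job runs. The sketch below is hypothetical (`check_schema` and `check_rows` are not PyFlink APIs) and uses Python types as stand-ins for the declared `Types.*` entries. It checks that names and types pair up and that every row emitted by either method has the declared arity and value types.

```python
from typing import Any, Iterable, List, Sequence


def check_schema(field_names: Sequence[str], field_types: Sequence[type]) -> None:
    """Fail fast if the declared field names and types cannot be paired up."""
    if len(field_names) != len(field_types):
        raise ValueError(
            f"{len(field_names)} field names but {len(field_types)} field types"
        )


def check_rows(rows: Iterable[Sequence[Any]], field_types: Sequence[type]) -> List[str]:
    """Describe every row whose arity or value types differ from the declared type."""
    problems: List[str] = []
    for i, row in enumerate(rows):
        if len(row) != len(field_types):
            problems.append(f"row {i}: {len(row)} values, expected {len(field_types)}")
            continue
        for j, (value, t) in enumerate(zip(row, field_types)):
            if not isinstance(value, t):
                problems.append(f"row {i}, field {j}: {value!r} is not {t.__name__}")
    return problems


# Field names as used elsewhere in this thread; Python types stand in for Types.*.
names = ["id", "data", "timestamp"]
types_ = [str, str, int]
check_schema(names, types_)

# One row as process_element might emit it (timestamp forgotten), one from on_timer.
emitted = [("a1", "payload"), ("a1", "payload", 1610000000000)]
print(check_rows(emitted, types_))  # ['row 0: 2 values, expected 3']
```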
Best,
Shuiqiang
Hi, here is the code. This is JSON data from Maxwell CDC:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.get_config().set_auto_watermark_interval(2000)
env.set_parallelism(1)
device_type_info = Types.ROW_NAMED(['c
Hi meneldor, Xingbo,
Sorry for the late reply.
Thanks a lot to Xingbo for the clarification.
According to the stack trace of the exception, could you check
whether the result data matches the specified return type? BTW, please share
your code if possible; it will help with debugging.
Best,
Shuiqiang
I imported pyflink.common.types.Row and used it as Shuiqiang suggested, but
now Java throws an out-of-memory error:
Caused by: TimerException{java.lang.OutOfMemoryError: Java heap space}
> ... 11 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
> at org.apache.flink.table.runtime.util.Se
Hi meneldor,
I guess Shuiqiang did not use PyFlink 1.12.0 to develop the example.
The signature of the `process_element` method has changed in the new
version[1]. In PyFlink 1.12.0, you can use `collector.collect(...)` to emit
your results.
[1] https://issues.apache.org/jira/browse/FLINK
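The change only affects how results are emitted. The plain-Python sketch below contrasts the two styles, assuming, as the thread indicates, that the 1.12.0 method takes an extra collector argument while the 1.12.1 method yields. `ListCollector` and `run` are hypothetical helpers, not PyFlink classes, and `ctx` is just a placeholder value here.

```python
import inspect
from typing import Any, Callable, Iterator, List


class ListCollector:
    """Hypothetical collector that buffers everything passed to collect()."""

    def __init__(self) -> None:
        self.items: List[Any] = []

    def collect(self, value: Any) -> None:
        self.items.append(value)


def process_with_collector(value: Any, ctx: Any, out: ListCollector) -> None:
    # Collector style: push each result into the collector.
    out.collect((ctx, value))


def process_with_yield(value: Any, ctx: Any) -> Iterator[Any]:
    # Generator style: yield each result instead.
    yield (ctx, value)


def run(fn: Callable[..., Any], value: Any, ctx: Any) -> List[Any]:
    """Invoke either style and return the emitted records as a list."""
    if inspect.isgeneratorfunction(fn):
        return list(fn(value, ctx))
    out = ListCollector()
    fn(value, ctx, out)
    return out.items


print(run(process_with_collector, 42, "key-1"))  # [('key-1', 42)]
print(run(process_with_yield, 42, "key-1"))      # [('key-1', 42)]
```

Both calls produce the same records; porting between versions is mostly a matter of replacing each `out.collect(x)` with `yield x`.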
Thank you for the answer, Shuiqiang!
I'm using the latest apache-flink version:
> Requirement already up-to-date: apache-flink in
> ./venv/lib/python3.7/site-packages (1.12.0)
However, the method signature uses a collector.
I'm using the *setup-pyflink-virtual-env.sh* shell scr
Hi meneldor,
The main cause of the error is a bug in
`ctx.timer_service().current_watermark()`. At the beginning of the stream,
when the first record arrives in KeyedProcessFunction.process_element(),
the current watermark is Long.MIN_VALUE on the Java side, while on
the Python
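Until such a bug is fixed, one defensive pattern is to avoid deriving timer timestamps from the watermark while it still holds the initial Long.MIN_VALUE. This is a workaround idea, not the fix referred to above, written in plain Python rather than against the PyFlink timer API. It assumes timestamps are epoch milliseconds, and `next_timer_ts` is a hypothetical helper.

```python
# Java's Long.MIN_VALUE: the watermark reported before any watermark has arrived.
JAVA_LONG_MIN = -(2 ** 63)


def next_timer_ts(current_watermark: int, element_ts: int, delay_ms: int) -> int:
    """Hypothetical helper: pick an event-time timer timestamp in epoch millis.

    While the watermark is still the initial Long.MIN_VALUE, a timer derived
    from it would point to a meaningless time far in the past, so fall back
    to the element's own timestamp instead."""
    base = element_ts if current_watermark == JAVA_LONG_MIN else current_watermark
    return base + delay_ms


print(next_timer_ts(JAVA_LONG_MIN, 1610000000000, 5000))  # 1610000005000
print(next_timer_ts(1610000001000, 1610000000000, 5000))  # 1610000006000
```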
Hello,
What is the correct way to use Python dicts as a ROW type in PyFlink? I'm
trying this:
output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                   [Types.STRING(), Types.STRING(),
                                    Types.LONG()])
class MyProcessFunction(KeyedProcessFunction):
    de
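Following the suggestion in this thread to use a normal list instead of a named Row, one option is to convert each dict into a list ordered by the names declared in `Types.ROW_NAMED`. This is a sketch under that assumption: `dict_to_row_values` is a hypothetical helper, and the resulting list could then be passed positionally, e.g. `Row(*values)`.

```python
from typing import Any, Dict, List, Sequence


def dict_to_row_values(record: Dict[str, Any], field_names: Sequence[str]) -> List[Any]:
    """Return the dict's values as a plain list in declared field order.

    Missing fields raise a KeyError here, in Python, rather than surfacing
    later as a serialization error; extra keys are ignored."""
    missing = [name for name in field_names if name not in record]
    if missing:
        raise KeyError(f"missing fields: {missing}")
    return [record[name] for name in field_names]


field_names = ["id", "data", "timestamp"]  # same order as in Types.ROW_NAMED
record = {"timestamp": 1610000000000, "id": "a1", "data": "payload", "extra": True}
print(dict_to_row_values(record, field_names))  # ['a1', 'payload', 1610000000000]
```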