Hi Yun Gao,
Thanks for your email and your review!
My comments are inline.
On 08/11/2022 at 06:51, Yun Gao wrote:
Hi Etienne,
Many thanks for the article! Flink is indeed steadily improving its
support for unified batch / stream processing with the same API, and
it's a great pleasure that more and more users are trying this
functionality. But I have some questions regarding some details.
First, IMO, in the long run Flink will have two unified APIs, namely
the Table / SQL API and the DataStream API. Users can express their
computation logic with these two APIs for both bounded and unbounded
data processing.
Yes, that is also what I understood from the discussions and Jira tickets.
And I also think IMHO that reducing the number of APIs to 2 was the good
Under the hood, Flink provides two execution modes: the streaming mode
works with both bounded and unbounded data, and it executes
incrementally based on state; the batch mode works only with bounded
data, and it executes level by level, similar to traditional batch
processing frameworks. Users can switch the execution mode via
EnvironmentSettings.inBatchMode() for
As recommended in the Flink docs (1), I enabled the batch mode as I
thought it would be more efficient on my bounded pipeline, but as a
matter of fact the streaming mode seems to be more efficient for my use
case. I'll test with higher volumes to confirm.
Specifically for DataStream, as implemented in FLIP-140, all the
existing DataStream operations currently support the batch execution
mode in a unified way [1]: data is sorted by key on the keyBy() edges,
so the following operations like reduce() receive all the records
belonging to the same key consecutively and can reduce them directly
without maintaining intermediate state. In this way users can write
the same code for both streaming and batch processing.
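To make the difference between the two execution styles concrete, here is a small plain-Java sketch. It is only a conceptual illustration, not Flink code and not how Flink implements it internally; the class and method names (SortBasedReduceSketch, incrementalReduce, sortBasedReduce) are hypothetical. It assumes records are simple (key, value) pairs that are summed per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual illustration only: plain Java, no Flink classes involved.
class SortBasedReduceSketch {

    // Streaming-style: one running aggregate per key is kept in a state map
    // and updated for every incoming record.
    static Map<String, Integer> incrementalReduce(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> state = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : input) {
            state.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return state;
    }

    // Batch-style: sort by key first, then reduce each run of equal keys
    // with a single accumulator, so no per-key state map is needed.
    static List<Map.Entry<String, Integer>> sortBasedReduce(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(input);
        sorted.sort(Map.Entry.comparingByKey());

        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        String currentKey = null;
        int acc = 0;
        for (Map.Entry<String, Integer> e : sorted) {
            if (currentKey != null && !currentKey.equals(e.getKey())) {
                // The key changed: the previous key is complete, emit its result.
                out.add(Map.entry(currentKey, acc));
                acc = 0;
            }
            currentKey = e.getKey();
            acc += e.getValue();
        }
        if (currentKey != null) {
            out.add(Map.entry(currentKey, acc)); // emit the last key
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("b", 1), Map.entry("a", 2), Map.entry("b", 3), Map.entry("a", 4));
        System.out.println(incrementalReduce(input));
        System.out.println(sortBasedReduce(input));
    }
}
```

Both methods compute the same per-key sums; the sort-based variant only ever holds one accumulator at a time, which is the property described above for the batch mode.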
Yes, I have no doubt that my resulting Query3ViaFlinkRowDatastream
pipeline will work with no modification if I plug an unbounded source
into it.
# Regarding the migration of Join / Reduce
First, I think Reduce is always supported and users can write it
directly; if batch execution mode is set, the reduce is not executed
incrementally, instead it acts much like a sort-based aggregation in
traditional batch processing frameworks.
Regarding Join, the issue FLINK-22587 indeed exists: currently a join
has to be bound to a window and the GlobalWindow does not work
properly. But with a bit more effort, users do not need to rewrite the
whole join from scratch: they can write a dedicated window assigner
that assigns all the records to the same window instance and returns
EventTimeTrigger.create() as the default event-time trigger [2]. Then
it works
.where(a -> a.f0)
.equalTo(b -> b.f0)
.window(new EndOfStreamWindows())
It does not require records to have event time attached, since the
window's trigger relies only on the time range of the window, and the
assignment does not need event time either.
The behavior of the join is also similar to a sort-based join if batch
mode is enabled.
Of course, asking users to apply this workaround is not user-friendly,
and we'll try to fix this issue in 1.17.
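As a rough mental model of what joining inside a single all-encompassing window amounts to for bounded input, here is a plain-Java sketch. It is not Flink code and does not reproduce the EndOfStreamWindows assigner or Flink's window operator; the class and method names (SingleWindowJoinSketch, joinAtEndOfInput) are hypothetical. It assumes an inner equi-join on an integer key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only: plain Java, no Flink classes involved.
class SingleWindowJoinSketch {

    // Every record of both inputs lands in the same "window". Nothing is
    // emitted until the input has ended, then the window fires once and
    // all pairs with matching keys are produced (inner join).
    static List<String> joinAtEndOfInput(List<Map.Entry<Integer, String>> left,
                                         List<Map.Entry<Integer, String>> right) {
        // Buffer the right side of the window, grouped by join key.
        Map<Integer, List<String>> rightByKey = new HashMap<>();
        for (Map.Entry<Integer, String> r : right) {
            rightByKey.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }

        // End of input reached: match each left record against the buffer.
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Integer, String> l : left) {
            for (String rightValue : rightByKey.getOrDefault(l.getKey(), List.of())) {
                joined.add(l.getKey() + ":" + l.getValue() + "-" + rightValue);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> left =
                List.of(Map.entry(1, "a"), Map.entry(2, "b"), Map.entry(3, "c"));
        List<Map.Entry<Integer, String>> right =
                List.of(Map.entry(1, "x"), Map.entry(1, "y"), Map.entry(3, "z"));
        System.out.println(joinAtEndOfInput(left, right));
    }
}
```

No timestamps appear anywhere in the sketch, which mirrors the point above that the records do not need event time attached: the only thing that matters is that the single window closes when the bounded input ends.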
Yes, this is a better workaround than the manual state-based join that I
proposed. I tried it and it works perfectly with similar performance.
# Regarding support of Sort / Limit
Currently these two operators are indeed not supported in the
DataStream API directly. One initial thought for these two operations
is that users may convert the DataStream to the Table API and use the
Table API for these two operators:
DataStream<xx> xx = ... // Keeps the customized logic in DataStream
Table tableXX = tableEnv.fromDataStream(xx);
Yes, I knew that workaround, but I decided not to use it because I have
a separate SQL-based implementation (for comparison reasons), so I did
not want to mix the SQL and DataStream APIs in the same pipeline.
What do you think about this option? We are also assessing whether the
combination of the DataStream API / Table API is sufficient for all
batch users. Any suggestions are warmly welcome.
I guess that outside of my use case of comparing the performance of the
3 Flink APIs (broader subject than this article), users can easily mix
the APIs in the same pipeline. If we really want to have these
operations in the DataStream API maybe wrapping state-based
implementations could be good if their performance meets our expectations.
Yun Gao
I'll update the article and the code with your suggestions. Thanks again.
From: liu ron <ron9....@gmail.com>
Sent: 2022 Nov. 8 (Tue.) 10:21
To: dev <d...@flink.apache.org>; Etienne Chauchot
<echauc...@apache.org>; user <user@flink.apache.org>
Subject: Re: [blog article] Howto migrate a real-life batch
pipeline from the DataSet API to the DataStream API
Thanks for your post, it looks very good to me, also maybe for
On Tue, Nov 8, 2022 at 09:11, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
Wow, cool! Thanks for your work.
It'll definitely be helpful for users who want to migrate
their batch jobs from the DataSet API to the DataStream API.
Best regards,
----- Original Message -----
From: "Etienne Chauchot" <echauc...@apache.org>
To: "dev" <d...@flink.apache.org>, "User" <user@flink.apache.org>
Sent: Monday, November 7, 2022, 10:29:54 PM
Subject: [blog article] Howto migrate a real-life batch pipeline from
the DataSet API to the DataStream API
Hi everyone,
In case some of you are interested, I just posted a blog article on
migrating a real-life batch pipeline from the DataSet API to the
DataStream API: