Hi Till,

I have written a more complicated PyFlink job. Compared with the previous
single python udf job, there is an extra stage of converting between table
and datastream. Besides, I added a python map function for the job. Because
python datastream has not yet implemented Thread mode, the python map
function operator is still running in Process Mode.

```
from pyflink.common.typeinfo import Types
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# t_env is an existing StreamTableEnvironment
source = t_env.from_path("source_table")  # schema [id: String, d: int]

# Python scalar UDF
@udf(result_type=DataTypes.STRING(), func_type="general")
def upper(x):
    return x.upper()

t_env.create_temporary_system_function("upper", upper)

# Python map function (identity) on the DataStream side
ds = t_env.to_data_stream(source) \
    .map(lambda x: x,
         output_type=Types.ROW_NAMED(["id", "d"],
                                     [Types.STRING(), Types.INT()]))

t = t_env.from_data_stream(ds)
t.select('upper(id)').execute_insert('sink_table')
```

The input data size is 1k.

Mode                          | QPS
Process Mode                  | 30,000
Thread Mode + Process Mode    | 40,000

From the table, we can see that the operators running in Process Mode are
the performance bottleneck of the job.
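
As a rough back-of-the-envelope check, taking the two QPS figures above as 30,000 and 40,000, the numbers can be converted into an average time per record. This is only a sketch: it assumes the QPS values are for a single task and that per-record costs simply add up, which the measurement itself does not confirm. The function and variable names are made up for illustration.

```python
def per_record_micros(qps: float) -> float:
    """Average wall-clock time per record in microseconds, assuming one task."""
    return 1_000_000.0 / qps


# QPS figures from the table above (assumed to be records per second).
process_only_qps = 30_000  # UDF and map function both in Process Mode
mixed_qps = 40_000         # UDF in Thread Mode, map function in Process Mode

process_only_us = per_record_micros(process_only_qps)
mixed_us = per_record_micros(mixed_qps)

# Time saved per record by moving only the UDF to Thread Mode.
saved_us = process_only_us - mixed_us
speedup = mixed_qps / process_only_qps

print(f"per record: {process_only_us:.1f} us -> {mixed_us:.1f} us")
print(f"saved: {saved_us:.1f} us per record, speedup {speedup:.2f}x")
```

Under these assumptions, most of the per-record time is still spent after the UDF is moved to Thread Mode, which is consistent with the remaining Process Mode operator dominating the job.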

Best,
Xingbo

Till Rohrmann <trohrm...@apache.org> wrote on Wed, Jan 5, 2022 at 23:16:

> Thanks for the detailed answer Xingbo. Quick question on the last figure in
> the FLIP. You said that this is a real world Flink stream SQL job. The
> title of the graph says UDF(String Upper). So do I understand correctly
> that string upper is the real world use case you have measured? What I
> wanted to ask is how a slightly more complex Flink Python job (involving
> shuffles, with back pressure, etc.) performs using the thread and process
> mode respectively.
>
> If the mode solely needs changes in the Python part of Flink, then I don't
> have any concerns from the runtime perspective.
>
> Cheers,
> Till
>
> On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>
> > Hi Till and Thomas,
> >
> > Thanks a lot for joining the discussion.
> >
> > For Till:
> >
> > >>> Is the slower performance currently the biggest pain point for our
> > Python users? What else are our Python users mainly complaining about?
> >
> > PyFlink users are most concerned about two things: better usability and
> > better performance. Users often run benchmarks when they first evaluate
> > PyFlink[1][2] to decide whether to use it. The performance of a PyFlink
> > job depends on two parts: the overhead of the PyFlink framework and the
> > complexity of the Python functions implemented by the user. In the
> > Python ecosystem, there are many libraries and tools that can help
> > Python users improve the performance of their custom functions, such as
> > pandas[3], numba[4] and cython[5]. So we hope that the framework
> > overhead of PyFlink itself can also be reduced.
> >
> > >>> Concerning the proposed changes, are there any changes required on
> > the runtime side (changes to Flink)? How will the deployment and memory
> > management be affected when using the thread execution mode?
> >
> > The changes on PyFlink Runtime mentioned here are actually only
> > modifications of PyFlink custom Operators, such as
> > PythonScalarFunctionOperator[6], which won't affect deployment and memory
> > management.
> >
> > >>> One more question that came to my mind: How much performance
> > improvement do we gain on a real-world Python use case? Were the
> > measurements more like micro benchmarks where the Python UDF was called
> > w/o the overhead of Flink? I would just be curious how much the Python
> > component contributes to the overall runtime of a real world job. Do we
> > have some data on this?
> >
> > The last figure I put in the FLIP is the performance comparison of three
> > real Flink Stream SQL jobs: a Java UDF job, a Python UDF job in Process
> > Mode, and a Python UDF job in Thread Mode. The QPS value is the
> > end-to-end Flink job execution result. As shown in the performance
> > comparison chart, a Python UDF implementing the same function can often
> > reach only 20% of the performance of the Java UDF, so the Python UDF
> > will often become the performance bottleneck in a PyFlink job.
> >
> > For Thomas:
> >
> > I first realized that the framework overhead of the various IPC
> > mechanisms (socket, grpc, shared memory) cannot be ignored in some
> > scenarios because of a PyFlink image algorithm prediction job. Its input
> > parameters are a series of huge image binary arrays, and its data size
> > is larger than 1G. The serialization/deserialization overhead became an
> > important part of its poor performance. Although this job is a bit
> > extreme, through measurement we did find that the
> > serialization/deserialization overhead caused by larger parameters
> > affects the performance of the IPC framework.
> >
> > >>> As I understand it, you measured the difference in throughput for
> > UPPER between process and embedded mode and the difference is 50%
> > increased throughput?
> >
> > This 50% is the result when the data size is less than 100 bytes. When
> > the data size reaches 1k, the performance of the Embedded Mode will
> > reach about 3.5 times the performance of the Process Mode shown in the
> > FLIP. When the data reaches 1M, the performance of Embedded Mode can
> > reach 5 times the performance of the Process Mode. The biggest
> > difference here is that in Embedded Mode, input/result data does not
> > need to be serialized/deserialized.
> >
> > >>> Is that a typical UDF in your usage?
> >
> > The reason for choosing UPPER is that a simpler udf implementation makes
> > it easier to evaluate the performance of different execution modes.
> >
> > >>> What do you observe when the function becomes more complex?
> >
> > We can analyze the QPS of the framework (process mode or embedded mode)
> > and the QPS of the UDF calculation logic separately. A more complex UDF
> > is simply a UDF with a smaller QPS. The main factors that affect the
> > framework QPS are the data type, number and size of the parameters,
> > which greatly affect the serialization/deserialization overhead in
> > Process Mode.
> >
> > The purpose of introducing thread mode is not to replace Process mode,
> > but to supplement Python udf usage scenarios such as cep and join, and
> > some scenarios where higher performance is pursued. Compared with Thread
> > mode, Process Mode has better isolation, which can solve the limitation
> > of thread mode in some scenarios such as session mode.
> >
> > [1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
> > [2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
> > [3] https://pandas.pydata.org/
> > [4] https://numba.pydata.org/
> > [5] https://cython.org/
> > [6]
> > https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
> >
> > Best,
> > Xingbo
> >
> > Thomas Weise <t...@apache.org> wrote on Tue, Jan 4, 2022 at 04:23:
> >
> > > Interesting discussion. It caught my attention because I was also
> > > interested in the Beam fn execution overhead a few years ago.
> > >
> > > We found back then that while in theory the fn protocol overhead is
> > > very significant, for realistic function workloads that overhead was
> > > negligible. And of course it all depends on the use case. It might be
> > > worthwhile to quantify a couple more scenarios.
> > >
> > > As I understand it, you measured the difference in throughput for
> > > UPPER between process and embedded mode and the difference is 50%
> > > increased throughput? Is that a typical UDF in your usage? What do you
> > > observe when the function becomes more complex?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >
> > > > One more question that came to my mind: How much performance
> > > > improvement do we gain on a real-world Python use case? Were the
> > > > measurements more like micro benchmarks where the Python UDF was
> > > > called w/o the overhead of Flink? I would just be curious how much
> > > > the Python component contributes to the overall runtime of a real
> > > > world job. Do we have some data on this?
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >
> > > > > Hi Xingbo,
> > > > >
> > > > > Thanks for creating this FLIP. I have two general questions about
> > > > > the motivation for this FLIP because I have only very little
> > > > > exposure to our Python users:
> > > > >
> > > > > Is the slower performance currently the biggest pain point for our
> > > > > Python users?
> > > > >
> > > > > What else are our Python users mainly complaining about?
> > > > >
> > > > > Concerning the proposed changes, are there any changes required on
> > > > > the runtime side (changes to Flink)? How will the deployment and
> > > > > memory management be affected when using the thread execution mode?
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hxbks...@gmail.com> wrote:
> > > > >
> > > > >> Hi Wei,
> > > > >>
> > > > >> Thanks a lot for your feedback. Very good questions!
> > > > >>
> > > > >> >>> 1. It seems that we dynamically load an embedded Python and
> > > > >> user dependencies in the TM process. Can they be uninstalled
> > > > >> cleanly after the task finished? i.e. Can we use the Thread Mode in
> > > > >> session mode and Pyflink shell?
> > > > >>
> > > > >> I mentioned this limitation in the FLIP. There is no problem as
> > > > >> long as the Python interpreter does not change, but if you need to
> > > > >> change the Python interpreter, there is really no way to reload the
> > > > >> Python library. The problem is mainly caused by many Python
> > > > >> libraries assuming that they own the process alone.
> > > > >>
> > > > >> >>> 2. Does one TM have only one embedded Python running at the
> > > > >> same time? If all the Python operator in the TM share the same PVM,
> > > > >> will there be a loss in performance?
> > > > >>
> > > > >> Your understanding is correct that one TM has only one embedded
> > > > >> Python running at the same time. I guess you are worried about the
> > > > >> performance loss of multiple threads caused by the Python GIL.
> > > > >> There is a one-to-one correspondence between Java worker threads
> > > > >> and Python subinterpreters. Although subinterpreters have not yet
> > > > >> completely overcome the GIL sharing problem (the Python community's
> > > > >> recent plan for a per-interpreter GIL is also under discussion[1]),
> > > > >> the performance of subinterpreters is very close to that of
> > > > >> multiprocessing[2].
> > > > >>
> > > > >> >>> 3. How do we load the relevant c library if the
> > > > >> python.executable is provided by users?
> > > > >>
> > > > >> Once python.executable is provided, PEMJA will dynamically load the
> > > > >> CPython library (libpython.*so or libpython.*dylib) and pemja.so
> > > > >> installed in the python environment.
> > > > >>
> > > > >> >>> May there be a risk of version conflicts?
> > > > >>
> > > > >> I understand that this question is actually asking whether C/C++
> > > > >> has a way to solve the problem of relying on different versions of
> > > > >> a library. First of all, we know that if there is only static
> > > > >> linking, there will be no such problem. I have studied the source
> > > > >> code of CPython[3], and there is no usage of dynamic linking. The
> > > > >> remaining case is where dynamic linking is used in a C library
> > > > >> written by the users. There are many ways to solve this problem
> > > > >> with dynamic linking, but after all, this library is written by
> > > > >> users, and it is difficult for us to guarantee that there will be
> > > > >> no conflicts. In that case, Process Mode will be the fallback
> > > > >> choice.
> > > > >>
> > > > >> [1]
> > > > >> https://mail.python.org/archives/list/python-...@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
> > > > >> [2]
> > > > >> https://mail.python.org/archives/list/python-...@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
> > > > >> [3] https://github.com/python/cpython
> > > > >>
> > > > >> Best,
> > > > >> Xingbo
> > > > >>
> > > > >> Wei Zhong <weizhong0...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
> > > > >>
> > > > >> > Hi Xingbo,
> > > > >> >
> > > > >> > Thanks for creating this FLIP. Big +1 for it!
> > > > >> >
> > > > >> > I have some questions about the Thread Mode:
> > > > >> >
> > > > >> > 1. It seems that we dynamically load an embedded Python and user
> > > > >> > dependencies in the TM process. Can they be uninstalled cleanly
> > > > >> > after the task finished? i.e. Can we use the Thread Mode in
> > > > >> > session mode and Pyflink shell?
> > > > >> >
> > > > >> > 2. Does one TM have only one embedded Python running at the same
> > > > >> > time? If all the Python operator in the TM share the same PVM,
> > > > >> > will there be a loss in performance?
> > > > >> >
> > > > >> > 3. How do we load the relevant c library if the python.executable
> > > > >> > is provided by users? May there be a risk of version conflicts?
> > > > >> >
> > > > >> > Best,
> > > > >> > Wei
> > > > >> >
> > > > >> >
> > > > >> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
> > > > >> > >
> > > > >> > > Hi everyone,
> > > > >> > >
> > > > >> > > I would like to start a discussion thread on "Support PyFlink
> > > > >> > > Runtime Execution in Thread Mode"
> > > > >> > >
> > > > >> > > We have provided PyFlink Runtime framework to support Python
> > > > >> > > user-defined functions since Flink 1.10. The PyFlink Runtime
> > > > >> > > framework is called Process Mode, which depends on an
> > > > >> > > inter-process communication architecture based on the Apache
> > > > >> > > Beam Portability framework. Although starting a dedicated
> > > > >> > > process to execute Python user-defined functions could have
> > > > >> > > better resource isolation, it will bring greater resource and
> > > > >> > > performance overhead.
> > > > >> > >
> > > > >> > > In order to overcome the resource and performance problems on
> > > > >> > > Process Mode, we will propose a new execution mode which
> > > > >> > > executes Python user-defined functions in the same thread
> > > > >> > > instead of a separate process.
> > > > >> > >
> > > > >> > > I have drafted the FLIP-206[1]. Please feel free to reply to
> > > > >> > > this email thread. Looking forward to your feedback!
> > > > >> > >
> > > > >> > > Best,
> > > > >> > > Xingbo
> > > > >> > >
> > > > >> > > [1]
> > > > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
> > > > >> >
> > > > >> >
> > > > >>
> > > > >
> > >
> >
>
