Hi Till,

I have written a more complex PyFlink job. Compared with the previous single-Python-UDF job, it contains an extra stage that converts between Table and DataStream. Besides, I added a Python map function to the job. Because the Python DataStream API has not yet implemented Thread Mode, the Python map function operator still runs in Process Mode.
```
from pyflink.common import Types
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# schema: [id: String, d: int]
source = t_env.from_path("source_table")

@udf(result_type=DataTypes.STRING(), func_type="general")
def upper(x):
    return x.upper()

t_env.create_temporary_system_function("upper", upper)

# python map function
ds = t_env.to_data_stream(source) \
    .map(lambda x: x,
         output_type=Types.ROW_NAMED(["id", "d"],
                                     [Types.STRING(), Types.INT()]))

t = t_env.from_data_stream(ds)
t.select('upper(id)').execute_insert('sink_table')
```

The input data size is 1 KB.

Mode                       | QPS
---------------------------|-------
Process Mode               | 30,000
Thread Mode + Process Mode | 40,000

From the table, we can see that the operators running in Process Mode are the performance bottleneck of the job.
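For completeness, switching between the two modes for the benchmark above only required a configuration change; a minimal sketch, assuming the `python.execution-mode` option proposed in the FLIP:

```
# Sketch: select the runtime for Python functions via the option
# proposed in FLIP-206 (the final option name may differ).
t_env.get_config().get_configuration().set_string(
    "python.execution-mode", "thread")  # or "process"
```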
Best,
Xingbo

Till Rohrmann <trohrm...@apache.org> wrote on Wednesday, January 5, 2022 at 23:16:

> Thanks for the detailed answer Xingbo. Quick question on the last figure
> in the FLIP. You said that this is a real-world Flink stream SQL job. The
> title of the graph says UDF(String Upper). So do I understand correctly
> that string upper is the real-world use case you have measured? What I
> wanted to ask is how a slightly more complex Flink Python job (involving
> shuffles, with back pressure, etc.) performs using the thread and process
> mode respectively.
>
> If the mode solely needs changes in the Python part of Flink, then I don't
> have any concerns from the runtime perspective.
>
> Cheers,
> Till
>
> On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>
> > Hi Till and Thomas,
> >
> > Thanks a lot for joining the discussion.
> >
> > For Till:
> >
> > >>> Is the slower performance currently the biggest pain point for our
> > >>> Python users? What else are our Python users mainly complaining
> > >>> about?
> >
> > PyFlink users are most concerned about two things: usability and
> > performance. Users often run benchmarks when they first investigate
> > PyFlink[1][2] to decide whether to adopt it. The performance of a
> > PyFlink job depends on two parts: the overhead of the PyFlink framework
> > itself, and the complexity of the Python functions implemented by the
> > user. In the Python ecosystem there are many libraries and tools that
> > can help users improve the performance of their custom functions, such
> > as pandas[3], numba[4] and cython[5]. So we hope that the framework
> > overhead of PyFlink itself can also be reduced.
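> >
> > For example, a computation-heavy scalar function can often be
> > vectorized with the pandas UDF support that PyFlink already ships
> > (func_type="pandas"), so that it processes a batch per call instead of
> > a single row. A minimal sketch:
> >
> >     from pyflink.table import DataTypes
> >     from pyflink.table.udf import udf
> >
> >     # Vectorized variant of upper: receives and returns a
> >     # pandas.Series per batch, amortizing the per-row call overhead.
> >     @udf(result_type=DataTypes.STRING(), func_type="pandas")
> >     def upper_pandas(s):
> >         return s.str.upper()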
> >
> > >>> Concerning the proposed changes, are there any changes required on
> > >>> the runtime side (changes to Flink)? How will the deployment and
> > >>> memory management be affected when using the thread execution mode?
> >
> > The changes on the PyFlink runtime mentioned here are actually only
> > modifications of the PyFlink custom operators, such as
> > PythonScalarFunctionOperator[6], which won't affect deployment or
> > memory management.
> >
> > >>> One more question that came to my mind: How much performance
> > >>> improvement do we gain on a real-world Python use case? Were the
> > >>> measurements more like micro benchmarks where the Python UDF was
> > >>> called w/o the overhead of Flink? I would just be curious how much
> > >>> the Python component contributes to the overall runtime of a real
> > >>> world job. Do we have some data on this?
> >
> > The last figure I put in the FLIP is the performance comparison of
> > three real Flink stream SQL jobs: a Java UDF job, a Python UDF job in
> > Process Mode, and a Python UDF job in Thread Mode. The QPS is measured
> > end-to-end on the executed Flink job. As shown in the performance
> > comparison chart, a Python UDF with the same functionality often
> > reaches only 20% of the performance of the corresponding Java UDF, so
> > the Python UDF often becomes the performance bottleneck of a PyFlink
> > job.
> >
> > For Thomas:
> >
> > The first time I realized that the framework overhead of the various
> > IPC approaches (socket, gRPC, shared memory) cannot be ignored in some
> > scenarios was in an image-algorithm prediction job of PyFlink. Its
> > input parameters were a series of huge image binary arrays with a data
> > size bigger than 1 GB, and the serialization/deserialization overhead
> > became an important part of its poor performance. Although this job is
> > a bit extreme, through measurement we did confirm the impact that
> > larger parameters have on the serialization/deserialization overhead
> > of the IPC framework.
> >
> > >>> As I understand it, you measured the difference in throughput for
> > >>> UPPER between process and embedded mode and the difference is 50%
> > >>> increased throughput?
> >
> > This 50% is the result when the data size is less than 100 bytes. When
> > the data size reaches 1 KB, the performance of Embedded Mode reaches
> > about 3.5 times that of the Process Mode shown in the FLIP. When the
> > data size reaches 1 MB, Embedded Mode reaches 5 times the performance
> > of Process Mode. The biggest difference is that in Embedded Mode,
> > input/result data does not need to be serialized/deserialized.
> >
> > >>> Is that a typical UDF in your usage?
> >
> > The reason for choosing UPPER is that a simpler UDF implementation
> > makes it easier to evaluate the performance of the different execution
> > modes themselves.
> >
> > >>> What do you observe when the function becomes more complex?
> >
> > We can analyze the QPS of the framework (Process Mode or Embedded
> > Mode) and the QPS of the UDF calculation logic separately. A more
> > complex UDF simply means a UDF with a smaller QPS. The main factors
> > that affect the framework QPS are the data types, number and size of
> > the parameters, which greatly affect the serialization/deserialization
> > overhead in Process Mode.
> >
> > The purpose of introducing Thread Mode is not to replace Process Mode,
> > but to supplement Python UDF usage scenarios such as CEP and join, and
> > scenarios where higher performance is pursued. Compared with Thread
> > Mode, Process Mode has better isolation, which avoids the limitations
> > Thread Mode has in some scenarios such as session mode.
> >
> > [1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
> > [2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
> > [3] https://pandas.pydata.org/
> > [4] https://numba.pydata.org/
> > [5] https://cython.org/
> > [6] https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
> >
> > Best,
> > Xingbo
> >
> > Thomas Weise <t...@apache.org> wrote on Tuesday, January 4, 2022 at 04:23:
> >
> > > Interesting discussion. It caught my attention because I was also
> > > interested in the Beam fn execution overhead a few years ago.
> > >
> > > We found back then that while in theory the fn protocol overhead is
> > > very significant, for realistic function workloads that overhead was
> > > negligible. And of course it all depends on the use case. It might
> > > be worthwhile to quantify a couple more scenarios.
> > >
> > > As I understand it, you measured the difference in throughput for
> > > UPPER between process and embedded mode and the difference is 50%
> > > increased throughput? Is that a typical UDF in your usage? What do
> > > you observe when the function becomes more complex?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >
> > > > One more question that came to my mind: How much performance
> > > > improvement do we gain on a real-world Python use case? Were the
> > > > measurements more like micro benchmarks where the Python UDF was
> > > > called w/o the overhead of Flink? I would just be curious how much
> > > > the Python component contributes to the overall runtime of a real
> > > > world job. Do we have some data on this?
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >
> > > > > Hi Xingbo,
> > > > >
> > > > > Thanks for creating this FLIP. I have two general questions
> > > > > about the motivation for this FLIP because I have only very
> > > > > little exposure to our Python users:
> > > > >
> > > > > Is the slower performance currently the biggest pain point for
> > > > > our Python users?
> > > > >
> > > > > What else are our Python users mainly complaining about?
> > > > >
> > > > > Concerning the proposed changes, are there any changes required
> > > > > on the runtime side (changes to Flink)? How will the deployment
> > > > > and memory management be affected when using the thread
> > > > > execution mode?
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hxbks...@gmail.com> wrote:
> > > > >
> > > >> Hi Wei,
> > > >>
> > > >> Thanks a lot for your feedback. Very good questions!
> > > >>
> > > >> >>> 1. It seems that we dynamically load an embedded Python and
> > > >> user dependencies in the TM process. Can they be uninstalled
> > > >> cleanly after the task finished? i.e. Can we use the Thread Mode
> > > >> in session mode and the PyFlink shell?
> > > >>
> > > >> I mentioned the limitation of this part in the FLIP. There is no
> > > >> problem as long as the Python interpreter is not changed, but if
> > > >> you need to change the Python interpreter, there is really no way
> > > >> to reload the Python library. The problem is mainly caused by the
> > > >> fact that many Python libraries assume they own the process
> > > >> alone.
> > > >>
> > > >> >>> 2. Does one TM have only one embedded Python running at the
> > > >> same time? If all the Python operators in the TM share the same
> > > >> PVM, will there be a loss in performance?
> > > >>
> > > >> Your understanding is correct: one TM has only one embedded
> > > >> Python interpreter running at the same time. I guess you are
> > > >> worried about the multi-thread performance loss caused by the
> > > >> Python GIL. There is a one-to-one correspondence between Java
> > > >> worker threads and Python subinterpreters. Although
> > > >> subinterpreters have not yet completely overcome the GIL sharing
> > > >> problem (the Python community's recent plan for a per-interpreter
> > > >> GIL is also under discussion[1]), the performance of
> > > >> subinterpreters is very close to that of multiprocessing[2].
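> > > >>
> > > >> As a rough illustration of the model (not PyFlink API): each
> > > >> worker thread drives its own isolated interpreter, similar to
> > > >> what CPython's private _xxsubinterpreters module exposes
> > > >> (internal and subject to change):
> > > >>
> > > >>     import _xxsubinterpreters as interpreters
> > > >>
> > > >>     # Each subinterpreter has its own module and global state,
> > > >>     # isolated from the others in the same process.
> > > >>     interp = interpreters.create()
> > > >>     interpreters.run_string(interp, "result = 'flink'.upper()")
> > > >>     interpreters.destroy(interp)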
> > > > >> Although the subinterpreters has not yet completely overcome the > GIL > > > > >> sharing problem(The Python community’s recent plan for a > > > per-interpreter > > > > >> GIL is also under discussion[1]), the performance of > subinterpreters > > > is > > > > >> very close to that of multiprocessing [2]. > > > > >> > > > > >> >>> 3. How do we load the relevant c library if the > > python.executable > > > is > > > > >> provided by users? > > > > >> > > > > >> Once python.executable is provided, PEMJA will dynamically load > the > > > > >> CPython > > > > >> library (libpython.*so or libpython.*dylib) and pemja.so installed > > in > > > the > > > > >> python environment. > > > > >> > > > > >> >>> May there be a risk of version conflicts? > > > > >> > > > > >> I understand that this question is actually discussing whether > C/C++ > > > has a > > > > >> way to solve the problem of relying on different versions of a > > > library. > > > > >> First of all, we know that if there is only static linking, there > > > will be > > > > >> no such problem. And I have studied the source code of > CPython[3], > > > and > > > > >> there is no usage of dynamic linking. The rest is the case where > > > dynamic > > > > >> linking is used in the C library written by the users. There are > > many > > > ways > > > > >> to solve this problem with dynamic linking, but after all, this > > > library is > > > > >> written by users, and it is difficult for us to guarantee that > there > > > will > > > > >> be no conflicts. At this time, Process Mode will be the choice of > > falk > > > > >> back. > > > > >> > > > > >> [1] > > > > >> > > > > >> > > > > > > https://mail.python.org/archives/list/python-...@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV > > > > >> [2] > > > > >> > > > > >> > > > > > > https://mail.python.org/archives/list/python-...@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ > > > > >> [3] https://github.com/python/cpython > > > > >> > > > > >> Best, > > > > >> Xingbo > > > > >> > > > > >> Wei Zhong <weizhong0...@gmail.com> 于2021年12月31日周五 11:49写道: > > > > >> > > > > >> > Hi Xingbo, > > > > >> > > > > > >> > Thanks for creating this FLIP. Big +1 for it! > > > > >> > > > > > >> > I have some question about the Thread Mode: > > > > >> > > > > > >> > 1. It seems that we dynamically load an embedded Python and user > > > > >> > dependencies in the TM process. Can they be uninstalled cleanly > > > after > > > > >> the > > > > >> > task finished? i.e. Can we use the Thread Mode in session mode > and > > > > >> Pyflink > > > > >> > shell? > > > > >> > > > > > >> > 2. Does one TM have only one embedded Python running at the same > > > time? > > > > >> If > > > > >> > all the Python operator in the TM share the same PVM, will there > > be > > > a > > > > >> loss > > > > >> > in performance? > > > > >> > > > > > >> > 3. How do we load the relevant c library if the > python.executable > > is > > > > >> > provided by users? May there be a risk of version conflicts? 
> > > > >> > > > > > >> > Best, > > > > >> > Wei > > > > >> > > > > > >> > > > > > >> > > 2021年12月29日 上午11:56,Xingbo Huang <hxbks...@gmail.com> 写道: > > > > >> > > > > > > >> > > Hi everyone, > > > > >> > > > > > > >> > > I would like to start a discussion thread on "Support PyFlink > > > Runtime > > > > >> > > Execution in Thread Mode" > > > > >> > > > > > > >> > > We have provided PyFlink Runtime framework to support Python > > > > >> user-defined > > > > >> > > functions since Flink 1.10. The PyFlink Runtime framework is > > > called > > > > >> > Process > > > > >> > > Mode, which depends on an inter-process communication > > architecture > > > > >> based > > > > >> > on > > > > >> > > the Apache Beam Portability framework. Although starting a > > > dedicated > > > > >> > > process to execute Python user-defined functions could have > > better > > > > >> > resource > > > > >> > > isolation, it will bring greater resource and performance > > > overhead. > > > > >> > > > > > > >> > > In order to overcome the resource and performance problems on > > > Process > > > > >> > Mode, > > > > >> > > we will propose a new execution mode which executes Python > > > > >> user-defined > > > > >> > > functions in the same thread instead of a separate process. > > > > >> > > > > > > >> > > I have drafted the FLIP-206[1]. Please feel free to reply to > > this > > > > >> email > > > > >> > > thread. Looking forward to your feedback! > > > > >> > > > > > > >> > > Best, > > > > >> > > Xingbo > > > > >> > > > > > > >> > > [1] > > > > >> > > > > > > >> > > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode > > > > >> > > > > > >> > > > > > >> > > > > > > > > > > >