Hi Jark, Thanks for your input! Please see my comments inline.
> Isn't Table API the same way as DataStream jobs to submit Flink SQL?
> DataStream API also doesn't provide a default main class for users,
> why do we need to provide such one for SQL?

Sorry for the confusion I caused. By DataStream jobs, I mean jobs submitted via the Flink CLI, which could actually be DataStream or Table jobs.

I think a default main class would be user-friendly, since it eliminates the need for users to write their own main class like the SQLRunner in the Flink K8s operator [1].

> I thought the proposed SqlDriver was a dedicated main class accepting SQL
> files, is that correct?

Both JSON plans and SQL files are accepted. SQL Gateway should use JSON plans, while CLI users may use either JSON plans or SQL files.

Please see the updated FLIP [2] for more details.

> Personally, I prefer the way of init containers which doesn't depend on
> additional components.
> This can reduce the moving parts of a production environment.
> Depending on a distributed file system makes the testing, demo, and local
> setup harder than init containers.

Please note that we could reuse the checkpoint storage, such as S3/HDFS, which should already be required to run Flink in production, so I guess that would be acceptable for most users. WDYT?

WRT testing, demo, and local setups, I think we could support the local filesystem scheme, i.e. file://**, as the state backends do [3]. It works as long as SQL Gateway and JobManager (or SQL Driver) can access the resource directory (specified via `sql-gateway.application.storage-dir`).

Thanks!
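To make the two points above a bit more concrete, here is a rough sketch of how a driver might tell the two accepted inputs apart and check whether the storage directory is local. This is only an illustration under my own assumptions: the class name `SqlDriverInputSketch`, its methods, and the idea of detecting the input kind from the file extension are all hypothetical and are not taken from the FLIP. Only `sql-gateway.application.storage-dir` comes from the discussion above.

```java
import java.net.URI;
import java.util.Locale;

/**
 * Hypothetical sketch, not part of FLIP-316 or the Flink code base.
 * It only illustrates the dispatch idea using the JDK.
 */
final class SqlDriverInputSketch {

    /** The two input formats mentioned in the thread. */
    enum InputKind { JSON_PLAN, SQL_FILE }

    /**
     * Guesses the input kind from the file extension.
     * Assumption: JSON plans end with ".json" and SQL files with ".sql".
     */
    static InputKind detectKind(String path) {
        String lower = path.toLowerCase(Locale.ROOT);
        if (lower.endsWith(".json")) {
            return InputKind.JSON_PLAN;
        }
        if (lower.endsWith(".sql")) {
            return InputKind.SQL_FILE;
        }
        throw new IllegalArgumentException("Unsupported input: " + path);
    }

    /**
     * Returns true if the value of sql-gateway.application.storage-dir points
     * to the local filesystem, i.e. it uses the file scheme or no scheme at all.
     */
    static boolean isLocalStorage(String storageDir) {
        String scheme = URI.create(storageDir).getScheme();
        return scheme == null || scheme.equalsIgnoreCase("file");
    }

    public static void main(String[] args) {
        System.out.println(detectKind("plan.json"));
        System.out.println(detectKind("job.sql"));
        System.out.println(isLocalStorage("file:///tmp/sql-gateway"));
        System.out.println(isLocalStorage("s3://bucket/sql-gateway"));
    }
}
```

A local storage dir like this would only make sense for testing and demos, where SQL Gateway and the JobManager share the same filesystem.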
[1] https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-316:+Introduce+SQL+Driver
[3] https://github.com/apache/flink/blob/3245e0443b2a4663552a5b707c5c8c46876c1f6d/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java#L161

Best,
Paul Lam

> On June 3, 2023, at 12:21, Jark Wu <imj...@gmail.com> wrote:
>
> Hi Paul,
>
> Thanks for your reply. I left my comments inline.
>
>> As the FLIP said, it’s good to have a default main class for Flink SQLs,
>> which allows users to submit Flink SQLs in the same way as DataStream
>> jobs, or else users need to write their own main class.
>
> Isn't Table API the same way as DataStream jobs to submit Flink SQL?
> DataStream API also doesn't provide a default main class for users,
> why do we need to provide such one for SQL?
>
>> With the help of ExecNodeGraph, do we still need the serialized
>> SessionState? If not, we could make SQL Driver accept two serialized
>> formats:
>
> No, ExecNodeGraph doesn't need to serialize SessionState. I thought the
> proposed SqlDriver was a dedicated main class accepting SQL files, is
> that correct?
> If true, we have to ship the SessionState for this case which is a large
> work.
> I think we just need a JsonPlanDriver which is a main class that accepts
> JsonPlan as the parameter.
>
>
>> The common solutions I know is to use distributed file systems or use
>> init containers to localize the resources.
>
> Personally, I prefer the way of init containers which doesn't depend on
> additional components.
> This can reduce the moving parts of a production environment.
> Depending on a distributed file system makes the testing, demo, and local
> setup harder than init containers.
>
> Best,
> Jark
>
>
>
>
> On Fri, 2 Jun 2023 at 18:10, Paul Lam <paullin3...@gmail.com> wrote:
>
>> The FLIP is in the early phase and some details are not included, but
>> fortunately, we got lots of valuable ideas from the discussion.
>>
>> Thanks to everyone who joined the discussion!
>> @Weihua @Shanmon @Shengkai @Biao @Jark
>>
>> This weekend I’m gonna revisit and update the FLIP, adding more
>> details. Hopefully, we can further align our opinions.
>>
>> Best,
>> Paul Lam
>>
>>> On June 2, 2023, at 18:02, Paul Lam <paullin3...@gmail.com> wrote:
>>>
>>> Hi Jark,
>>>
>>> Thanks a lot for your input!
>>>
>>>> If we decide to submit ExecNodeGraph instead of SQL file, is it still
>>>> necessary to support SQL Driver?
>>>
>>> I think so. Apart from usage in SQL Gateway, SQL Driver could simplify
>>> Flink SQL execution with Flink CLI.
>>>
>>> As the FLIP said, it’s good to have a default main class for Flink SQLs,
>>> which allows users to submit Flink SQLs in the same way as DataStream
>>> jobs, or else users need to write their own main class.
>>>
>>>> SQL Driver needs to serialize SessionState which is very challenging
>>>> but not detailed covered in the FLIP.
>>>
>>> With the help of ExecNodeGraph, do we still need the serialized
>>> SessionState? If not, we could make SQL Driver accept two serialized
>>> formats:
>>>
>>> - SQL files for user-facing public usage
>>> - ExecNodeGraph for internal usage
>>>
>>> It’s kind of similar to the relationship between job jars and jobgraphs.
>>>
>>>> Regarding "K8S doesn't support shipping multiple jars", is that true?
>>>> Is it possible to support it?
>>>
>>> Yes, K8s doesn’t distribute any files. It’s the users’ responsibility to
>>> make sure the resources are accessible in the containers. The common
>>> solutions I know is to use distributed file systems or use init
>>> containers to localize the resources.
>>>
>>> Now I lean toward introducing a fs to do the distribution job. WDYT?
>>>
>>> Best,
>>> Paul Lam
>>>
>>>> On June 1, 2023, at 20:33, Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>> Hi Paul,
>>>>
>>>> Thanks for starting this discussion. I like the proposal! This is a
>>>> frequently requested feature!
>>>>
>>>> I agree with Shengkai that ExecNodeGraph as the submission object is a
>>>> better idea than SQL file. To be more specific, it should be JsonPlanGraph
>>>> or CompiledPlan which is the serializable representation. CompiledPlan is a
>>>> clear separation between compiling/optimization/validation and execution.
>>>> This can keep the validation and metadata accessing still on the SQLGateway
>>>> side. This allows SQLGateway to leverage some metadata caching and UDF JAR
>>>> caching for better compiling performance.
>>>>
>>>> If we decide to submit ExecNodeGraph instead of SQL file, is it still
>>>> necessary to support SQL Driver? Regarding non-interactive SQL jobs, users
>>>> can use the Table API program for application mode. SQL Driver needs to
>>>> serialize SessionState which is very challenging but not detailed covered
>>>> in the FLIP.
>>>>
>>>> Regarding "K8S doesn't support shipping multiple jars", is that true? Is it
>>>> possible to support it?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>
>>>>
>>>> On Thu, 1 Jun 2023 at 16:58, Paul Lam <paullin3...@gmail.com> wrote:
>>>>
>>>>> Hi Weihua,
>>>>>
>>>>> You’re right. Distributing the SQLs to the TMs is one of the challenging
>>>>> parts of this FLIP.
>>>>>
>>>>> Web submission is not enabled in application mode currently as you said,
>>>>> but it could be changed if we have good reasons.
>>>>>
>>>>> What do you think about introducing a distributed storage for SQL Gateway?
>>>>>
>>>>> We could make use of Flink file systems [1] to distribute the SQL Gateway
>>>>> generated resources, that should solve the problem at its root cause.
>>>>>
>>>>> Users could specify Flink-supported file systems to ship files. It’s only
>>>>> required when using SQL Gateway with K8s application mode.
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>>>>>
>>>>> Best,
>>>>> Paul Lam
>>>>>
>>>>>> On June 1, 2023, at 13:55, Weihua Hu <huweihua....@gmail.com> wrote:
>>>>>>
>>>>>> Thanks Paul for your reply.
>>>>>>
>>>>>> SQLDriver looks good to me.
>>>>>>
>>>>>> 2. Do you mean to pass the SQL string as a configuration or a program
>>>>>> argument?
>>>>>>
>>>>>>
>>>>>> I brought this up because we were unable to pass the SQL file to Flink
>>>>>> using Kubernetes mode.
>>>>>> For DataStream/Python users, they need to prepare their images for the
>>>>>> jars and dependencies.
>>>>>> But for SQL users, they can use a common image to run different SQL
>>>>>> queries if there are no other udf requirements.
>>>>>> It would be great if the SQL query and image were not bound.
>>>>>>
>>>>>> Using strings is a way to decouple these, but just as you mentioned, it's
>>>>>> not easy to pass complex SQL.
>>>>>>
>>>>>>> use web submission
>>>>>> AFAIK, we can not use web submission in the Application mode. Please
>>>>>> correct me if I'm wrong.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Weihua
>>>>>>
>>>>>>
>>>>>> On Wed, May 31, 2023 at 9:37 PM Paul Lam <paullin3...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Biao,
>>>>>>>
>>>>>>> Thanks for your comments!
>>>>>>>
>>>>>>>> 1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs in
>>>>>>>> Application mode? More specifically, if we use SQL client/gateway to
>>>>>>>> execute some interactive SQLs like a SELECT query, can we ask flink to use
>>>>>>>> Application mode to execute those queries after this FLIP?
>>>>>>>
>>>>>>> Thanks for pointing it out. I think only DMLs would be executed via SQL
>>>>>>> Driver.
>>>>>>> I'll add the scope to the FLIP.
>>>>>>>
>>>>>>>> 2. Deployment: I believe in YARN mode, the implementation is trivial as we
>>>>>>>> can ship files via YARN's tool easily but for K8s, things can be more
>>>>>>>> complicated as Shengkai said.
>>>>>>>
>>>>>>>
>>>>>>> Your input is very informative. I’m thinking about using web submission,
>>>>>>> but it requires exposing the JobManager port which could also be a problem
>>>>>>> on K8s.
>>>>>>>
>>>>>>> Another approach is to explicitly require a distributed storage to ship
>>>>>>> files, but we may need a new deployment executor for that.
>>>>>>>
>>>>>>> What do you think of these two approaches?
>>>>>>>
>>>>>>>> 3. Serialization of SessionState: in SessionState, there are some
>>>>>>>> unserializable fields
>>>>>>>> like org.apache.flink.table.resource.ResourceManager#userClassLoader. It
>>>>>>>> may be worthwhile to add more details about the serialization part.
>>>>>>>
>>>>>>> I agree. That’s a missing part. But if we use ExecNodeGraph as Shengkai
>>>>>>> mentioned, do we eliminate the need for serialization of SessionState?
>>>>>>>
>>>>>>> Best,
>>>>>>> Paul Lam
>>>>>>>
>>>>>>>> On May 31, 2023, at 13:07, Biao Geng <biaoge...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Thanks Paul for the proposal! I believe it would be very useful for flink
>>>>>>>> users.
>>>>>>>> After reading the FLIP, I have some questions:
>>>>>>>> 1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs in
>>>>>>>> Application mode? More specifically, if we use SQL client/gateway to
>>>>>>>> execute some interactive SQLs like a SELECT query, can we ask flink to use
>>>>>>>> Application mode to execute those queries after this FLIP?
>>>>>>>> 2. Deployment: I believe in YARN mode, the implementation is trivial as we
>>>>>>>> can ship files via YARN's tool easily but for K8s, things can be more
>>>>>>>> complicated as Shengkai said. I have implemented a simple POC
>>>>>>>> <https://github.com/bgeng777/flink/commit/5b4338fe52ec343326927f0fc12f015dd22b1133>
>>>>>>>> based on SQL client before (i.e. consider the SQL client which supports
>>>>>>>> executing a SQL file as the SQL driver in this FLIP). One problem I have
>>>>>>>> met is how do we ship SQL files (or Job Graph) to the k8s side. Without
>>>>>>>> such support, users have to modify the initContainer or rebuild a new K8s
>>>>>>>> image every time to fetch the SQL file. Like the flink k8s operator, one
>>>>>>>> workaround is to utilize the flink config (transforming the SQL file to an
>>>>>>>> escaped string like Weihua mentioned) which will be converted to a
>>>>>>>> ConfigMap but K8s has size limit of ConfigMaps (no larger than 1MB
>>>>>>>> <https://kubernetes.io/docs/concepts/configuration/configmap/>). Not sure
>>>>>>>> if we have better solutions.
>>>>>>>> 3. Serialization of SessionState: in SessionState, there are some
>>>>>>>> unserializable fields
>>>>>>>> like org.apache.flink.table.resource.ResourceManager#userClassLoader. It
>>>>>>>> may be worthwhile to add more details about the serialization part.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Biao Geng
>>>>>>>>
>>>>>>>> On Wed, May 31, 2023 at 11:49, Paul Lam <paullin3...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Weihua,
>>>>>>>>>
>>>>>>>>> Thanks a lot for your input! Please see my comments inline.
>>>>>>>>>
>>>>>>>>>> - Is SQLRunner the better name? We use this to run a SQL Job. (Not strong,
>>>>>>>>>> the SQLDriver is fine for me)
>>>>>>>>>
>>>>>>>>> I’ve thought about SQL Runner but picked SQL Driver for the following
>>>>>>>>> reasons FYI:
>>>>>>>>>
>>>>>>>>> 1. I have a PythonDriver doing the same job for PyFlink [1]
>>>>>>>>> 2. Flink program's main class is sort of like Driver in JDBC which
>>>>>>>>> translates SQLs into database-specific languages.
>>>>>>>>>
>>>>>>>>> In general, I’m +1 for SQL Driver and +0 for SQL Runner.
>>>>>>>>>
>>>>>>>>>> - Could we run SQL jobs using SQL in strings? Otherwise, we need to prepare
>>>>>>>>>> a SQL file in an image for Kubernetes application mode, which may be a bit
>>>>>>>>>> cumbersome.
>>>>>>>>>
>>>>>>>>> Do you mean to pass the SQL string as a configuration or a program argument?
>>>>>>>>>
>>>>>>>>> I thought it might be convenient for testing purposes, but not recommended
>>>>>>>>> for production, because Flink SQLs could be complicated and involve lots of
>>>>>>>>> characters that need to be escaped.
>>>>>>>>>
>>>>>>>>> WDYT?
>>>>>>>>>
>>>>>>>>>> - I noticed that we don't specify the SQLDriver jar in the "run-application"
>>>>>>>>>> command. Does that mean we need to perform automatic detection in Flink?
>>>>>>>>>
>>>>>>>>> Yes! It’s like running a PyFlink job with the following command:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> ./bin/flink run \
>>>>>>>>> --pyModule table.word_count \
>>>>>>>>> --pyFiles examples/python/table
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> The CLI determines whether it’s a SQL job and, if so, applies the SQL Driver
>>>>>>>>> automatically.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Paul Lam
>>>>>>>>>
>>>>>>>>>> On May 30, 2023, at 21:56, Weihua Hu <huweihua....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks Paul for the proposal.
>>>>>>>>>>
>>>>>>>>>> +1 for this. It is valuable in improving ease of use.
>>>>>>>>>>
>>>>>>>>>> I have a few questions.
>>>>>>>>>> - Is SQLRunner the better name? We use this to run a SQL Job. (Not strong,
>>>>>>>>>> the SQLDriver is fine for me)
>>>>>>>>>> - Could we run SQL jobs using SQL in strings? Otherwise, we need to prepare
>>>>>>>>>> a SQL file in an image for Kubernetes application mode, which may be a bit
>>>>>>>>>> cumbersome.
>>>>>>>>>> - I noticed that we don't specify the SQLDriver jar in the "run-application"
>>>>>>>>>> command. Does that mean we need to perform automatic detection in Flink?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Weihua
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, May 29, 2023 at 7:24 PM Paul Lam <paullin3...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi team,
>>>>>>>>>>>
>>>>>>>>>>> I’d like to start a discussion about FLIP-316 [1], which introduces a SQL
>>>>>>>>>>> driver as the default main class for Flink SQL jobs.
>>>>>>>>>>>
>>>>>>>>>>> Currently, Flink SQL could be executed out of the box either via SQL
>>>>>>>>>>> Client/Gateway or embedded in a Flink Java/Python program.
>>>>>>>>>>>
>>>>>>>>>>> However, each one has its drawback:
>>>>>>>>>>>
>>>>>>>>>>> - SQL Client/Gateway doesn’t support the application deployment mode [2]
>>>>>>>>>>> - Flink Java/Python program requires extra work to write a non-SQL program
>>>>>>>>>>>
>>>>>>>>>>> Therefore, I propose adding a SQL driver to act as the default main class
>>>>>>>>>>> for SQL jobs.
>>>>>>>>>>> Please see the FLIP docs for details and feel free to comment. Thanks!
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver
>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-26541
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Paul Lam