Hi Sonam! To answer this, let me quickly provide some background on the two ways Flink deployments / job submissions work. See also the deployment modes overview: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/overview/#deployment-modes
What is common in all setups is that the query compilation / the dataflow assembly happens where the entry-point program runs. If you are programmatically setting up the application with a StreamExecutionEnvironment / TableEnvironment, then the query compilation (and JobGraph generation) happens where the program's main() method runs (a small sketch of such a program is further below). If you are submitting via the SQL Client, then the SQL Client is the entry-point program, and the query compilation happens where the SQL Client runs.

Now, where is that entry-point program executed when you deploy a job? That depends on your deployment mode.

(1) Session Mode: Here you have a running cluster with a Dispatcher whose REST endpoint accepts job submissions. Jobs are submitted via HTTP, transporting a serialized JobGraph. The entry-point can run anywhere and uses an HTTP client to send the generated JobGraph to the Dispatcher.
==> Here you need to worry about matching the versions of the client (the entry-point process, like the SQL Client) and the deployed session cluster.

(2) Application Mode: The entry-point (SQL Client or application program) spawns the JobManager when the program is executed. The JobGraph is passed as a Java object directly to the spawned JM component. There is an HTTP endpoint, but it is not for submitting jobs, only for the Web UI and for commands like cancelling execution. This mode should allow you to keep a Flink application (a SQL query) completely self-contained, without needing to sync versions between clients and clusters.

----

The internal abstraction behind all the ways to execute a program is the PipelineExecutor:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java

If you look at the implementations of "PipelineExecutor", you basically see all built-in deployment modes. To create a customized version of the Application deployment mode (or maybe the Job deployment mode), you can dig, for example, through the EmbeddedExecutor and the ApplicationDispatcherBootstrap.
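To make the first point concrete, here is a minimal sketch of such an entry-point program (the table names and the datagen/print connectors are just placeholders for illustration). The planner compiles the SQL into a JobGraph inside this main() method, wherever that method happens to run:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SqlJobRunner {
        public static void main(String[] args) {
            // Query compilation / JobGraph generation happens in this process.
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            tEnv.executeSql(
                    "CREATE TABLE src (id BIGINT, msg STRING) WITH ('connector' = 'datagen')");
            tEnv.executeSql(
                    "CREATE TABLE snk (id BIGINT, msg STRING) WITH ('connector' = 'print')");

            // Compiles the query and hands the resulting job to whatever
            // deployment target is configured (session cluster, application cluster, ...).
            tEnv.executeSql("INSERT INTO snk SELECT id, msg FROM src");
        }
    }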
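And to make the deployment-mode distinction concrete, here are rough command-line sketches for both modes. The cluster id, image name, and jar path are made up; please double-check the exact flags against the docs of your Flink version:

    # Session mode: the SQL Client compiles the query locally and ships the
    # JobGraph to the REST endpoint of an already running session cluster,
    # so client and cluster versions have to match.
    ./bin/sql-client.sh -f my-query.sql

    # Application mode (native Kubernetes): the entry-point runs inside the
    # cluster it spawns, so nothing version-specific runs on the client side.
    ./bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=my-sql-app \
        -Dkubernetes.container.image=my-flink-image \
        local:///opt/flink/usrlib/my-sql-runner.jar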
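If you do end up building a custom executor: on my reading of the code, executors are looked up via a PipelineExecutorFactory discovered through Java's ServiceLoader and selected by the configured execution.target. The skeleton below is only an assumption-laden sketch of what that hook could look like; the exact interfaces and signatures differ between Flink versions, so please verify them against the linked sources.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.core.execution.PipelineExecutor;
    import org.apache.flink.core.execution.PipelineExecutorFactory;

    // Hypothetical factory for a customized deployment mode. Registered via
    // META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory.
    public class CustomApplicationExecutorFactory implements PipelineExecutorFactory {

        public static final String NAME = "custom-application";

        @Override
        public String getName() {
            return NAME;
        }

        @Override
        public boolean isCompatibleWith(Configuration configuration) {
            // Picked when the user sets execution.target to "custom-application".
            return NAME.equals(configuration.get(DeploymentOptions.TARGET));
        }

        @Override
        public PipelineExecutor getExecutor(Configuration configuration) {
            // This is where a customized executor (modeled e.g. on the
            // EmbeddedExecutor used by application mode) would be returned.
            throw new UnsupportedOperationException("plug in your custom PipelineExecutor here");
        }
    }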
Hope that helps...

Best,
Stephan

On Tue, Jun 29, 2021 at 5:01 PM Sonam Mandal <soman...@linkedin.com> wrote:

> Hi Matthias,
>
> Thanks for getting back to me. We are trying to build a system where users can focus on writing Flink SQL applications and we handle the full lifecycle of their Flink cluster and job. We would like to let users focus on just their SQL and UDF logic. In such an environment, we cannot enforce that all users must use a single Flink version. We intend to have this setup in kubernetes where within the same kubernetes cluster we can create multiple Flink clusters to which jobs are submitted.
>
> Due to this, using an interactive shell will not be an option, nor do we want to directly expose this to users except for testing purposes.
>
> I see the latest 1.13 release now has an option to pass a SQL file as input to the SQL client and it’ll take care of running the job. We will explore this option as well. I believe this is a new feature which wasn’t available in 1.12, right? Does the planning happen in the SQL client or on the job manager? We ran into issues with job graph incompatibility if our code directly submitted the SQL to the remote environment or if we used /bin/flink to run this jar that does the SQL conversion.
>
> We currently have a POC idea which takes the SQL as a file and we wrote a simple job runner which reads this SQL and executes it. We are using Flink REST APIs to upload this jar and submit the job so that the job graph generation happens on the job manager. We no longer see the job graph incompatibility issues.
>
> Is there any reason not to use the above approach? We noticed that the Flink client (/bin/flink) does job graph generation itself and not via the REST API, any reason why it doesn’t leverage the REST API?
>
> Nice thing about using REST is that we can now run multiple Flink cluster versions and our job submission code doesn’t need to know which flink client version to use.
>
> We definitely saw this job graph incompatibility with /bin/flink. We still need to test out the sql client with the -f option to assess whether we will require keeping multiple versions around should we decide to use this option. So we were wondering what the recommendation is within the Flink community on how to handle such cases. Hope this clarifies our use case better.
>
> Also, as for the state incompatibility between major Flink versions, I see the thread mentions using a tool to rewrite the savepoints. Is this the only recommended way to handle this? Is this safe and does it work in all scenarios?
>
> Thanks,
> Sonam
>
> ------------------------------
> *From:* Matthias Pohl <matth...@ververica.com>
> *Sent:* Tuesday, June 29, 2021 02:29
> *To:* Sonam Mandal
> *Cc:* user@flink.apache.org; Jark Wu; Timo Walther
> *Subject:* Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?
>
> Hi Sonam,
> what's the reason for not using the Flink SQL client? Because of the version issue? I only know that FlinkSQL's state is not backwards-compatible between major Flink versions [1]. But that seems to be unrelated to what you describe.
>
> I'm gonna add Jark and Timo to this thread. Maybe, they can add more insights.
>
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-20823
>
> On Tue, Jun 22, 2021 at 9:44 PM Sonam Mandal <soman...@linkedin.com> wrote:
>
>> Hello,
>>
>> We've written a simple tool which takes SQL statements as input and uses a StreamTableEnvironment to eventually submit this to the Flink cluster. We've noticed that the Flink library versions we depend on must match the Flink version running in our Kubernetes cluster for the job submission to be successful. If the versions don't match, the job submission goes through but the job errors out for various reasons. We do not want to use the SQL shell (which I also believe is version specific and must run on the same pod as the Job Manager).
>>
>> Is there any version agnostic way to submit SQL jobs to the Flink cluster?
>>
>> Thanks,
>> Sonam