Hi Thomas,

OK, I agree with the necessity of running a job without a Java client. FLIP-73 aims at introducing a Pipeline (or FlinkPipeline) interface that is the common interface of StreamGraph and Plan. Maybe we could reintroduce something like Program that returns a Pipeline for this purpose.
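Roughly, such an interface could look like the sketch below — only an illustration along the lines of FLIP-73; neither the names nor the shape are decided:

    // Sketch only: "Pipeline" stands for the common interface of StreamGraph and Plan
    // that FLIP-73 proposes; "Program" is the hypothetical entry-point interface
    // discussed in this thread. Neither exists in this form today.
    public interface Program {

        /** Builds the pipeline for the given program arguments, without executing it. */
        Pipeline getPipeline(String... args);
    }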
Best,
Aljoscha

> On 26. Sep 2019, at 19:30, Thomas Weise <t...@apache.org> wrote:
>
> Hi Aljoscha,
>
> Thanks for taking a look!
>
> Multiple options to approach the submission part for the Beam use case are discussed in [1].
>
> I'm actually now working on a different approach that creates a Flink jar at build time.
>
> To the point of whether UserProgram.main() should be called in the JobManager or not: I believe it is important to provide the user an option to submit a job without running a Java client. Today that necessitates that the Java entry point will be called on the JM.
>
> The executor-related work as such won't change that. However, it would be nice to have a separate mechanism that allows the user to specify an entry point that produces the FlinkPipeline/plan, w/o having to "execute" through the kind of hacky context environment.
>
> Thanks,
> Thomas
>
> [1] https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.fh2f571kms4d
>
> On Thu, Sep 26, 2019 at 7:15 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>>
>> Regarding the original proposal: I don't think spawning another process inside the JarHandler.runJar() is the way to go here. Looking at the bigger picture, the proposal would get us to roughly this situation:
>>
>> 1. Spawn Kubernetes containers (JobManager and TaskManagers)
>> 2. User does a REST call to JobManager.runJar() to submit the user job
>> 3. JobManager.runJar() opens a port that waits for job submission
>> 4. JobManager.runJar() invokes UserProgram.main()
>> 5. UserProgram.main() launches a process (BeamJobService) that opens a port to wait for a Python process to connect to it
>> 6. UserProgram.main() launches another process (the Python code, or any language, really) that connects to BeamJobService to submit the Pipeline
>> 7. BeamJobService receives the Pipeline and talks to the port open on the JobManager (via REST service, maybe) to submit the Job
>> 8. Job is executed
>> 9. Where is UserProgram.main() at this point?
>>
>> I think that even running UserProgram.main() in the JobManager is already too much. A JobManager should accept JobGraphs (or something) and execute them, nothing more. Running UserProgram.main() makes some things complicated or weird. For example, what happens when that UserProgram.main() creates a RemoteEnvironment and uses that? What happens when the user code calls execute() multiple times?
>>
>> I think a good solution for the motivating use case is to
>>
>> a) run BeamJobService as a separate service that talks to a running JobManager via REST for submitting jobs that it receives, or
>>
>> b) spawn a JobManager inside the BeamJobService, i.e. the BeamJobService is like the entry point in a per-job Kubernetes model — something that the new Executor work ([1], [2]) will enable.
>>
>> Any thoughts? I'm happy to jump on a call about this because these things are very tricky to figure out and I might be wrong.
>>
>> Best,
>> Aljoscha
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
>> [2] https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?ts=5d88e631
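To make the point about multiple execute() calls concrete: the program below is a perfectly normal client program today (standard DataStream API), but if a JobManager ran this main() as part of a "jar run" call, it is unclear which of the two jobs the call refers to and which job id the REST handler should return:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TwoJobsProgram {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // First job, submitted by the first execute() call.
            env.fromElements(1, 2, 3).print();
            env.execute("first job");

            // A second execute() call from the same main() -- which job id should "jar run" return?
            env.fromElements(4, 5, 6).print();
            env.execute("second job");
        }
    }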
>>> On 6. Aug 2019, at 09:51, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> I think there was the idea to make the JobGraph a "public"/stable interface other projects can rely on at some point. If I remember correctly, we wanted to define a protobuf definition for the JobGraph so that clients written in a different language can submit JobGraphs and we could extend the data structure. As far as I know, this effort hasn't been started yet and is still in the backlog (I think there is no JIRA issue for it yet).
>>>
>>> The problem came up when discussing additions to the JobGraph, because they need to be backwards compatible; otherwise newer versions of Flink would not be able to recover jobs. I think so far Flink provides backwards compatibility between different versions of the JobGraph. However, this is not officially guaranteed.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 6, 2019 at 3:56 AM Zili Chen <wander4...@gmail.com> wrote:
>>>
>>>> It sounds like a request to change the interface Program into
>>>>
>>>> public interface Program {
>>>>     JobGraph getJobGraph(String... args);
>>>> }
>>>>
>>>> Also, given that JobGraph is said to be an internal interface that cannot be relied on, we might introduce and use a representation that allows for cross-version compatibility.
>>>>
>>>> On Tue, Aug 6, 2019 at 12:11 AM Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> If the goal is to keep job creation and job submission separate and we agree that there should be more flexibility for the job construction, then JobGraph and friends should be a stable API that the user can depend on. If that's the case, the path Chesnay pointed to may become viable.
>>>>>
>>>>> There was discussion in the past that JobGraph cannot be relied on WRT backward compatibility, and I would expect that at some point we want to move to a representation that allows for cross-version compatibility. Beam is an example of how this could be accomplished (with its pipeline proto).
>>>>>
>>>>> So if the Beam job server was able to produce the JobGraph, is there agreement that we should provide a mechanism that allows the program entry point to return the JobGraph directly (without using the ExecutionEnvironment to build it)?
>>>>>
>>>>> On Mon, Aug 5, 2019 at 2:10 AM Zili Chen <wander4...@gmail.com> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> If the REST handler calls main(), the behavior inside main() is unpredictable.
>>>>>>
>>>>>> Currently the jar run handler extracts the job graph and submits it with the job id configured in the REST request. If the REST handler calls main(), we can hardly even know how many jobs are executed.
>>>>>>
>>>>>> A new environment, as you said, ExtractJobGraphAndSubmitToDispatcherEnvironment, could be added to satisfy your requirement. However, it is a bit out of Flink's scope. It might be better to write your own REST handler.
>>>>>>
>>>>>> WebMonitorExtension is for extending REST handlers, but it seems it cannot be customized for this either...
>>>>>>
>>>>>> Best,
>>>>>> tison.
>>>>>>
>>>>>> On Sat, Aug 3, 2019 at 4:09 AM Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> Thanks for looking into this.
>>>>>>>
>>>>>>> I see the "Jar run handler" as a function that takes a few parameters and returns a job ID. I think it would be nice if the handler didn't hard-code that function. Perhaps this could be accomplished by pushing the code into something like "ExtractJobGraphAndSubmitToDispatcherEnvironment" that the main method could also bypass if it has an alternative way to provide the jobId via a context variable?
>>>>>>>
>>>>>>> Zili: I looked at the client API proposal and left a few comments. I think it is important to improve programmatic job submission. But it also seems orthogonal to how the jar run handler operates (i.e. these issues could be addressed independently).
>>>>>>>
>>>>>>> Chesnay: You are right that the Beam job server could be hacked to extract the job graph and other ingredients. This isn't desirable though, because these Flink internals should not be exposed downstream. But even if we went down that route, we would still need a way to let the jar run handler know to just return the ID of an already submitted job vs. trying to submit one from OptimizerPlanEnvironment.
>>>>>>>
>>>>>>> The intended sequence would be:
>>>>>>>
>>>>>>> * REST client provides a launcher jar
>>>>>>> * REST client "runs jar"
>>>>>>> * REST handler calls main()
>>>>>>> * main launches the Beam job server and runs the Beam pipeline construction code against that job server
>>>>>>> * job server uses RemoteEnvironment to submit the real job
>>>>>>> * main "returns job id"
>>>>>>> * REST handler returns job id
>>>>>>>
>>>>>>> Thomas
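As an illustration of this sequence, the launcher's main() could look roughly like the sketch below. The job server command, the Python invocation and the file used to hand back the job id are all made up; the point is only that main() delegates submission and merely reports the id of the job that was already submitted:

    import java.nio.file.Files;
    import java.nio.file.Path;

    public class BeamLauncher {

        public static void main(String[] args) throws Exception {
            // Start the Beam job server; it will submit the real job (e.g. via RemoteEnvironment).
            Process jobServer = new ProcessBuilder("java", "-jar", "beam-job-server.jar")
                    .inheritIO().start();

            // Run the pipeline construction code (any language) against that job server.
            Process pipeline = new ProcessBuilder("python", "-m", "my_pipeline")
                    .inheritIO().start();
            pipeline.waitFor();

            // The missing piece discussed in this thread: report the id of the already
            // submitted job back to the jar run handler instead of having the handler
            // extract a JobGraph. Here it is read from a file the job server is assumed
            // to have written (a made-up convention).
            String jobId = Files.readString(Path.of("/tmp/submitted-job-id")).trim();
            System.out.println("Submitted job: " + jobId);

            jobServer.destroy();
        }
    }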
>>>>>>> On Wed, Jul 31, 2019 at 4:33 AM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>
>>>>>>>> By the way, currently Dispatcher implements RestfulGateway and delegates resource requests to the ResourceManager. If we could, semantically, let WebMonitor implement RestfulGateway and delegate job requests to the Dispatcher and resource requests to the ResourceManager, it would seem reasonable that when the WebMonitor receives a JarRun request, it spawns a process and runs the main method of the main class of that jar.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> tison.
>>>>>>>>
>>>>>>>> On Wed, Jul 31, 2019 at 7:10 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I don't think the `Program` interface could solve the problem.
>>>>>>>>>
>>>>>>>>> The launcher launches the job server, which creates the job graph, submits it and keeps monitoring it. Even if the user program implemented `Program`, Flink would still extract the JobGraph from `getPlan` and submit it, instead of really executing the code in the main method of the user program, so the launcher would not be started.
>>>>>>>>>
>>>>>>>>> @Thomas,
>>>>>>>>>
>>>>>>>>> There is an ongoing discussion on client refactoring [1], as Till mentioned. However, I'm afraid that the current jar run semantics, i.e. extract the job graph and submit it to the Dispatcher, cannot fit your requirement. The problem is that the REST API directly communicates with the Dispatcher, and thus it's strange to tell the Dispatcher to "just run a program in a process".
>>>>>>>>>
>>>>>>>>> As you mentioned in the document, with the CLI in session mode the whole program would be executed sequentially. I'd appreciate it if you could participate in the thread on client refactoring [1]. In the design document [2], we propose to provide rich interfaces for downstream project integration. You can customize your CLI to execute your program arbitrarily. Any requirements or advice would be helpful.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> tison.
>>>>>>>>>
>>>>>>>>> [1] https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>>>>>>>> [2] https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit
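For reference, the `Program` interface discussed in the surrounding messages has roughly the following shape (quoted from memory, so details such as annotations and declared exceptions may differ from the actual source):

    import org.apache.flink.api.common.Plan;

    // Roughly org.apache.flink.api.common.Program: it returns a (batch) Plan, which the
    // client then compiles into a JobGraph and submits; details may differ from the
    // actual interface.
    public interface Program {
        Plan getPlan(String... args);
    }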
>>>>>>>>> On Wed, Jul 31, 2019 at 4:50 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Are you looking for something similar to the `Program` interface? This interface, even though it is a bit outdated and might get removed in the future, offers a `getPlan` method which is called in order to generate the `JobGraph`. In the client refactoring discussion thread it is currently being discussed what to do with this interface.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 31, 2019 at 10:41 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Couldn't the Beam job server use the same work-around we're using in the JarRunHandler to get access to the JobGraph?
>>>>>>>>>>>
>>>>>>>>>>> On 26/07/2019 17:38, Thomas Weise wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for taking a look!
>>>>>>>>>>>>
>>>>>>>>>>>> The Beam job server does not currently have the ability to just output the job graph (and related artifacts) that could then be used with the JobSubmitHandler. It is itself using StreamExecutionEnvironment, which in turn will lead to a REST API submission.
>>>>>>>>>>>>
>>>>>>>>>>>> Here I'm looking at what happens before the Beam job server gets involved: the interaction of the k8s operator with the Flink deployment. The jar run endpoint (ignoring the current handler implementation) is generic and pretty much exactly matches what we would need for a uniform entry point. It's just that in the Beam case the jar file would itself be a "launcher" that doesn't provide the job graph itself, but the dependencies and the mechanism to invoke the actual client.
>>>>>>>>>>>>
>>>>>>>>>>>> I could accomplish what I'm looking for by creating a separate REST endpoint that looks almost the same. But I would prefer to reuse the Flink REST API interaction that is already implemented for the Flink Java jobs to reduce the complexity of the deployment.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 26, 2019 at 2:29 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> quick question: Why do you wanna use the JarRunHandler? If another process is building the JobGraph, then one could use the JobSubmitHandler, which expects a JobGraph and then starts executing it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
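From the point of view of the REST client (e.g. the k8s operator), the jar run interaction Thomas wants to reuse looks roughly like the sketch below. The `/jars/:jarid/run` endpoint and the `entry-class`/`parallelism` parameters come from the existing REST API; the host, jar id and entry class are placeholders:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class JarRunClient {

        public static void main(String[] args) throws Exception {
            // Assumes the launcher jar was uploaded beforehand via POST /jars/upload and
            // that this is the jar id returned by that call (placeholder value).
            String jarId = "d1e9d3a9_launcher.jar";
            URI runUri = URI.create("http://jobmanager:8081/jars/" + jarId
                    + "/run?entry-class=org.example.BeamLauncher&parallelism=2");

            HttpRequest request = HttpRequest.newBuilder(runUri)
                    .POST(HttpRequest.BodyPublishers.noBody())
                    .build();

            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());

            // On success the handler answers with the id of the submitted job, e.g. {"jobid":"..."}.
            System.out.println(response.body());
        }
    }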
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 25, 2019 at 7:45 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> While considering different options to launch Beam jobs through the Flink REST API, I noticed that the implementation of JarRunHandler places quite a few restrictions on how the entry point shall construct a Flink job, by extracting and manipulating the job graph.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That's normally not a problem for Flink Java programs, but in the scenario I'm looking at, the job graph would be constructed by a different process and isn't available to the REST handler. Instead, I would like to be able to just respond with the job ID of the already launched job.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For context, please see: https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.fh2f571kms4d
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The current JarRunHandler code is here: https://github.com/apache/flink/blob/f3c5dd960ff81a022ece2391ed3aee86080a352a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java#L82
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It would be nice if there was an option to delegate the responsibility for job submission to the user code / entry point. That would be useful for Beam and other frameworks built on top of Flink that dynamically create a job graph from a different representation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Possible ways to get there:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * an interface that the main class can implement and that, when present, the jar run handler calls instead of main()
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * an annotated method
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Either way, query parameters like savepoint path and parallelism would be forwarded to the user code, and the result would be the ID of the launched job.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Thomas
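A minimal sketch of the first option (the interface variant) could look like the following; all names are made up, since no such hook exists in the jar run handler today:

    import org.apache.flink.api.common.JobID;

    // Purely illustrative: a hook the jar run handler could look for on the main class
    // and invoke instead of main(), forwarding the REST query parameters. The interface
    // name, method name and parameters are hypothetical.
    public interface JobSubmissionEntryPoint {

        /**
         * Launches or submits the job and returns the id of the job that was started, so
         * that the jar run handler can answer the REST call without building a JobGraph.
         *
         * @param args          program arguments from the REST request
         * @param savepointPath savepoint to restore from, or null if none was given
         * @param parallelism   requested default parallelism
         */
        JobID submit(String[] args, String savepointPath, int parallelism) throws Exception;
    }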