Hi Thomas,

Ok, I agree with the necessity of running a job without a Java Client. FLIP-73 
aims at introducing a Pipeline (or FlinkPipeline) interface that is the common 
interface of StreamGraph and Plan. Maybe we could reintroduce something like 
Program that returns a Pipeline for this purpose.

Best,
Aljoscha

> On 26. Sep 2019, at 19:30, Thomas Weise <t...@apache.org> wrote:
> 
> Hi Aljoscha,
> 
> Thanks for taking a look!
> 
> Multiple options to approach the submission part for the Beam use case are
> discussed in [1].
> 
> I'm actually now working on a different approach that creates a Flink jar
> at build time.
> 
> To the point of whether UserProgram.main() should be called in JobManager
> or not: I believe it is important to provide the user an option to submit a
> job without running a Java client. Today that necessitates that the Java
> entry point will be called on the JM.
> 
> The executor related work as such won't change that. However, it would be
> nice to have a separate mechanism that allows the user to specify an entry
> point that produces the FlinkPipeline/plan, w/o having to "execute" through
> the kind of hacky context environment.
> 
> Thanks,
> Thomas
> 
> [1]
> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.fh2f571kms4d
> 
> 
> 
> 
> On Thu, Sep 26, 2019 at 7:15 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
> 
>> Hi,
>> 
>> Regarding the original proposal: I don’t think spawning another process
>> inside the JarHandler.runJar() is the way to go here. Looking at the bigger
>> picture, the proposal would get us to roughly this situation:
>> 
>> 1. Spawn Kubernetes containers (JobManager and TaskManagers)
>> 2. User does a REST call to JobManager.runJar() to submit the user job
>> 3. JobManager.runJar() opens a port that waits for job submission
>> 4. JobMananger.runJar() invokes UserProgram.main()
>> 5. UserProgram.main() launches a process (BeamJobService) that opens a
>> port to wait for a Python process to connect to it
>> 6. UserProgram.main() launches another process (the Python code, or any
>> language, really) that connects to BeamJobService to submit the Pipeline
>> 7. BeamJobService receives the Pipeline and talks to the port open on
>> JobManager (via REST service, maybe) to submit the Job
>> 8. Job is executed
>> 9. Where is UserProgram.main() at this point?
>> 
>> I think that even running UserProgram.main() in the JobManager is already
>> too much. A JobManager should accept JobGraphs (or something) and execute
>> them, nothing more. Running UserProgram.main() makes some things
>> complicated or weird. For example, what happens when that
>> UserProgram.main() creates a RemoteEnvironment and uses that? What happens
>> when the user code calls execute() multiple times.
>> 
>> I think a good solution for the motivating use case is to
>> 
>> a) run BeamJobService as a separate service that talks to a running
>> JobManager via REST for submitting jobs that it receives
>> 
>> b) Spawning a JobManager inside the BeamJobService, i.e. the
>> BeamJobService is like the entry point in a per-job Kubernetes model.
>> Something that the new Executor work ([1], [2]) will enable.
>> 
>> Any thoughts? I’m happy to jump on a call about this because these things
>> are very tricky to figure out and I might be wrong.
>> 
>> Best,
>> Aljoscha
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
>> [2]
>> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?ts=5d88e631
>> 
>>> On 6. Aug 2019, at 09:51, Till Rohrmann <trohrm...@apache.org> wrote:
>>> 
>>> I think there was the idea to make the JobGraph a "public"/stable
>> interface
>>> other projects can rely on at some point. If I remember correctly, then
>> we
>>> wanted to define a proto buf definition for the JobGraph so that clients
>>> written in a different language can submit JobGraphs and we could extend
>>> the data structure. As far as I know, this effort hasn't been started yet
>>> and is still in the backlog (I think there doesn't exist a JIRA issue
>> yet).
>>> 
>>> The problem came up when discussing additions to the JobGraph because
>> they
>>> need to be backwards compatible otherwise newer version of Flink would
>> not
>>> be able to recover jobs. I think so far Flink provides backwards
>>> compatibility between different versions of the JobGraph. However, this
>> is
>>> not officially guaranteed.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Tue, Aug 6, 2019 at 3:56 AM Zili Chen <wander4...@gmail.com> wrote:
>>> 
>>>> It sounds like a request to change the interface Program into
>>>> 
>>>> public interface Program {
>>>> JobGraph getJobGraph(String... args);
>>>> }
>>>> 
>>>> Also, given that JobGraph is said as internal interface or
>>>> cannot be relied on, we might introduce and use a
>>>> representation that allows for cross version compatibility.
>>>> 
>>>> 
>>>> Thomas Weise <t...@apache.org> 于2019年8月6日周二 上午12:11写道:
>>>> 
>>>>> If the goal is to keep job creation and job submission separate and we
>>>>> agree that there should be more flexibility for the job construction,
>>>> then
>>>>> JobGraph and friends should be stable API that the user can depend on.
>> If
>>>>> that's the case, the path Chesnay pointed to may become viable.
>>>>> 
>>>>> There was discussion in the past that JobGraph cannot be relied on WRT
>>>>> backward compatibility and I would expect that at some point we want to
>>>>> move to a representation that allows for cross version compatibility.
>>>> Beam
>>>>> is an example how this could be accomplished (with its pipeline proto).
>>>>> 
>>>>> So if the Beam job server was able to produce the JobGraph, is there
>>>>> agreement that we should provide a mechanism that allows the program
>>>> entry
>>>>> point to return the JobGraph directly (without using the
>>>>> ExecutionEnvironment to build it)?
>>>>> 
>>>>> 
>>>>> On Mon, Aug 5, 2019 at 2:10 AM Zili Chen <wander4...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Thomas,
>>>>>> 
>>>>>> If REST handler calls main(), the behavior inside main() is
>>>>>> unpredictable.
>>>>>> 
>>>>>> Now the jar run handler extract the job graph and submit
>>>>>> it with the job id configured in REST request. If REST
>>>>>> handler calls main() we can hardly even know how much
>>>>>> jobs are executed.
>>>>>> 
>>>>>> A new environment, as you said,
>>>>>> ExtractJobGraphAndSubmitToDispatcherEnvironment can be
>>>>>> added to satisfy your requirement. However, it is a bit
>>>>>> out of Flink scope. It might be better to write your own
>>>>>> REST handler.
>>>>>> 
>>>>>> WebMonitorExtension is for extending REST handlers but
>>>>>> it seems also unable to customize...
>>>>>> 
>>>>>> Best,
>>>>>> tison.
>>>>>> 
>>>>>> 
>>>>>> Thomas Weise <t...@apache.org> 于2019年8月3日周六 上午4:09写道:
>>>>>> 
>>>>>>> Thanks for looking into this.
>>>>>>> 
>>>>>>> I see the "Jar run handler" as function that takes few parameters and
>>>>>>> returns a job ID. I think it would be nice if the handler doesn't
>>>> hard
>>>>>> code
>>>>>>> the function. Perhaps this could be accomplished by pushing the code
>>>>> into
>>>>>>> something like "ExtractJobGraphAndSubmitToDispatcherEnvironment" that
>>>>> the
>>>>>>> main method could also bypass if it has an alternative way to provide
>>>>> the
>>>>>>> jobId via a context variable?
>>>>>>> 
>>>>>>> Zili: I looked at the client API proposal and left a few comments. I
>>>>>> think
>>>>>>> it is important to improve programmatic job submission. But it also
>>>>> seems
>>>>>>> orthogonal to how the jar run handler operates (i.e. these issues
>>>> could
>>>>>> be
>>>>>>> addressed independently).
>>>>>>> 
>>>>>>> Chesnay: You are right that the Beam job sever could be hacked to
>>>>> extract
>>>>>>> job graph and other ingredients. This isn't desirable though because
>>>>>> these
>>>>>>> Flink internals should not be exposed downstream. But even if we went
>>>>>> down
>>>>>>> that route we would still need a way to let the jar run handler know
>>>> to
>>>>>>> just return the ID of an already submitted job vs. trying to submit
>>>> one
>>>>>>> from OptimizerPlanEnvironment.
>>>>>>> 
>>>>>>> The intended sequence would be:
>>>>>>> 
>>>>>>> REST client provides a launcher jar
>>>>>>> REST client "runs jar"
>>>>>>> REST handler calls main()
>>>>>>> main launches Beam job server, runs Beam pipeline construction code
>>>>>> against
>>>>>>> that job server
>>>>>>> job server uses RemoteEnvironment to submit real job
>>>>>>> main "returns job id"
>>>>>>> REST handler returns job id
>>>>>>> 
>>>>>>> Thomas
>>>>>>> 
>>>>>>> 
>>>>>>> On Wed, Jul 31, 2019 at 4:33 AM Zili Chen <wander4...@gmail.com>
>>>>> wrote:
>>>>>>> 
>>>>>>>> By the way, currently Dispatcher implements RestfulGateway
>>>>>>>> and delegate resource request to ResourceManager. If we can,
>>>>>>>> semantically, let WebMonitor implement RestfulGateway,
>>>>>>>> and delegate job request to Dispatcher, resource request to
>>>>>>>> ResourceManager, it seems reasonable that when WebMonitor
>>>>>>>> receives a JarRun request, it spawns a process and run
>>>>>>>> the main method of the main class of that jar.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> tison.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Zili Chen <wander4...@gmail.com> 于2019年7月31日周三 下午7:10写道:
>>>>>>>> 
>>>>>>>>> I don't think the `Program` interface could solve the problem.
>>>>>>>>> 
>>>>>>>>> The launcher launches the job server which creates the job graph,
>>>>>>>>> submits it and keeps monitoring. Even if user program implement
>>>>>>>>> `Program` Flink still extracts the JobGraph from `getPlan` and
>>>>>>>>> submits it, instead of really execute codes in main method of
>>>>>>>>> user program, so that the launcher is not started.
>>>>>>>>> 
>>>>>>>>> @Thomas,
>>>>>>>>> 
>>>>>>>>> Here is an ongoing discussion on client refactoring[1] as Till
>>>>>>>>> mentioned. However, I'm afraid that with current jar run semantic,
>>>>>>>>> i.e., extract the job graph and submit it to the Dispatcher, it
>>>>> cannot
>>>>>>>>> fits your requirement. The problem is that REST API directly
>>>>>>>>> communicates with Dispatcher and thus it's strange to tell the
>>>>>>>>> Dispatcher "just run a program in a process".
>>>>>>>>> 
>>>>>>>>> As you mentioned in the document, with CLI in session mode the
>>>>>>>>> whole program would be executed sequentially. I'll appreciate it
>>>>>>>>> if you can participant the thread on client refactor[1]. In the
>>>>>>>>> design document[2], we propose to provide rich interfaces for
>>>>>>>>> downstream projects integration. You can customize your CLI for
>>>>>>>>> executing your program arbitrarily. Any requirement or advise
>>>>>>>>> would be help.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> tison.
>>>>>>>>> 
>>>>>>>>> [1]
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>>>>>>>> [2]
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Till Rohrmann <trohrm...@apache.org> 于2019年7月31日周三 下午4:50写道:
>>>>>>>>> 
>>>>>>>>>> Are you looking for something similar to the `Program` interface?
>>>>>> This
>>>>>>>>>> interface, even though it is a bit outdated and might get removed
>>>>> in
>>>>>>> the
>>>>>>>>>> future, offers a `getPlan` method which is called in order to
>>>>>> generate
>>>>>>>>>> the
>>>>>>>>>> `JobGraph`. In the client refactoring discussion thread it is
>>>>>> currently
>>>>>>>>>> being discussed what to do with this interface.
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>> 
>>>>>>>>>> On Wed, Jul 31, 2019 at 10:41 AM Chesnay Schepler <
>>>>>> ches...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Couldn't the beam job server use the same work-around we're
>>>> using
>>>>>> in
>>>>>>>>>> the
>>>>>>>>>>> JarRunHandler to get access to the JobGraph?
>>>>>>>>>>> 
>>>>>>>>>>> On 26/07/2019 17:38, Thomas Weise wrote:
>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for taking a look!
>>>>>>>>>>>> 
>>>>>>>>>>>> The Beam job server does not currently have the ability to
>>>> just
>>>>>>>>>> output
>>>>>>>>>>> the
>>>>>>>>>>>> job graph (and related artifacts) that could then be used
>>>> with
>>>>>> the
>>>>>>>>>>>> JobSubmitHandler. It is itself using
>>>>> StreamExecutionEnvironment,
>>>>>>>>>> which in
>>>>>>>>>>>> turn will lead to a REST API submission.
>>>>>>>>>>>> 
>>>>>>>>>>>> Here I'm looking at what happens before the Beam job server
>>>>> gets
>>>>>>>>>>> involved:
>>>>>>>>>>>> the interaction of the k8s operator with the Flink
>>>> deployment.
>>>>>> The
>>>>>>>>>> jar
>>>>>>>>>>> run
>>>>>>>>>>>> endpoint (ignoring the current handler implementation) is
>>>>> generic
>>>>>>> and
>>>>>>>>>>>> pretty much exactly matches what we would need for a uniform
>>>>>> entry
>>>>>>>>>> point.
>>>>>>>>>>>> It's just that in the Beam case the jar file would itself be
>>>> a
>>>>>>>>>> "launcher"
>>>>>>>>>>>> that doesn't provide the job graph itself, but the
>>>> dependencies
>>>>>> and
>>>>>>>>>>>> mechanism to invoke the actual client.
>>>>>>>>>>>> 
>>>>>>>>>>>> I could accomplish what I'm looking for by creating a
>>>> separate
>>>>>> REST
>>>>>>>>>>>> endpoint that looks almost the same. But I would prefer to
>>>>> reuse
>>>>>>> the
>>>>>>>>>>> Flink
>>>>>>>>>>>> REST API interaction that is already implemented for the
>>>> Flink
>>>>>> Java
>>>>>>>>>> jobs
>>>>>>>>>>> to
>>>>>>>>>>>> reduce the complexity of the deployment.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thomas
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Jul 26, 2019 at 2:29 AM Till Rohrmann <
>>>>>>> trohrm...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> quick question: Why do you wanna use the JarRunHandler? If
>>>>>> another
>>>>>>>>>>> process
>>>>>>>>>>>>> is building the JobGraph, then one could use the
>>>>>> JobSubmitHandler
>>>>>>>>>> which
>>>>>>>>>>>>> expects a JobGraph and then starts executing it.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Till
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Thu, Jul 25, 2019 at 7:45 PM Thomas Weise <
>>>> t...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> While considering different options to launch Beam jobs
>>>>> through
>>>>>>> the
>>>>>>>>>>> Flink
>>>>>>>>>>>>>> REST API, I noticed that the implementation of
>>>> JarRunHandler
>>>>>>> places
>>>>>>>>>>>>> quite a
>>>>>>>>>>>>>> few restrictions on how the entry point shall construct a
>>>>> Flink
>>>>>>>>>> job, by
>>>>>>>>>>>>>> extracting and manipulating the job graph.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> That's normally not a problem for Flink Java programs, but
>>>> in
>>>>>> the
>>>>>>>>>>>>> scenario
>>>>>>>>>>>>>> I'm looking at, the job graph would be constructed by a
>>>>>> different
>>>>>>>>>>> process
>>>>>>>>>>>>>> and isn't available to the REST handler. Instead, I would
>>>>> like
>>>>>> to
>>>>>>>>>> be
>>>>>>>>>>> able
>>>>>>>>>>>>>> to just respond with the job ID of the already launched
>>>> job.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> For context, please see:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.fh2f571kms4d
>>>>>>>>>>>>>> The current JarRunHandler code is here:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> https://github.com/apache/flink/blob/f3c5dd960ff81a022ece2391ed3aee86080a352a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java#L82
>>>>>>>>>>>>>> It would be nice if there was an option to delegate the
>>>>>>>>>> responsibility
>>>>>>>>>>>>> for
>>>>>>>>>>>>>> job submission to the user code / entry point. That would
>>>> be
>>>>>>>>>> useful for
>>>>>>>>>>>>>> Beam and other frameworks built on top of Flink that
>>>>>> dynamically
>>>>>>>>>>> create a
>>>>>>>>>>>>>> job graph from a different representation.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Possible ways to get there:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> * an interface that the main class can be implement end
>>>> when
>>>>>>>>>> present,
>>>>>>>>>>> the
>>>>>>>>>>>>>> jar run handler calls instead of main.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> * an annotated method
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Either way query parameters like savepoint path and
>>>>> parallelism
>>>>>>>>>> would
>>>>>>>>>>> be
>>>>>>>>>>>>>> forwarded to the user code and the result would be the ID
>>>> of
>>>>>> the
>>>>>>>>>>> launched
>>>>>>>>>>>>>> job.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thougths?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
>> 

Reply via email to