I think that the ExecutorListener idea could work.

With a bit more than FLIP-85, it is true that we can get rid of the
"exception throwing" environments, but we need to introduce an
"EmbeddedExecutor" which is going to run on the JM. So, the two above,
coupled with an ExecutorListener, can have the desired effect.
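
A rough sketch of what such an ExecutorListener SPI could look like (all names and shapes here are hypothetical stand-ins, not existing Flink API; discovery via the standard ServiceLoader, as Stephan suggests below, is just one option next to config-based loading):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Stand-in for org.apache.flink.api.dag.Pipeline -- hypothetical, simplified.
interface Pipeline {}

// Hypothetical SPI: notified whenever an executor receives a pipeline.
interface ExecutorListener {
    void onPipelineReceived(Pipeline pipeline);
}

class ExecutorListeners {
    // Listeners could be discovered via the ServiceLoader mechanism (or read
    // from the Flink configuration); explicitly registered listeners are also
    // accepted here so the sketch works without META-INF/services entries.
    static List<ExecutorListener> discover(List<ExecutorListener> registered) {
        List<ExecutorListener> all = new ArrayList<>(registered);
        ServiceLoader.load(ExecutorListener.class).forEach(all::add);
        return all;
    }

    static void notifyListeners(List<ExecutorListener> listeners, Pipeline pipeline) {
        for (ExecutorListener l : listeners) {
            l.onPipelineReceived(pipeline);
        }
    }
}
```

The executor implementation would call `notifyListeners` right before handing the pipeline to the cluster, keeping the whole mechanism inside the SPI layer.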

Cheers,
Kostas

On Fri, Mar 13, 2020 at 11:37 AM Stephan Ewen <se...@apache.org> wrote:
>
> Few thoughts on the discussion:
>
> ## Changes on the Master
>
> If possible, let's avoid changes to the master (JobManager / Dispatcher).
> These components are complex; we should strive to keep out of them
> everything that we can.
>
> ## Problems in different deployments (applications / sessions)
>
> This should be pretty straightforward after FLIP-84, correct? There should
> be no more "exception throwing" environments that sneak the job graph out
> of the main method.
>
> ## Proposal: Executor Listeners
>
> We could think of a mix between the two approaches: Executor Listeners.
> When an executor is invoked with the Pipeline, the listener is also
> notified. That would keep this out of the API and be properly within the
> SPI layer.
> The listeners could be loaded from config, or via service loaders.
>
>
> On Fri, Mar 13, 2020 at 8:59 AM tison <wander4...@gmail.com> wrote:
>
> > Hi Gyula and all,
> >
> > Thanks for the discussion so far.
> >
> > It seems that the requirement is to deliver some metadata about the
> > submitted job, and such metadata can be simply extracted from the
> > StreamGraph.
> >
> > I'm unfamiliar with the metadata Atlas needs, so I will make some assumptions.
> >
> > Assumption:
> > The metadata needed by Atlas is some Flink-scoped information, such as the
> > input/output of a node. This metadata is compile-time information, so we
> > know it while compiling the StreamGraph.
> >
> > If the assumption stands, I'm curious whether it would be an option to
> > standardize a JSON representation of the StreamGraph that contains the
> > required metadata, pass that JSON representation from the generation phase
> > to the JobGraph and then the ExecutionGraph, and finally make it
> > retrievable from the RestServer (so that we can extend JobClient to
> > retrieve the plan by querying the cluster instead of having a
> > pre-configured one).
> >
> > This would be like the `jsonPlan` in ExecutionGraph today (which is
> > exposed by the JobPlan REST endpoint). And I believe that, rather than a
> > JobGraph dump, which is a physical plan, exposing access to a StreamGraph
> > dump, which is a logical plan, would be more interesting from a user
> > perspective.
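
As a rough illustration, the existing JobPlan endpoint can already be queried over HTTP; a hypothetical helper for this (the REST address and job id are placeholders, and a StreamGraph dump would need a new, analogous endpoint) might look like:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch of retrieving the JSON plan by querying the cluster's REST server.
// The /jobs/<jobid>/plan endpoint serves the ExecutionGraph's jsonPlan today.
class PlanFetcher {
    static String planUri(String restAddress, String jobId) {
        return restAddress + "/jobs/" + jobId + "/plan";
    }

    static String fetchPlan(String restAddress, String jobId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(planUri(restAddress, jobId)))
                .GET()
                .build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }
}
```

A JobClient extension could wrap exactly such a call, so the plan is fetched from the cluster instead of being pre-configured on the client.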
> >
> > Best,
> > tison.
> >
> >
> > Gyula Fóra <gyula.f...@gmail.com> 于2020年3月13日周五 下午3:20写道:
> >
> > > Thanks again Kostas for diving deep into this, it is great feedback!
> > >
> > > I agree with the concerns regarding the custom executor: it has to be
> > > able to properly handle the "original" executor somehow. This might be
> > > quite tricky if we want to implement the AtlasExecutor outside Flink,
> > > and in any case it does not really feel clean or lightweight at first
> > > glance.
> > >
> > > As for the JobClient/JobListener/Pipeline question, since you brought up
> > > the possibility of later attaching the JobClient, maybe the best route
> > > would be to add the Pipeline as a method parameter in the JobListener.
> > > It would break code compatibility but would at least give consistent
> > > behavior.
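
A sketch of what that parameter change could look like (the types below are simplified stand-ins for the Flink interfaces, not the real signatures):

```java
// Hypothetical stand-ins for org.apache.flink.api.dag.Pipeline and
// org.apache.flink.core.execution.JobClient -- simplified for illustration.
interface Pipeline {}
interface JobClient {}

// Sketch: the pipeline travels with the event itself, so the listener stays a
// passive observer and no getter is needed on the JobClient. Adding the
// parameter would break existing JobListener implementations, as noted.
interface JobListener {
    void onJobSubmitted(JobClient jobClient, Pipeline pipeline, Throwable failure);
}
```

The consistency gain is that either the callback fires with the pipeline in hand, or it does not fire at all; there is no state where a getter exists but has nothing to return.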
> > >
> > > Now to the big problem of executors / joblisteners not working in
> > > kubernetes-per-job, web, etc. modes. I was not aware of this problem
> > > until now; it also seems to affect the whole concept of the JobListener
> > > interface. What good is a JobListener if it only listens to certain
> > > kinds of deployments :)
> > >
> > > Incidentally, in my first proposal (and prototype) I had the Atlas hook
> > > running on the JobMaster, with an extra addition of a JobGraphGenerator
> > > hook that could be registered in the StreamExecutionEnvironment. This
> > > meant that we could work on the StreamGraph, register metadata in the
> > > JobGraph, and execute the actual Atlas registration logic in the
> > > JobMaster when the job starts.
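
A minimal sketch of that prototype's hook, with all names invented for illustration (StreamGraph/JobGraph are simplified stand-ins for the real Flink classes):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the logical plan produced by the StreamExecutionEnvironment.
interface StreamGraph {}

// Simplified stand-in: the JobGraph carries a metadata map that a
// JobMaster-side component could read when the job starts.
class JobGraph {
    final Map<String, String> metadata = new HashMap<>();

    void setMetadata(String key, String value) {
        metadata.put(key, value);
    }
}

// Hypothetical hook: runs on the client while the JobGraph is generated, so it
// can inspect the logical StreamGraph and stash metadata into the JobGraph.
interface JobGraphGeneratorHook {
    void onJobGraphGenerated(StreamGraph streamGraph, JobGraph jobGraph);
}
```

Because the metadata rides along inside the JobGraph, this works regardless of how the JobGraph ultimately reaches the cluster.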
> > >
> > > Looking back, this is much more complex, and uglier, logic than having a
> > > simple JobListener. But it would at least work in all possible job
> > > submission scenarios, as long as the JobGraph was generated through the
> > > StreamGraph logic (which should always be the case).
> > >
> > > Cheers,
> > > Gyula
> > >
> > >
> > > On Thu, Mar 12, 2020 at 8:53 PM Kostas Kloudas <kklou...@gmail.com>
> > wrote:
> > >
> > > > Thanks Gyula,
> > > >
> > > > Looking forward to your comments.
> > > > Just to let you know, I would not like having a method that in some
> > > > cases works as expected and in others does not. It would be nice if
> > > > we could expose consistent behaviour to the users.
> > > >
> > > > On Thu, Mar 12, 2020 at 8:44 PM Gyula Fóra <gyula.f...@gmail.com>
> > wrote:
> > > > >
> > > > > Thanks Kostas, I have to review the possible limitations with the
> > > > > Executor before I can properly answer.
> > > > >
> > > > > Regarding your comments on the listener pattern: we proposed in the
> > > > > document to include getPipeline() in the JobClient itself, as you
> > > > > suggested, to fit the pattern :) As for not always being able to
> > > > > return the pipeline, this might be expected depending on how the
> > > > > JobClient was created, so we need to handle it in some way.
> > > > >
> > > > >
> > > > > On Thu, Mar 12, 2020 at 8:30 PM Kostas Kloudas <kklou...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi again,
> > > > > >
> > > > > > Just to clarify, I am not against exposing the Pipeline if this
> > > > > > will lead to a "clean" solution.
> > > > > > And I forgot to say that the last solution, if adopted, would
> > > > > > have to work on the JobGraph, which may not be that desirable.
> > > > > >
> > > > > > Kostas
> > > > > >
> > > > > > On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kklou...@gmail.com
> > >
> > > > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I do not have a strong opinion on the topic yet, but I would like
> > > to
> > > > > > > share my thoughts on this.
> > > > > > >
> > > > > > > In the solution proposing a wrapping AtlasExecutor around the
> > > > > > > Flink executors, if we allow the user to use the CLI to submit
> > > > > > > jobs, then the CLI code may have to change so that it injects
> > > > > > > the executor option for the AtlasExecutor (transparently to the
> > > > > > > user), and then the AtlasExecutor should take what the user has
> > > > > > > actually set as pipeline executor and find the adequate
> > > > > > > executor. If this is not done transparently, then the user
> > > > > > > should do something explicit to point Flink to Atlas and then
> > > > > > > to the correct executor, which implies that we should add
> > > > > > > user-facing stuff (like CLI options) to Flink.
> > > > > > >
> > > > > > > For the solution of adding getPipeline() to the JobListener, I
> > > think
> > > > > > > that from a design perspective, it does not fit in the listener
> > > > > > > itself. The listener is a "passive" entity that is expected to
> > > listen
> > > > > > > to specific "events". Adding a getter does not fit there. Other
> > > > > > > options for the getPipeline() method are:
> > > > > > > 1) add it as a method to the JobClient
> > > > > > > 2) add it as an argument to the methods of the JobListener (along
> > > > with
> > > > > > > the JobClient and the throwable)
> > > > > > >
> > > > > > > Alternative 1) would currently work because the JobClient is
> > > > > > > only instantiated by the executor. But in the future, we may
> > > > > > > (and probably will, because of implications of FLIP-85) allow a
> > > > > > > JobClient to get "attached" to a running job. In this case,
> > > > > > > getPipeline() will not have a pipeline to return.
> > > > > > > Alternative 2) will break existing code; I am not sure how
> > > > > > > important this is, as the JobListener is a new feature and I
> > > > > > > guess it has some, but not many, users.
> > > > > > >
> > > > > > > As a sidenote, if I am not mistaken, apart from Yarn, none of
> > > > > > > the above solutions would work in per-job mode for Kubernetes,
> > > > > > > Mesos, or with web submissions. These modes go through
> > > > > > > "special" execution environments that are used simply to
> > > > > > > extract the JobGraph, which they then submit to the cluster.
> > > > > > > In this case, there is no executor involved. Are these cases
> > > > > > > important to you?
> > > > > > >
> > > > > > > Finally, another solution, although more drastic and more
> > > > > > > involved, could be to have a "JobListener" running on the
> > > > > > > JobMaster. This would collect the relevant info and send it to
> > > > > > > Atlas. But I am not sure how Atlas works and if it requires the
> > > > > > > data to be extracted on the client side. I am saying this
> > > > > > > because the JobMasters may be running anywhere in a cluster,
> > > > > > > while the clients may run on designated machines which can have
> > > > > > > specific configurations, e.g. open ports to communicate with a
> > > > > > > specific Atlas server.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Kostas
> > > > > > >
> > > > > > > On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org>
> > > > wrote:
> > > > > > > >
> > > > > > > > Hi Gyula!
> > > > > > > >
> > > > > > > > My main motivation was to try and avoid mixing an internal
> > > > > > > > interface (Pipeline) with the public API. It looks like this
> > > > > > > > is trying to go "public stable", but doesn't really achieve
> > > > > > > > it, exactly because of mixing "pipeline" into this: you would
> > > > > > > > need to cast "Pipeline" and work on internal classes in the
> > > > > > > > implementation.
> > > > > > > >
> > > > > > > > If we use an "internal API" or a "semi-stable SPI" class, it
> > > > > > > > looks at first glance a bit cleaner and more maintainable
> > > > > > > > (opening up less surface) to make the PipelineExecutor a
> > > > > > > > "stable SPI". I have not checked out all the details, though.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Stephan
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <
> > gyula.f...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Stephan!
> > > > > > > > >
> > > > > > > > > Thanks for checking this out. I agree that wrapping the
> > > > > > > > > other PipelineExecutors with a delegating AtlasExecutor
> > > > > > > > > would be a good alternative approach to implement this, but
> > > > > > > > > I actually feel that it suffers from even more problems
> > > > > > > > > than exposing the Pipeline instance in the JobListener.
> > > > > > > > >
> > > > > > > > > The main idea with the Atlas integration would be to have
> > > > > > > > > the Atlas hook logic in the Atlas project, where it would
> > > > > > > > > be maintained. This means that any approach we take has to
> > > > > > > > > rely on public APIs. The JobListener is already a public
> > > > > > > > > evolving API, while the PipelineExecutor and its factory
> > > > > > > > > are purely internal. Even if we made them public, they
> > > > > > > > > would still expose the Pipeline, so we would not gain much
> > > > > > > > > on the public/internal API front.
> > > > > > > > >
> > > > > > > > > I also feel that, since the Atlas hook logic should only
> > > > > > > > > observe the pipeline and collect information, the
> > > > > > > > > JobListener interface seems an ideal match and the
> > > > > > > > > implementation can be pretty lightweight. From a purely
> > > > > > > > > implementation perspective, adding an Executor would be
> > > > > > > > > heavier, as it has to properly delegate to another
> > > > > > > > > executor, making sure that we don't break anything.
> > > > > > > > >
> > > > > > > > > Don't get me wrong, I am not opposed to reworking the
> > > > > > > > > implementations we have, as they are very simple at this
> > > > > > > > > point, but I also want to make sure that we take the
> > > > > > > > > approach that is simple from a maintainability standpoint.
> > > > > > > > > Of course, my argument rests on the assumption that the
> > > > > > > > > AtlasHook itself will live outside of the Flink project;
> > > > > > > > > that's another question.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <
> > > se...@apache.org>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all!
> > > > > > > > > >
> > > > > > > > > > In general, nice idea to support this integration with
> > Atlas.
> > > > > > > > > >
> > > > > > > > > > I think we could make this a bit easier/lightweight with a
> > > > small
> > > > > > change.
> > > > > > > > > > One of the issues that is not super nice is that this
> > > > > > > > > > starts exposing the (currently empty) Pipeline interface
> > > > > > > > > > in the public API. The Pipeline is an SPI interface that
> > > > > > > > > > would be good to hide from the API.
> > > > > > > > > >
> > > > > > > > > > Since 1.10, Flink has the notion of Executors, which take
> > > > > > > > > > the pipeline and execute it. Meaning each pipeline is
> > > > > > > > > > passed on anyways. And executors are already configurable
> > > > > > > > > > in the Flink configuration.
> > > > > > > > > > So, instead of passing the pipeline both "down" (to the
> > > > executor)
> > > > > > and "to
> > > > > > > > > > the side" (JobListener), could we just have a wrapping
> > > > > > "AtlasExecutor"
> > > > > > > > > that
> > > > > > > > > > takes the pipeline, does whatever it wants, and then passes
> > > it
> > > > to
> > > > > > the
> > > > > > > > > > proper executor? This would also have the advantage that it
> > > > > > supports
> > > > > > > > > making
> > > > > > > > > > changes to the pipeline, if needed in the future. For
> > > example,
> > > > if
> > > > > > there
> > > > > > > > > is
> > > > > > > > > > ever the need to add additional configuration fields, set
> > > > > > properties, add
> > > > > > > > > > "labels" or so, this could be easily done in the suggested
> > > > > > approach.
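
The wrapping idea could be sketched roughly like this (the types are simplified stand-ins for the real PipelineExecutor SPI, not the actual signatures):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-ins for org.apache.flink.api.dag.Pipeline,
// org.apache.flink.core.execution.JobClient and PipelineExecutor.
interface Pipeline {}
interface JobClient {}

interface PipelineExecutor {
    CompletableFuture<JobClient> execute(Pipeline pipeline) throws Exception;
}

// Sketch of the wrapping executor: inspect (or amend) the pipeline first,
// then hand off unchanged to the executor the user actually configured.
class AtlasExecutor implements PipelineExecutor {
    private final PipelineExecutor delegate;

    AtlasExecutor(PipelineExecutor delegate) {
        this.delegate = delegate;
    }

    @Override
    public CompletableFuture<JobClient> execute(Pipeline pipeline) throws Exception {
        registerWithAtlas(pipeline);
        return delegate.execute(pipeline);
    }

    void registerWithAtlas(Pipeline pipeline) {
        // Placeholder: extract lineage metadata here and ship it to Atlas.
    }
}
```

Because the wrapper sees the pipeline before the proper executor does, it could also enrich it (labels, extra configuration) in the future, as described above.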
> > > > > > > > > >
> > > > > > > > > > I tried to sketch this in the picture below, pardon my bad
> > > > drawing.
> > > > > > > > > >
> > > > > > > > > > [image: Listener_Executor.png]
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > >
> > > >
> > >
> > https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Stephan
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <
> > > > > > aljos...@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Thanks! I'm reading the document now and will get back to
> > > you.
> > > > > > > > > >>
> > > > > > > > > >> Best,
> > > > > > > > > >> Aljoscha
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > >
> > > >
> > >
> >