I think that the ExecutorListener idea could work. With a bit more than FLIP-85, it is true that we can get rid of the "exception throwing" environments, and we would need to introduce an "EmbeddedExecutor" that runs on the JM. So the two above, coupled with an ExecutorListener, can have the desired effect.
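To make the ExecutorListener idea concrete, here is a minimal sketch of what such an SPI could look like: a listener that is notified whenever an executor is invoked with a Pipeline. All type names below are invented stand-ins, not actual Flink API.

```java
// Rough sketch of the ExecutorListener idea: a listener notified whenever
// an executor is invoked with a Pipeline. All names are illustrative.
import java.util.ArrayList;
import java.util.List;

interface Pipeline {}  // stand-in for Flink's internal Pipeline SPI

interface ExecutorListener {
    void onPipelineSubmitted(Pipeline pipeline);
}

class ListeningExecutor {
    private final List<ExecutorListener> listeners = new ArrayList<>();

    void register(ExecutorListener listener) { listeners.add(listener); }

    // A real executor would also hand the pipeline to the cluster;
    // here we only demonstrate the notification step.
    void execute(Pipeline pipeline) {
        for (ExecutorListener listener : listeners) {
            listener.onPipelineSubmitted(pipeline);
        }
    }
}

public class ExecutorListenerSketch {
    public static void main(String[] args) {
        ListeningExecutor executor = new ListeningExecutor();
        List<String> events = new ArrayList<>();
        executor.register(p -> events.add("pipeline observed"));
        executor.execute(new Pipeline() {});
        System.out.println(events.size()); // prints 1
    }
}
```

As Stephan suggests below, such listeners could be discovered via configuration or `ServiceLoader`, keeping them entirely in the SPI layer and out of the user-facing API.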
Cheers,
Kostas

On Fri, Mar 13, 2020 at 11:37 AM Stephan Ewen <se...@apache.org> wrote:
>
> Few thoughts on the discussion:
>
> ## Changes on the Master
>
> If possible, let's avoid changes to the master (JobManager / Dispatcher). These components are complex; we should strive to keep anything out of them that we can keep out of them.
>
> ## Problems in different deployments (applications / sessions)
>
> This should be pretty straightforward after FLIP-84, correct? There should be no more "exception throwing" environments that sneak the job graph out of the main method.
>
> ## Proposal: Executor Listeners
>
> We could think of a mix between the two approaches: Executor Listeners. When an executor is invoked with the Pipeline, the listener is also notified. That would keep this out of the API and properly within the SPI layer. The listeners could be loaded from config, or via service loaders.
>
> On Fri, Mar 13, 2020 at 8:59 AM tison <wander4...@gmail.com> wrote:
> >
> > Hi Gyula and all,
> >
> > Thanks for the discussion so far.
> >
> > It seems that the requirement is to deliver some metadata of the submitted job, and such metadata can be simply extracted from the StreamGraph.
> >
> > I'm unfamiliar with the metadata Atlas needs, so I make some assumptions.
> >
> > Assumption: the metadata needed by Atlas is actually some Flink-scope information, such as the input/output of a node, and it is compile-time information, so we know it while compiling the StreamGraph.
> >
> > If the assumption stands, I'm curious whether it is an option to standardize the JSON representation of the StreamGraph, which would contain the required metadata, and pass that JSON representation from the generation phase to the JobGraph and then the ExecutionGraph, making it finally retrievable from the RestServer (so that we can extend JobClient to retrieve the plan by querying the cluster instead of having a pre-configured one).
> > It is like `jsonPlan` in the ExecutionGraph now (which is exposed by the JobPlan REST endpoint). And I believe that, rather than a JobGraph dump, which is a physical plan, exposing access to a StreamGraph dump, which is a logical plan, is possibly more interesting from a user's perspective.
> >
> > Best,
> > tison.
> >
> > Gyula Fóra <gyula.f...@gmail.com> wrote on Fri, Mar 13, 2020 at 3:20 PM:
> >
> > > Thanks again Kostas for diving deep into this, it is great feedback!
> > >
> > > I agree with the concerns regarding the custom executor; it has to be able to properly handle the "original" executor somehow. This might be quite tricky if we want to implement the AtlasExecutor outside Flink. In any case it does not really feel clean or lightweight at first glance.
> > >
> > > As for the JobClient/JobListener/Pipeline question, since you brought up the possibility of later attaching the JobClient, maybe the best route would be to add the Pipeline as a method parameter in the JobListener. It would break code compatibility but at least would have consistent behavior.
> > >
> > > Now to the big problem of executors / joblisteners not working in kubernetes-per-job, web, etc. modes. I was not aware of this problem until now; it also seems to affect the whole concept of the JobListener interface. What good is a JobListener if it only listens to certain kinds of deployments? :)
> > >
> > > Incidentally, in my first proposal (and prototype) I had the Atlas hook running on the JobMaster, with the extra addition of a JobGraphGenerator hook that could be registered in the StreamExecutionEnvironment. This meant that we could work on the StreamGraph, register metadata in the JobGraph, and execute the actual Atlas registration logic in the JobMaster when the job starts.
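The logical-plan JSON tison describes could be as simple as per-node metadata (name, inputs) captured at StreamGraph compile time. The following is a hypothetical sketch of such a representation; the shape of the JSON and all names are invented for illustration and are not the actual `jsonPlan` format.

```java
// Hypothetical sketch of a standardized logical-plan JSON: per-node
// metadata extracted while compiling the StreamGraph. Names are invented.
import java.util.List;
import java.util.stream.Collectors;

class LogicalNode {
    final int id;
    final String name;
    final List<Integer> inputs;  // ids of upstream nodes

    LogicalNode(int id, String name, List<Integer> inputs) {
        this.id = id;
        this.name = name;
        this.inputs = inputs;
    }

    String toJson() {
        String in = inputs.stream().map(String::valueOf)
                          .collect(Collectors.joining(","));
        return String.format("{\"id\":%d,\"name\":\"%s\",\"inputs\":[%s]}",
                             id, name, in);
    }
}

public class LogicalPlanJson {
    static String planToJson(List<LogicalNode> nodes) {
        return "{\"nodes\":[" + nodes.stream().map(LogicalNode::toJson)
                                      .collect(Collectors.joining(",")) + "]}";
    }

    public static void main(String[] args) {
        List<LogicalNode> plan = List.of(
            new LogicalNode(1, "Kafka Source", List.of()),
            new LogicalNode(2, "Filter", List.of(1)));
        System.out.println(planToJson(plan));
    }
}
```

Served from the REST API, a document like this would let a later-attached JobClient recover the logical plan by querying the cluster, as tison suggests.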
> > > Looking back, this is a much more complex, and uglier, logic than having a simple JobListener. But it would at least work in all possible job submission scenarios, as long as the JobGraph was generated through the StreamGraph logic (which should be always).
> > >
> > > Cheers,
> > > Gyula
> > >
> > > On Thu, Mar 12, 2020 at 8:53 PM Kostas Kloudas <kklou...@gmail.com> wrote:
> > >
> > > > Thanks Gyula,
> > > >
> > > > Looking forward to your comments. Just to let you know, I would not like having a method that in some cases works as expected and in some other ones does not. It would be nice if we could expose consistent behaviour to the users.
> > > >
> > > > On Thu, Mar 12, 2020 at 8:44 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > >
> > > > > Thanks Kostas, I have to review the possible limitations with the Executor before I can properly answer.
> > > > >
> > > > > Regarding your comments on the listener pattern: we proposed in the document to include the getPipeline() in the JobClient itself, as you suggested, to fit the pattern :) As for not always being able to return the pipeline, this might be expected depending on how the JobClient was created, so we need to handle it in some way.
> > > > >
> > > > > On Thu, Mar 12, 2020 at 8:30 PM Kostas Kloudas <kklou...@gmail.com> wrote:
> > > > >
> > > > > > Hi again,
> > > > > >
> > > > > > Just to clarify, I am not against exposing the Pipeline if this will lead to a "clean" solution. And I forgot to say that the last solution, if adopted, would have to work on the JobGraph, which may not be that desirable.
> > > > > > Kostas
> > > > > >
> > > > > > On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas <kklou...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I do not have a strong opinion on the topic yet, but I would like to share my thoughts on this.
> > > > > > >
> > > > > > > In the solution proposing a wrapping AtlasExecutor around the Flink executors, if we allow the user to use the CLI to submit jobs, then the CLI code may have to change so that it injects the executor option to the AtlasExecutor (transparently to the user), and then the AtlasExecutor should take what the user has actually set as the pipeline executor and find the adequate executor. If this is not done transparently, then the user has to do something explicit to point Flink to Atlas and then to the correct executor, which implies that we should add user-facing stuff (like CLI options) to Flink.
> > > > > > >
> > > > > > > For the solution of adding getPipeline() to the JobListener, I think that, from a design perspective, it does not fit in the listener itself. The listener is a "passive" entity that is expected to listen to specific "events". Adding a getter does not fit there. Other options for the getPipeline() method are:
> > > > > > > 1) add it as a method to the JobClient
> > > > > > > 2) add it as an argument to the methods of the JobListener (along with the JobClient and the throwable)
> > > > > > >
> > > > > > > Alternative 1) would currently work because the JobClient is only instantiated by the executor.
> > > > > > > But in the future, we may (and probably will, because of implications of FLIP-85) allow a JobClient to get "attached" to a running job. In this case, the getPipeline() will not have a pipeline to return. Alternative 2) will break existing code; I am not sure how important this is, as the JobListener is a new feature and I guess it has some, but not many, users.
> > > > > > >
> > > > > > > As a sidenote, if I am not mistaken, apart from Yarn, none of the above solutions would work in per-job mode for Kubernetes, Mesos, or with web submissions. These modes go through "special" execution environments that are used simply to extract the JobGraph, which they then submit to the cluster. In this case, there is no executor involved. Are these cases important to you?
> > > > > > >
> > > > > > > Finally, another solution, although more drastic and more involved, could be to have a "JobListener" running on the JobMaster. This would collect the relevant info and send it to Atlas. But I am not sure how Atlas works and if it requires the data to be extracted on the client side. I am saying this because the JobMasters may be running anywhere in a cluster, while the clients may run on designated machines which can have specific configurations, e.g. open ports to communicate with a specific Atlas server.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Kostas
> > > > > > >
> > > > > > > On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi Gyula!
> > > > > > > >
> > > > > > > > My main motivation was to try and avoid mixing an internal interface (Pipeline) with public API.
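Kostas's "alternative 2" can be sketched as follows: the Pipeline is handed into the listener callbacks next to the JobClient, rather than exposed via a getter, and may be absent when the client attached to an already-running job. The interfaces here are illustrative stand-ins with invented names; Flink's actual JobListener has a different signature.

```java
// Sketch of "alternative 2": the Pipeline is a callback argument, which
// makes the "no pipeline available" case explicit. Names are invented.
interface Pipeline {}
interface JobClient {}

interface PipelineAwareJobListener {
    // pipeline may be null when the client attached to an already-running
    // job (the FLIP-85 scenario mentioned above).
    void onJobSubmitted(JobClient client, Pipeline pipeline, Throwable error);
}

public class ListenerAlternativeTwo {
    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        PipelineAwareJobListener atlasHook = (client, pipeline, error) ->
            log.append(pipeline != null ? "pipeline available"
                                        : "attached, no pipeline");
        // Submission path: the executor has the pipeline at hand.
        atlasHook.onJobSubmitted(new JobClient() {}, new Pipeline() {}, null);
        System.out.println(log); // prints "pipeline available"
    }
}
```

The trade-off Kostas notes applies directly: this shape has consistent behavior (callers always see whether a pipeline exists), but changing the callback signature breaks existing JobListener implementations.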
> > > > > > > > It looks like this is trying to go "public stable", but doesn't really do it, exactly because of mixing "pipeline" into this. You would need to cast "Pipeline" and work on internal classes in the implementation.
> > > > > > > >
> > > > > > > > If we use an "internal API" or a "semi-stable SPI" class, it looks at first glance a bit cleaner and more maintainable (opening up less surface) to make the PipelineExecutor a "stable SPI". I have not checked out all the details, though.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Stephan
> > > > > > > >
> > > > > > > > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Stephan!
> > > > > > > > >
> > > > > > > > > Thanks for checking this out. I agree that wrapping the other PipelineExecutors with a delegating AtlasExecutor would be a good alternative approach to implement this, but I actually feel that it suffers from even more problems than exposing the Pipeline instance in the JobListener.
> > > > > > > > >
> > > > > > > > > The main idea with the Atlas integration would be to have the Atlas hook logic in the Atlas project, where it would be maintained. This means that any approach we take has to rely on public APIs. The JobListener is already a public evolving API, while the PipelineExecutor and its factory are purely internal. Even if we make them public, they will still expose the Pipeline, so we did not gain much on the public/internal API front.
> > > > > > > > > I also feel that, since the Atlas hook logic should only observe the pipeline and collect information, the JobListener interface seems an ideal match, and the implementation can be pretty lightweight. From a purely implementation perspective, adding an Executor would be heavier, as it has to properly delegate to another executor, making sure that we don't break anything.
> > > > > > > > >
> > > > > > > > > Don't take me wrong, I am not opposed to reworking the implementations we have, as they are very simple at this point, but I also want to make sure that we take the approach that is simple from a maintainability standpoint. Of course, my argument rests on the assumption that the AtlasHook itself will live outside of the Flink project; that's another question.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen <se...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Hi all!
> > > > > > > > > >
> > > > > > > > > > In general, nice idea to support this integration with Atlas.
> > > > > > > > > >
> > > > > > > > > > I think we could make this a bit easier/more lightweight with a small change. One of the issues that is not super nice is that this starts exposing the (currently empty) Pipeline interface in the public API. The Pipeline is an SPI interface that would be good to hide from the API.
> > > > > > > > > > Since 1.10, Flink has the notion of Executors, which take the pipeline and execute it. Meaning each pipeline is passed on anyway. And executors are already configurable in the Flink configuration. So, instead of passing the pipeline both "down" (to the executor) and "to the side" (JobListener), could we just have a wrapping "AtlasExecutor" that takes the pipeline, does whatever it wants, and then passes it to the proper executor? This would also have the advantage that it supports making changes to the pipeline, if needed in the future. For example, if there is ever the need to add additional configuration fields, set properties, or add "labels", this could be easily done in the suggested approach.
> > > > > > > > > >
> > > > > > > > > > I tried to sketch this in the picture below, pardon my bad drawing.
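The wrapping-executor idea above can be sketched as a plain decorator: an "AtlasExecutor" sits in front of the configured executor, observes (and could rewrite) the pipeline, then delegates. This is a simplified stand-in; Flink's real PipelineExecutor returns a CompletableFuture of a JobClient, and all names here are illustrative.

```java
// Sketch of the wrapping "AtlasExecutor": observe/enrich the pipeline,
// report to Atlas (stubbed), then delegate to the proper executor.
import java.util.ArrayList;
import java.util.List;

interface Pipeline {}

interface PipelineExecutor {
    String execute(Pipeline pipeline);  // simplified return type
}

class AtlasExecutor implements PipelineExecutor {
    private final PipelineExecutor delegate;  // the "proper" executor
    final List<String> reported = new ArrayList<>();

    AtlasExecutor(PipelineExecutor delegate) { this.delegate = delegate; }

    @Override
    public String execute(Pipeline pipeline) {
        // Extract lineage metadata and ship it to Atlas (stubbed here).
        reported.add("metadata sent to Atlas");
        // Then hand the (possibly enriched) pipeline to the real executor.
        return delegate.execute(pipeline);
    }
}

public class AtlasExecutorSketch {
    public static void main(String[] args) {
        AtlasExecutor atlas = new AtlasExecutor(p -> "submitted");
        String result = atlas.execute(new Pipeline() {});
        System.out.println(result + ", reports=" + atlas.reported.size());
    }
}
```

The decorator shape is what gives this approach its flexibility (it can mutate the pipeline, add labels, etc.), but it is also why Gyula calls it heavier: the wrapper must faithfully resolve and delegate to whichever executor the user actually configured.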
> > > > > > > > > > [image: Listener_Executor.png]
> > > > > > > > > > https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Stephan
> > > > > > > > > >
> > > > > > > > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > >> Thanks! I'm reading the document now and will get back to you.
> > > > > > > > > >>
> > > > > > > > > >> Best,
> > > > > > > > > >> Aljoscha