Hi Gyula and all,

Thanks for the discussion so far.
It seems that the requirement is to deliver some metadata about the submitted job, and such metadata can simply be extracted from the StreamGraph. I'm unfamiliar with the metadata Atlas needs, so I'll make an assumption.

Assumption: the metadata needed by Atlas is Flink-scope information, such as the inputs/outputs of a node, and it is compile-time information, so we know it while compiling the StreamGraph.

If the assumption holds, I'm curious whether it is an option to standardize a JSON representation of the StreamGraph which would contain the required metadata, and to pass that JSON representation from the generation phase to the JobGraph, then to the ExecutionGraph, and finally make it retrievable from the RestServer (so that we could extend JobClient to retrieve the plan by querying the cluster instead of having a pre-configured one). This would be similar to the `jsonPlan` in ExecutionGraph today (which is exposed by the JobPlan REST endpoint). And I believe that, rather than a JobGraph dump, which is a physical plan, exposing access to a StreamGraph dump, which is a logical plan, is possibly more interesting from a user's perspective.

Best,
tison.

Gyula Fóra <gyula.f...@gmail.com> wrote on Fri, Mar 13, 2020 at 3:20 PM:

> Thanks again Kostas for diving deep into this, it is great feedback!
>
> I agree with the concerns regarding the custom executor; it has to be
> able to properly handle the "original" executor somehow. This might be
> quite tricky if we want to implement the AtlasExecutor outside Flink.
> In any case it does not really feel clean or lightweight at first
> glance.
>
> As for the JobClient/JobListener/Pipeline question, since you brought
> up the possibility of later attaching the JobClient, maybe the best
> route would be to add the Pipeline as a method parameter in the
> JobListener. It would break code compatibility but at least would have
> consistent behavior.
>
> Now to the big problem of not having executors / job listeners work in
> the Kubernetes per-job, web, etc. modes.
> I was not aware of this problem until now; this also seems to affect
> the whole concept of the JobListener interface. What good is a
> JobListener if it only listens to certain kinds of deployments? :)
>
> Incidentally, in my first proposal (and prototype) I had the Atlas hook
> running on the JobMaster, with an extra addition of a JobGraphGenerator
> hook that could be registered in the StreamExecutionEnvironment. This
> meant that we could work on the StreamGraph, register metadata in the
> JobGraph, and execute the actual Atlas registration logic in the
> JobMaster when the job starts.
>
> Looking back, this is much more complex, and uglier, logic than having
> a simple JobListener. But it would at least work in all possible job
> submission scenarios, as long as the JobGraph was generated through the
> StreamGraph logic (which should always be the case).
>
> Cheers,
> Gyula
>
>
> On Thu, Mar 12, 2020 at 8:53 PM Kostas Kloudas <kklou...@gmail.com>
> wrote:
>
> > Thanks Gyula,
> >
> > Looking forward to your comments.
> > Just to let you know, I would not like having a method that in some
> > cases works as expected and in some other ones does not. It would be
> > nice if we could expose consistent behaviour to the users.
> >
> > On Thu, Mar 12, 2020 at 8:44 PM Gyula Fóra <gyula.f...@gmail.com>
> > wrote:
> > >
> > > Thanks Kostas, I have to review the possible limitations with the
> > > Executor before I can properly answer.
> > >
> > > Regarding your comments on the listener pattern: we proposed in
> > > the document to include getPipeline() in the JobClient itself, as
> > > you suggested, to fit the pattern :) As for not always being able
> > > to return the pipeline, this might be expected depending on how
> > > the JobClient was obtained, so we need to handle it some way.
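Gyula's suggestion above (and Kostas's "alternative 2" later in the thread) of passing the Pipeline into the JobListener callbacks can be sketched roughly as follows. The nested types are simplified stand-ins for Flink's real `JobListener`, `JobClient`, and `Pipeline`; the names and signatures here are assumptions for illustration, not the actual API.

```java
// Illustrative sketch only: the nested types are simplified stand-ins
// for Flink's Pipeline, JobClient, and JobListener -- not the real APIs.
public class ListenerSketch {

    interface Pipeline {}

    interface JobClient {
        String getJobID();
    }

    // The Pipeline is passed into the callback along with the JobClient
    // and the throwable. This breaks the current signature, but behaves
    // consistently wherever the listener is invoked.
    interface JobListener {
        void onJobSubmitted(JobClient client, Pipeline pipeline, Throwable failure);
    }

    static final StringBuilder registered = new StringBuilder();

    // A toy "Atlas hook" recording lineage for successful submissions.
    static JobListener atlasHook() {
        return (client, pipeline, failure) -> {
            if (failure == null) {
                registered.append("lineage:").append(client.getJobID());
            }
        };
    }

    public static void main(String[] args) {
        atlasHook().onJobSubmitted(() -> "job-1", new Pipeline() {}, null);
        System.out.println(registered); // prints "lineage:job-1"
    }
}
```

Since the hook only reads from the pipeline, a callback argument like this keeps the listener "passive" while still giving it the data it needs at submission time.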
> > >
> > >
> > > On Thu, Mar 12, 2020 at 8:30 PM Kostas Kloudas
> > > <kklou...@gmail.com> wrote:
> > >
> > > > Hi again,
> > > >
> > > > Just to clarify, I am not against exposing the Pipeline if this
> > > > will lead to a "clean" solution.
> > > > And I forgot to say that the last solution, if adopted, would
> > > > have to work on the JobGraph, which may not be that desirable.
> > > >
> > > > Kostas
> > > >
> > > > On Thu, Mar 12, 2020 at 8:26 PM Kostas Kloudas
> > > > <kklou...@gmail.com> wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I do not have a strong opinion on the topic yet, but I would
> > > > > like to share my thoughts on this.
> > > > >
> > > > > In the solution proposing a wrapping AtlasExecutor around the
> > > > > Flink executors: if we allow the user to use the CLI to submit
> > > > > jobs, then the CLI code may have to change so that it injects
> > > > > the executor option to AtlasExecutor (transparently to the
> > > > > user), and then the AtlasExecutor should take what the user
> > > > > has actually set as the pipeline executor and find the
> > > > > adequate executor. If this is not done transparently, then the
> > > > > user has to do something explicit to point Flink to Atlas and
> > > > > then to the correct executor, which implies that we should add
> > > > > user-facing stuff (like CLI options) to Flink.
> > > > >
> > > > > For the solution of adding getPipeline() to the JobListener, I
> > > > > think that from a design perspective it does not fit in the
> > > > > listener itself. The listener is a "passive" entity that is
> > > > > expected to listen to specific "events". Adding a getter does
> > > > > not fit there.
> > > > > Other options for the getPipeline() method are:
> > > > > 1) add it as a method to the JobClient
> > > > > 2) add it as an argument to the methods of the JobListener
> > > > >    (along with the JobClient and the throwable)
> > > > >
> > > > > Alternative 1) would currently work because the JobClient is
> > > > > only instantiated by the executor. But in the future we may
> > > > > (and probably will, because of the implications of FLIP-85)
> > > > > allow a JobClient to get "attached" to a running job. In that
> > > > > case, getPipeline() will not have a pipeline to return.
> > > > > Alternative 2) will break existing code. I am not sure how
> > > > > important that is, as the JobListener is a new feature and I
> > > > > guess it has some, but not many, users.
> > > > >
> > > > > As a side note, if I am not mistaken, apart from YARN, none of
> > > > > the above solutions would work in per-job mode for Kubernetes,
> > > > > Mesos, or with web submissions. These modes go through
> > > > > "special" execution environments that use them simply to
> > > > > extract the JobGraph, which they then submit to the cluster.
> > > > > In this case, there is no executor involved. Are these cases
> > > > > important to you?
> > > > >
> > > > > Finally, another solution, although more drastic and more
> > > > > involved, could be to have a "JobListener" running on the
> > > > > JobMaster. This would collect the relevant info and send it to
> > > > > Atlas. But I am not sure how Atlas works and whether it
> > > > > requires the data to be extracted on the client side. I am
> > > > > saying this because the JobMasters may be running anywhere in
> > > > > a cluster, while the clients may run on designated machines
> > > > > which can have specific configurations, e.g. open ports to
> > > > > communicate with a specific Atlas server.
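Kostas's alternative 1 above, together with his caveat about "attached" JobClients, can be sketched as follows. The types are simplified stand-ins for Flink's, and the `Optional`-returning getter is my assumption about how the "no pipeline to return" case might be modeled, not a settled API.

```java
import java.util.Optional;

// Illustrative sketch only: simplified stand-ins for Flink types; the
// Optional-returning getter is an assumption, not the real API.
public class JobClientSketch {

    interface Pipeline {}

    interface JobClient {
        // Alternative 1: a getter on the JobClient. Empty when the
        // client was "attached" to an already running job (the FLIP-85
        // case) and therefore never saw a Pipeline.
        Optional<Pipeline> getPipeline();
    }

    // A client created by an executor holds the pipeline it submitted.
    static JobClient submittedClient(Pipeline pipeline) {
        return () -> Optional.of(pipeline);
    }

    // A client attached to a running job has no pipeline to return.
    static JobClient attachedClient() {
        return Optional::empty;
    }

    public static void main(String[] args) {
        System.out.println(submittedClient(new Pipeline() {}).getPipeline().isPresent()); // true
        System.out.println(attachedClient().getPipeline().isPresent());                   // false
    }
}
```

The design question is exactly the one raised above: callers would have to handle the empty case, so the getter's behavior depends on how the JobClient was obtained.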
> > > > >
> > > > > Cheers,
> > > > > Kostas
> > > > >
> > > > > On Thu, Mar 12, 2020 at 3:19 PM Stephan Ewen <se...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > Hi Gyula!
> > > > > >
> > > > > > My main motivation was to try and avoid mixing an internal
> > > > > > interface (Pipeline) with the public API. It looks like this
> > > > > > is trying to go "public stable", but doesn't really do it,
> > > > > > exactly because of mixing "Pipeline" into this. You would
> > > > > > need to cast "Pipeline" and work on internal classes in the
> > > > > > implementation.
> > > > > >
> > > > > > If we use an "internal API" or a "semi-stable SPI" class, it
> > > > > > looks at first glance a bit cleaner and more maintainable
> > > > > > (opening up less surface) to make the PipelineExecutor a
> > > > > > "stable SPI". I have not checked out all the details, though.
> > > > > >
> > > > > > Best,
> > > > > > Stephan
> > > > > >
> > > > > > On Thu, Mar 12, 2020 at 2:47 PM Gyula Fóra
> > > > > > <gyula.f...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Stephan!
> > > > > > >
> > > > > > > Thanks for checking this out. I agree that wrapping the
> > > > > > > other PipelineExecutors with a delegating AtlasExecutor
> > > > > > > would be a good alternative approach to implement this,
> > > > > > > but I actually feel that it suffers from even more
> > > > > > > problems than exposing the Pipeline instance in the
> > > > > > > JobListener.
> > > > > > >
> > > > > > > The main idea with the Atlas integration would be to have
> > > > > > > the Atlas hook logic live in the Atlas project, where it
> > > > > > > would be maintained. This means that any approach we take
> > > > > > > has to rely on public APIs. The JobListener is already a
> > > > > > > public evolving API, while the PipelineExecutor and its
> > > > > > > factory are purely internal.
> > > > > > > Even if we made it public, it would still expose the
> > > > > > > Pipeline, so we would not gain much on the public/internal
> > > > > > > API front.
> > > > > > >
> > > > > > > I also feel that since the Atlas hook logic should only
> > > > > > > observe the pipeline and collect information, the
> > > > > > > JobListener interface is an ideal match, and the
> > > > > > > implementation can be pretty lightweight. From a purely
> > > > > > > implementation perspective, adding an executor would be
> > > > > > > heavier, as it has to properly delegate to another
> > > > > > > executor while making sure that we don't break anything.
> > > > > > >
> > > > > > > Don't get me wrong, I am not opposed to reworking the
> > > > > > > implementations we have, as they are very simple at this
> > > > > > > point, but I also want to make sure that we take the
> > > > > > > approach that is simplest from a maintainability
> > > > > > > standpoint. Of course, my argument rests on the assumption
> > > > > > > that the AtlasHook itself will live outside of the Flink
> > > > > > > project; that's another question.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Gyula
> > > > > > >
> > > > > > > On Thu, Mar 12, 2020 at 11:34 AM Stephan Ewen
> > > > > > > <se...@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi all!
> > > > > > > >
> > > > > > > > In general, nice idea to support this integration with
> > > > > > > > Atlas.
> > > > > > > >
> > > > > > > > I think we could make this a bit easier/more lightweight
> > > > > > > > with a small change. One of the issues that is not super
> > > > > > > > nice is that this starts exposing the (currently empty)
> > > > > > > > Pipeline interface in the public API. The Pipeline is an
> > > > > > > > SPI interface that would be good to hide in the API.
> > > > > > > >
> > > > > > > > Since 1.10, Flink has the notion of Executors, which
> > > > > > > > take the pipeline and execute it.
> > > > > > > > Meaning each pipeline is passed on anyway, and executors
> > > > > > > > are already configurable in the Flink configuration. So,
> > > > > > > > instead of passing the pipeline both "down" (to the
> > > > > > > > executor) and "to the side" (to the JobListener), could
> > > > > > > > we just have a wrapping "AtlasExecutor" that takes the
> > > > > > > > pipeline, does whatever it wants, and then passes it to
> > > > > > > > the proper executor? This would also have the advantage
> > > > > > > > that it supports making changes to the pipeline, if
> > > > > > > > needed in the future. For example, if there is ever the
> > > > > > > > need to add additional configuration fields, set
> > > > > > > > properties, or add "labels", this could easily be done
> > > > > > > > in the suggested approach.
> > > > > > > >
> > > > > > > > I tried to sketch this in the picture below; pardon my
> > > > > > > > bad drawing.
> > > > > > > >
> > > > > > > > [image: Listener_Executor.png]
> > > > > > > >
> > > > > > > > https://xjcrkw.bn.files.1drv.com/y4pWH57aEvLU5Ww4REC9XLi7nJMLGHq2smPSzaslU8ogywFDcMkP-_Rsl8B1njf4qphodim6bgnLTNFwNoEuwFdTuA2Xmf7CJ_8lTJjrKlFlDwrugVeBQzEhAY7n_5j2bumwDBf29jn_tZ1ueZxe2slhLkPC-9K6Dry_vpvRvZRY-CSnQXxj9jDf7P53Vz1K9Ez/Listener_Executor.png?psid=1
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Stephan
> > > > > > > >
> > > > > > > > On Wed, Mar 11, 2020 at 11:41 AM Aljoscha Krettek
> > > > > > > > <aljos...@apache.org> wrote:
> > > > > > > >
> > > > > > > >> Thanks! I'm reading the document now and will get back
> > > > > > > >> to you.
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Aljoscha
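Stephan's wrapping-executor idea from the quoted message above can be sketched roughly as below. The types are simplified stand-ins for Flink's internal executor SPI, and the metadata extraction is just a placeholder for whatever the Atlas hook would actually record.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: simplified stand-ins for Flink's internal
// executor SPI, with a placeholder where Atlas metadata would be
// extracted.
public class ExecutorSketch {

    interface Pipeline {
        String name();
    }

    interface PipelineExecutor {
        String execute(Pipeline pipeline);
    }

    // The wrapping executor: observe (or even modify) the pipeline,
    // then delegate to whichever executor the user actually configured.
    static class AtlasExecutor implements PipelineExecutor {
        private final PipelineExecutor delegate;
        private final List<String> observed = new ArrayList<>();

        AtlasExecutor(PipelineExecutor delegate) {
            this.delegate = delegate;
        }

        List<String> observed() {
            return observed;
        }

        @Override
        public String execute(Pipeline pipeline) {
            observed.add(pipeline.name());     // placeholder: register with Atlas
            return delegate.execute(pipeline); // hand off unchanged
        }
    }

    public static void main(String[] args) {
        PipelineExecutor real = p -> "submitted:" + p.name();
        AtlasExecutor wrapping = new AtlasExecutor(real);
        System.out.println(wrapping.execute(() -> "wordcount")); // prints "submitted:wordcount"
    }
}
```

The open question from the thread remains visible in the shape of this code: the wrapper must discover and construct the "real" executor the user configured, which is exactly the part that is tricky to do transparently outside Flink.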