Hi Gyula, Flink 1.10 introduced JobListener, which is invoked after a job is submitted and after it finishes. Maybe we can add an API on JobClient to expose the info you need for the Atlas integration.
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
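For illustration, a rough sketch of what such a listener could look like (this is only my assumption of how it could be wired up; the Atlas upload itself is a placeholder, and the source/sink details you need would still require the extra API on JobClient mentioned above):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;

import javax.annotation.Nullable;

// Sketch only: the printouts stand in for the Atlas entity creation/upload logic.
public class AtlasJobListener implements JobListener {

    @Override
    public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
        if (jobClient == null) {
            return; // submission failed, nothing to report
        }
        // Today only the JobID is reachable here; an extended JobClient API could
        // also expose the sources/sinks needed to build the FlinkApplication entity.
        System.out.println("Submitted job " + jobClient.getJobID());
    }

    @Override
    public void onJobExecuted(@Nullable JobExecutionResult result, @Nullable Throwable throwable) {
        if (result != null) {
            System.out.println("Job " + result.getJobID() + " finished in "
                    + result.getNetRuntime() + " ms");
        }
    }
}

It would be registered on the environment before execute(), e.g. env.registerJobListener(new AtlasJobListener());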
Gyula Fóra <gyf...@apache.org> wrote on Wed, Feb 5, 2020 at 7:48 PM:

> Hi all!
>
> We have started some preliminary work on the Flink - Atlas integration at
> Cloudera. It seems that the integration will require some new hook
> interfaces at the jobgraph generation and submission phases, so I figured I
> will open a discussion thread with my initial ideas to get some early
> feedback.
>
> *Minimal background*
> Very simply put, Apache Atlas is a data governance framework that stores
> metadata for our data and processing logic to track ownership, lineage, etc.
> It is already integrated with systems like HDFS, Kafka, Hive and many
> others.
>
> Adding Flink integration would mean that we can track the input and output
> data of our Flink jobs, their owners, and how different Flink jobs are
> connected to each other through the data they produce (lineage). This seems
> to be a very big deal for a lot of companies :)
>
> *Flink - Atlas integration in a nutshell*
> In order to integrate with Atlas we basically need 2 things.
>  - Flink entity definitions
>  - Flink Atlas hook
>
> The entity definition is the easy part. It is a JSON that contains the
> objects (entities) that we want to store for any given Flink job. As a
> starter we could have a single FlinkApplication entity that has a set of
> inputs and outputs. These inputs/outputs are other Atlas entities that are
> already defined, such as a Kafka topic or an HBase table.
>
> The Flink Atlas hook will be the logic that creates the entity instance and
> uploads it to Atlas when we start a new Flink job. This is the part where
> we implement the core logic.
>
> *Job submission hook*
> In order to implement the Atlas hook we need a place where we can inspect
> the pipeline, and create and send the metadata when the job starts. When we
> create the FlinkApplication entity we need to be able to easily determine
> the sources and sinks (and their properties) of the pipeline.
>
> Unfortunately there is no JobSubmission hook in Flink that could execute
> this logic, and even if there were one there is a mismatch of abstraction
> levels needed to implement the integration.
> We could imagine a JobSubmission hook executed in the JobManager runner like
> this:
>
> void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);
>
> This is nice, but the JobGraph makes it super difficult to extract sources
> and UDFs to create the metadata entity. The Atlas entity, however, could be
> easily created from the StreamGraph object (used to represent the logical
> flow) before the JobGraph is generated. To get around this limitation we
> could add a JobGraphGeneratorHook interface:
>
> void preProcess(StreamGraph streamGraph);
> void postProcess(JobGraph jobGraph);
>
> We could then generate the Atlas entity in the preProcess step and add a
> job submission hook in the postProcess step that will simply send the
> already baked-in entity.
>
> *This kinda works but...*
> The approach outlined above seems to work and we have built a POC using it.
> Unfortunately it is far from nice, as it exposes non-public APIs such as the
> StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
>
> It would be much nicer if we could somehow go back from JobGraph to
> StreamGraph, or at least have an easy way to access source/sink UDFs.
>
> What do you think?
>
> Cheers,
> Gyula

--
Best Regards

Jeff Zhang