Hi all!

Thank you for your patience!
We have created a small design document for the change proposal detailing the minimal required changes in Flink for the initial version of the Atlas integration. You can find the document here:
https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit?usp=sharing

It would be great if you could check it out and comment on it. If we agree on the next steps I will start opening JIRAs and PRs with the proposed changes.

The document links to an already working Atlas hook prototype (and the accompanying Flink fork). The links for that are also here:
Flink: https://github.com/gyfora/flink/tree/atlas-changes
Atlas: https://github.com/gyfora/atlas/tree/flink-bridge

Thank you!
Gyula

On Thu, Feb 13, 2020 at 4:43 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Thanks for the feedback Aljoscha!
>
> I have a POC ready with the Flink changes + the Atlas hook implementation. I will try to push this to a public repo tomorrow and we can discuss further based on that!
>
> Gyula
>
> On Thu, Feb 13, 2020, 15:26 Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> I think exposing the Pipeline should be ok. Using the internal StreamGraph might be problematic because it might change/break, but that's a problem of the external code.
>>
>> Aljoscha
>>
>> On 11.02.20 16:26, Gyula Fóra wrote:
>> > Hi All!
>> >
>> > I have made a prototype that simply adds a getPipeline() method to the JobClient interface. Then I could easily implement the Atlas hook using the JobListener interface. I simply check whether the Pipeline is instanceof StreamGraph and do the logic there.
>> >
>> > I think this is so far the cleanest approach and I much prefer it compared to working on the JobGraph directly, which would expose even more messy internals.
>> >
>> > Unfortunately this change alone is not enough for the integration, as we need to make sure that all sources/sinks that we want to integrate with Atlas publicly expose some of their properties:
>> >
>> > - Kafka source/sink:
>> >   - Kafka props
>> >   - Topic(s) - this is tricky for sinks
>> > - FS source/sink:
>> >   - Hadoop props
>> >   - Base path for StreamingFileSink
>> >   - Path for ContinuousMonitoringSource
>> >
>> > Most of these are straightforward changes; the only question is what we want to register in Atlas from the available connectors. Ideally users could also somehow register their own Atlas metadata for custom sources and sinks; we could probably introduce an interface for that in Atlas.
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >
>> >> Maybe we could improve the Pipeline interface in the long run, but as a temporary solution the JobClient could expose a getPipeline() method.
>> >>
>> >> That way the implementation of the JobListener could check if it's a StreamGraph or a Plan.
>> >>
>> >> How bad does that sound?
>> >>
>> >> Gyula
>> >>
>> >> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >>
>> >>> Hi Aljoscha!
>> >>>
>> >>> That's a valid concern, but we should try to figure something out; many users need this before they can use Flink.
>> >>>
>> >>> I think the closest thing we have right now is the StreamGraph. In contrast with the JobGraph, the StreamGraph is pretty nice from a metadata perspective :D
>> >>> The big downside of exposing the StreamGraph is that we don't have it in batch.
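As a concrete illustration of the JobListener-based approach discussed above, a minimal sketch of such a hook could look like the following. Note that JobClient#getPipeline() is only the accessor proposed in this thread and does not exist on JobClient today, and the Atlas-specific logic is left as comments:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.graph.StreamGraph;

// Minimal sketch only: getPipeline() is the proposed addition, not part of the current API.
public class AtlasJobListener implements JobListener {

    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        if (jobClient == null) {
            return; // submission failed, nothing to report to Atlas
        }
        Pipeline pipeline = jobClient.getPipeline(); // proposed accessor
        if (pipeline instanceof StreamGraph) {
            StreamGraph streamGraph = (StreamGraph) pipeline;
            // inspect the sources/sinks of the StreamGraph, build the
            // FlinkApplication entity and send it to Atlas here
        }
    }

    @Override
    public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
        // optionally update/finalize the Atlas entity once the job has finished
    }
}

The listener itself can already be registered today via StreamExecutionEnvironment#registerJobListener; only the Pipeline access is new.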
>> >>> On the other hand we could expose the JobGraph, but then the integration component would still have to do the heavy lifting for batch- and stream-specific operators and UDFs.
>> >>>
>> >>> Instead of exposing either StreamGraph/JobGraph, we could come up with a metadata-like representation for the users, but that would be like implementing the Atlas integration itself without the Atlas dependencies :D
>> >>>
>> >>> As a comparison point, this is how it works in Storm:
>> >>> Every operator (spout/bolt) stores a config map (string->string) with all the metadata such as the operator class and the operator-specific configs. The Atlas hook works on this map.
>> >>> This is very fragile and depends on a lot of internals. Kind of like exposing the JobGraph but much worse. I think we can do better.
>> >>>
>> >>> Gyula
>> >>>
>> >>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>
>> >>>> If we need it, we can probably beef up the JobListener to allow accessing some information about the whole graph or the sources and sinks. My only concern is that we don't have a stable interface for our job graphs/pipelines right now.
>> >>>>
>> >>>> Best,
>> >>>> Aljoscha
>> >>>>
>> >>>> On 06.02.20 23:00, Gyula Fóra wrote:
>> >>>>> Hi Jeff & Till!
>> >>>>>
>> >>>>> Thanks for the feedback, this is exactly the discussion I was looking for. The JobListener looks very promising if we can expose the JobGraph somehow (correct me if I am wrong, but it is not accessible at the moment).
>> >>>>>
>> >>>>> I did not know about this feature; that's why I added my JobSubmission hook, which was pretty similar but only exposed the JobGraph. In general I like the listener better and I would not like to add anything extra if we can avoid it.
>> >>>>>
>> >>>>> Actually, the bigger part of the integration work that will need more changes in Flink concerns the accessibility of sources/sinks from the JobGraph and their specific properties. For instance, at the moment the Kafka sources and sinks do not expose anything publicly, such as topics, Kafka configs, etc. The same goes for other data connectors that we need to integrate in the long run. I guess there will be a separate thread on this once we iron out the initial integration points :)
>> >>>>>
>> >>>>> I will try to play around with the JobListener interface tomorrow and see if I can extend it to meet our needs.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Gyula
>> >>>>>
>> >>>>> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zjf...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hi Gyula,
>> >>>>>>
>> >>>>>> Flink 1.10 introduced JobListener, which is invoked after job submission and after the job finishes. Maybe we can add an API on JobClient to get the info you need for the Atlas integration.
>> >>>>>>
>> >>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
>> >>>>>>
>> >>>>>> On Wed, Feb 5, 2020 at 7:48 PM Gyula Fóra <gyf...@apache.org> wrote:
>> >>>>>>
>> >>>>>>> Hi all!
>> >>>>>>>
>> >>>>>>> We have started some preliminary work on the Flink - Atlas integration at Cloudera.
>> >>>>>>> It seems that the integration will require some new hook interfaces at the job graph generation and submission phases, so I figured I would open a discussion thread with my initial ideas to get some early feedback.
>> >>>>>>>
>> >>>>>>> *Minimal background*
>> >>>>>>> Very simply put, Apache Atlas is a data governance framework that stores metadata for our data and processing logic to track ownership, lineage, etc. It is already integrated with systems like HDFS, Kafka, Hive and many others.
>> >>>>>>>
>> >>>>>>> Adding Flink integration would mean that we can track the input/output data of our Flink jobs, their owners and how different Flink jobs are connected to each other through the data they produce (lineage). This seems to be a very big deal for a lot of companies :)
>> >>>>>>>
>> >>>>>>> *Flink - Atlas integration in a nutshell*
>> >>>>>>> In order to integrate with Atlas we basically need two things:
>> >>>>>>> - Flink entity definitions
>> >>>>>>> - Flink Atlas hook
>> >>>>>>>
>> >>>>>>> The entity definition is the easy part. It is a JSON file that contains the objects (entities) that we want to store for any given Flink job. As a starter we could have a single FlinkApplication entity that has a set of inputs and outputs. These inputs/outputs are other Atlas entities that are already defined, such as a Kafka topic or an HBase table.
>> >>>>>>>
>> >>>>>>> The Flink Atlas hook will be the logic that creates the entity instance and uploads it to Atlas when we start a new Flink job. This is the part where we implement the core logic.
>> >>>>>>>
>> >>>>>>> *Job submission hook*
>> >>>>>>> In order to implement the Atlas hook we need a place where we can inspect the pipeline, and create and send the metadata when the job starts. When we create the FlinkApplication entity we need to be able to easily determine the sources and sinks (and their properties) of the pipeline.
>> >>>>>>>
>> >>>>>>> Unfortunately there is no JobSubmission hook in Flink that could execute this logic, and even if there were one, there would be a mismatch of the abstraction levels needed to implement the integration. We could imagine a JobSubmission hook executed in the JobManager runner like this:
>> >>>>>>>
>> >>>>>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);
>> >>>>>>>
>> >>>>>>> This is nice, but the JobGraph makes it super difficult to extract the sources and UDFs to create the metadata entity. The Atlas entity, however, could easily be created from the StreamGraph object (used to represent the logical flow) before the JobGraph is generated. To work around this limitation we could add a JobGraphGeneratorHook interface:
>> >>>>>>>
>> >>>>>>> void preProcess(StreamGraph streamGraph);
>> >>>>>>> void postProcess(JobGraph jobGraph);
>> >>>>>>>
>> >>>>>>> We could then generate the Atlas entity in the preProcess step and add a job submission hook in the postProcess step that would simply send the already baked entity.
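To make the shape of that proposal a bit more tangible, the hook could be expressed roughly as below. This is only a sketch of the idea from this thread: the interface and the implementing class are hypothetical and do not exist in Flink, and StreamGraph is still an internal class.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.StreamGraph;

// Hypothetical interface sketching the proposal above; it does not exist in Flink today.
public interface JobGraphGeneratorHook {
    void preProcess(StreamGraph streamGraph);
    void postProcess(JobGraph jobGraph);
}

// A hook implementation could bake the entity from the StreamGraph in preProcess
// and send the finished entity to Atlas in postProcess, once the JobGraph exists.
class AtlasEntityGeneratorHook implements JobGraphGeneratorHook {

    private Object flinkApplicationEntity; // placeholder for the entity being built

    @Override
    public void preProcess(StreamGraph streamGraph) {
        // walk the sources and sinks of the StreamGraph and build the entity here
    }

    @Override
    public void postProcess(JobGraph jobGraph) {
        // attach the job id and upload the already baked entity to Atlas
    }
}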
>> >>>>>>>
>> >>>>>>> *This kinda works but...*
>> >>>>>>> The approach outlined above seems to work, and we have built a POC using it. Unfortunately it is far from nice, as it exposes non-public APIs such as the StreamGraph. Also, it feels a bit weird to have two hooks instead of one.
>> >>>>>>>
>> >>>>>>> It would be much nicer if we could somehow go back from the JobGraph to the StreamGraph, or at least have an easy way to access the source/sink UDFs.
>> >>>>>>>
>> >>>>>>> What do you think?
>> >>>>>>>
>> >>>>>>> Cheers,
>> >>>>>>> Gyula
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> --
>> >>>>>> Best Regards
>> >>>>>>
>> >>>>>> Jeff Zhang
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >
>> >
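On the related question of how sources and sinks could expose their properties (topics, configs, paths) to such a hook, one option would be a small self-describing interface along these lines. This is purely a hypothetical sketch for illustration; no such interface exists in Flink or Atlas today and the names are made up:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface that sources and sinks could implement so an
// Atlas hook can read their lineage-relevant properties without reflection.
public interface LineageMetadataProvider {
    Map<String, String> getLineageMetadata();
}

// Example: a Kafka-like sink exposing its topic and selected client properties.
class MyKafkaLikeSink implements LineageMetadataProvider {

    private final String topic;
    private final Map<String, String> clientProperties;

    MyKafkaLikeSink(String topic, Map<String, String> clientProperties) {
        this.topic = topic;
        this.clientProperties = clientProperties;
    }

    @Override
    public Map<String, String> getLineageMetadata() {
        Map<String, String> metadata = new HashMap<>(clientProperties);
        metadata.put("topic", topic);
        return Collections.unmodifiableMap(metadata);
    }
}

The hook (whether listener- or generator-based) could then check the source/sink UDFs for this interface and copy the returned map into the FlinkApplication entity.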