Hi all,

We have added an interface for registering connectors used inside custom user-defined functions, for example to represent enrichment from an HBase table in the middle of a Flink application. We will also reach out to the Atlas community to review the implementation in the near future. Based on that review we plan to open a pull request to Flink adding the minor changes needed for the sources and sinks we plan to support out of the box, as described in the design document. Once these changes are merged we can add the necessary functionality in Atlas too.
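To give an idea of its shape, here is a rough sketch of what the registration interface looks like conceptually (the names and signatures are illustrative only; the actual interface in the implementation may differ):

import java.util.Map;

/**
 * Sketch only: a user-defined function that talks to an external system
 * (for example an HBase table used for enrichment) could implement an
 * interface like this, so the Atlas hook can register that system as an
 * input or output of the job. Names are illustrative, not final.
 */
public interface AtlasEntityProvider {

    /** Atlas type name of the external entity, e.g. "hbase_table". */
    String getEntityType();

    /** Attributes identifying the entity, e.g. table name and namespace. */
    Map<String, String> getEntityAttributes();
}

A RichMapFunction doing HBase lookups would implement this, and the hook would pick it up while walking the pipeline.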
You can find the document here:
https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit?usp=sharing

Best,
Marton

On Thu, Feb 20, 2020 at 10:38 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi all!
>
> Thank you for the patience!
>
> We have created a small design document for the change proposal detailing the minimal required changes in Flink for the initial version of the Atlas integration.
>
> You can find the document here:
> https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit?usp=sharing
>
> It would be great if you could check it out and comment on it. If we agree on the next steps, I will start opening JIRAs and PRs with the proposed changes.
>
> The document links to an already working Atlas hook prototype (and the accompanying Flink fork). The links are also here:
> Flink: https://github.com/gyfora/flink/tree/atlas-changes
> Atlas: https://github.com/gyfora/atlas/tree/flink-bridge
>
> Thank you!
> Gyula
>
> On Thu, Feb 13, 2020 at 4:43 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Thanks for the feedback Aljoscha!
> >
> > I have a POC ready with the Flink changes + the Atlas hook implementation. I will try to push this to a public repo tomorrow and we can discuss further based on that!
> >
> > Gyula
> >
> > On Thu, Feb 13, 2020, 15:26 Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> >> I think exposing the Pipeline should be OK. Using the internal StreamGraph might be problematic because it might change/break, but that's a problem for the external code.
> >>
> >> Aljoscha
> >>
> >> On 11.02.20 16:26, Gyula Fóra wrote:
> >> > Hi All!
> >> >
> >> > I have made a prototype that simply adds a getPipeline() method to the JobClient interface. Then I could easily implement the Atlas hook using the JobListener interface: I simply check whether the Pipeline is an instance of StreamGraph and do the logic there.
> >> >
> >> > I think this is so far the cleanest approach, and I much prefer it to working on the JobGraph directly, which would expose even more messy internals.
> >> >
> >> > Unfortunately this change alone is not enough for the integration, as we also need to make sure that all sources/sinks we want to integrate with Atlas publicly expose some of their properties:
> >> >
> >> > - Kafka source/sink:
> >> >   - Kafka props
> >> >   - Topic(s) - this is tricky for sinks
> >> > - FS source/sink:
> >> >   - Hadoop props
> >> >   - Base path for StreamingFileSink
> >> >   - Path for ContinuousMonitoringSource
> >> >
> >> > Most of these are straightforward changes; the only question is what we want to register in Atlas from the available connectors. Ideally users could also somehow register their own Atlas metadata for custom sources and sinks; we could probably introduce an interface for that in Atlas.
> >> >
> >> > Cheers,
> >> > Gyula
> >> >
> >> > On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >> >
> >> >> Maybe we could improve the Pipeline interface in the long run, but as a temporary solution the JobClient could expose a getPipeline() method.
> >> >>
> >> >> That way the implementation of the JobListener could check whether it's a StreamGraph or a Plan.
> >> >>
> >> >> How bad does that sound?
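> >> >>
> >> >> To make this concrete, here is a rough sketch of the listener side, assuming JobClient grows the proposed getPipeline() method (it does not exist yet), with the actual Atlas calls elided:
> >> >>
> >> >> import org.apache.flink.api.common.JobExecutionResult;
> >> >> import org.apache.flink.api.common.Plan;
> >> >> import org.apache.flink.api.dag.Pipeline;
> >> >> import org.apache.flink.core.execution.JobClient;
> >> >> import org.apache.flink.core.execution.JobListener;
> >> >> import org.apache.flink.streaming.api.graph.StreamGraph;
> >> >>
> >> >> public class AtlasJobListener implements JobListener {
> >> >>
> >> >>     @Override
> >> >>     public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
> >> >>         if (throwable != null || jobClient == null) {
> >> >>             return; // submission failed, nothing to register
> >> >>         }
> >> >>         // getPipeline() is the proposed addition to JobClient
> >> >>         Pipeline pipeline = jobClient.getPipeline();
> >> >>         if (pipeline instanceof StreamGraph) {
> >> >>             // streaming job: walk the StreamGraph, collect sources/sinks
> >> >>             // and their properties, build and send the Atlas entity
> >> >>         } else if (pipeline instanceof Plan) {
> >> >>             // batch job: same idea on the DataSet Plan
> >> >>         }
> >> >>     }
> >> >>
> >> >>     @Override
> >> >>     public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
> >> >>         // not needed for creating the entity
> >> >>     }
> >> >> }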
> >> >>
> >> >> Gyula
> >> >>
> >> >> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >> >>
> >> >>> Hi Aljoscha!
> >> >>>
> >> >>> That's a valid concern, but we should try to figure something out; many users need this before they can use Flink.
> >> >>>
> >> >>> I think the closest thing we have right now is the StreamGraph. In contrast with the JobGraph, the StreamGraph is pretty nice from a metadata perspective :D
> >> >>> The big downside of exposing the StreamGraph is that we don't have it in batch. On the other hand we could expose the JobGraph, but then the integration component would still have to do the heavy lifting for batch- and stream-specific operators and UDFs.
> >> >>>
> >> >>> Instead of exposing either the StreamGraph or the JobGraph, we could come up with a metadata-like representation for the users, but that would be like implementing the Atlas integration itself without the Atlas dependencies :D
> >> >>>
> >> >>> As a comparison point, this is how it works in Storm:
> >> >>> Every operator (spout/bolt) stores a config map (string -> string) with all the metadata, such as the operator class and the operator-specific configs. The Atlas hook works on this map.
> >> >>> This is very fragile and depends on a lot of internals. Kind of like exposing the JobGraph, but much worse. I think we can do better.
> >> >>>
> >> >>> Gyula
> >> >>>
> >> >>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> >> >>>
> >> >>>> If we need it, we can probably beef up the JobListener to allow accessing some information about the whole graph or the sources and sinks. My only concern is that we don't have a stable interface for our job graphs/pipelines right now.
> >> >>>>
> >> >>>> Best,
> >> >>>> Aljoscha
> >> >>>>
> >> >>>> On 06.02.20 23:00, Gyula Fóra wrote:
> >> >>>>> Hi Jeff & Till!
> >> >>>>>
> >> >>>>> Thanks for the feedback, this is exactly the discussion I was looking for. The JobListener looks very promising if we can expose the JobGraph somehow (correct me if I am wrong, but it is not accessible at the moment).
> >> >>>>>
> >> >>>>> I did not know about this feature; that's why I added my JobSubmission hook, which was pretty similar but only exposed the JobGraph. In general I like the listener better, and I would not like to add anything extra if we can avoid it.
> >> >>>>>
> >> >>>>> Actually, the bigger part of the integration work that will need more changes in Flink concerns the accessibility of sources/sinks from the JobGraph and their specific properties. For instance, at the moment the Kafka sources and sinks do not expose anything publicly, such as topics, Kafka configs, etc. The same goes for the other data connectors that we need to integrate in the long run. I guess there will be a separate thread on this once we iron out the initial integration points :)
> >> >>>>>
> >> >>>>> I will try to play around with the JobListener interface tomorrow and see if I can extend it to meet our needs.
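> >> >>>>>
> >> >>>>> For reference, the listener interface itself is roughly the following (quoting from memory, so please double-check against master):
> >> >>>>>
> >> >>>>> public interface JobListener {
> >> >>>>>     void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable);
> >> >>>>>     void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable);
> >> >>>>> }
> >> >>>>>
> >> >>>>> Neither callback exposes the Pipeline or the JobGraph today, which is exactly the gap we need to close.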
> >> >>>>>
> >> >>>>> Cheers,
> >> >>>>> Gyula
> >> >>>>>
> >> >>>>> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zjf...@gmail.com> wrote:
> >> >>>>>
> >> >>>>>> Hi Gyula,
> >> >>>>>>
> >> >>>>>> Flink 1.10 introduced the JobListener, which is invoked after job submission and completion. Maybe we can add an API on JobClient to get the info you need for the Atlas integration.
> >> >>>>>>
> >> >>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
> >> >>>>>>
> >> >>>>>> On Wed, Feb 5, 2020 at 7:48 PM Gyula Fóra <gyf...@apache.org> wrote:
> >> >>>>>>
> >> >>>>>>> Hi all!
> >> >>>>>>>
> >> >>>>>>> We have started some preliminary work on the Flink - Atlas integration at Cloudera. It seems that the integration will require some new hook interfaces at the jobgraph generation and submission phases, so I figured I would open a discussion thread with my initial ideas to get some early feedback.
> >> >>>>>>>
> >> >>>>>>> *Minimal background*
> >> >>>>>>> Very simply put, Apache Atlas is a data governance framework that stores metadata for our data and processing logic to track ownership, lineage, etc. It is already integrated with systems like HDFS, Kafka, Hive and many others.
> >> >>>>>>>
> >> >>>>>>> Adding Flink integration would mean that we can track the input/output data of our Flink jobs, their owners, and how different Flink jobs are connected to each other through the data they produce (lineage). This seems to be a very big deal for a lot of companies :)
> >> >>>>>>>
> >> >>>>>>> *Flink - Atlas integration in a nutshell*
> >> >>>>>>> In order to integrate with Atlas we basically need 2 things:
> >> >>>>>>> - Flink entity definitions
> >> >>>>>>> - Flink Atlas hook
> >> >>>>>>>
> >> >>>>>>> The entity definition is the easy part. It is a JSON file that contains the objects (entities) that we want to store for any given Flink job. As a starter we could have a single FlinkApplication entity that has a set of inputs and outputs. These inputs/outputs are other Atlas entities that are already defined, such as a Kafka topic or an HBase table.
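> >> >>>>>>>
> >> >>>>>>> Just to illustrate the shape of it, the typedef could look very roughly like this (the attribute names are placeholders, not a worked-out model):
> >> >>>>>>>
> >> >>>>>>> {
> >> >>>>>>>   "entityDefs": [
> >> >>>>>>>     {
> >> >>>>>>>       "name": "flink_application",
> >> >>>>>>>       "superTypes": ["Process"],
> >> >>>>>>>       "attributeDefs": [
> >> >>>>>>>         { "name": "startTime", "typeName": "date", "isOptional": true },
> >> >>>>>>>         { "name": "owner", "typeName": "string", "isOptional": true }
> >> >>>>>>>       ]
> >> >>>>>>>     }
> >> >>>>>>>   ]
> >> >>>>>>> }
> >> >>>>>>>
> >> >>>>>>> Inheriting from the Atlas Process supertype would give us the inputs/outputs attributes for free.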
> >> >>>>>>>
> >> >>>>>>> The Flink Atlas hook will be the logic that creates the entity instance and uploads it to Atlas when we start a new Flink job. This is the part where we implement the core logic.
> >> >>>>>>>
> >> >>>>>>> *Job submission hook*
> >> >>>>>>> In order to implement the Atlas hook we need a place where we can inspect the pipeline and create and send the metadata when the job starts. When we create the FlinkApplication entity we need to be able to easily determine the sources and sinks (and their properties) of the pipeline.
> >> >>>>>>>
> >> >>>>>>> Unfortunately there is no job submission hook in Flink that could execute this logic, and even if there were one, there is a mismatch between the abstraction levels needed to implement the integration.
> >> >>>>>>> We could imagine a JobSubmission hook executed in the JobManager runner like this:
> >> >>>>>>>
> >> >>>>>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);
> >> >>>>>>>
> >> >>>>>>> This is nice, but the JobGraph makes it super difficult to extract the sources and UDFs needed to create the metadata entity. The Atlas entity, however, could easily be created from the StreamGraph object (used to represent the logical flow) before the JobGraph is generated. To get around this limitation we could add a JobGraphGeneratorHook interface:
> >> >>>>>>>
> >> >>>>>>> void preProcess(StreamGraph streamGraph);
> >> >>>>>>> void postProcess(JobGraph jobGraph);
> >> >>>>>>>
> >> >>>>>>> We could then generate the Atlas entity in the preProcess step and add a job submission hook in the postProcess step that would simply send the already-created entity.
> >> >>>>>>>
> >> >>>>>>> *This kinda works but...*
> >> >>>>>>> The approach outlined above seems to work, and we have built a POC using it. Unfortunately it is far from nice, as it exposes non-public APIs such as the StreamGraph. It also feels a bit weird to have 2 hooks instead of one.
> >> >>>>>>>
> >> >>>>>>> It would be much nicer if we could somehow go back from the JobGraph to the StreamGraph, or at least have an easy way to access the source/sink UDFs.
> >> >>>>>>>
> >> >>>>>>> What do you think?
> >> >>>>>>>
> >> >>>>>>> Cheers,
> >> >>>>>>> Gyula
> >> >>>>>>
> >> >>>>>> --
> >> >>>>>> Best Regards
> >> >>>>>>
> >> >>>>>> Jeff Zhang