Thanks for the feedback, Aljoscha! I have a POC ready with the Flink changes + the Atlas hook implementation. I will try to push this to a public repo tomorrow and we can discuss further based on that!
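To make the discussion a bit more concrete until then, this is roughly the shape of the hook (a minimal sketch, not the actual POC code): it assumes the proposed JobClient#getPipeline() addition, which does not exist in Flink yet, and the Atlas entity creation is just a placeholder.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.graph.StreamGraph;

/** Sketch of the Atlas hook implemented as a JobListener. */
public class AtlasJobListener implements JobListener {

    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        if (jobClient == null) {
            return; // submission failed, nothing to register in Atlas
        }

        // getPipeline() is the proposed addition to JobClient, it does not exist yet
        Pipeline pipeline = jobClient.getPipeline();

        if (pipeline instanceof StreamGraph) {
            // streaming job: walk the StreamGraph sources/sinks and build the entity
            registerFlinkApplicationEntity((StreamGraph) pipeline, jobClient.getJobID());
        }
        // a batch Plan could be handled in a similar branch later
    }

    @Override
    public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
        // nothing to do on completion for the initial integration
    }

    private void registerFlinkApplicationEntity(StreamGraph streamGraph, JobID jobId) {
        // placeholder: build the FlinkApplication entity (inputs/outputs derived
        // from the sources/sinks of the StreamGraph) and send it via the Atlas client
    }
}

The listener could then simply be registered via env.registerJobListener(new AtlasJobListener()), or later wired in through configuration so users don't have to do it by hand.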
Gyula

On Thu, Feb 13, 2020, 15:26 Aljoscha Krettek <aljos...@apache.org> wrote:
> I think exposing the Pipeline should be ok. Using the internal
> StreamGraph might be problematic because this might change/break, but
> that's a problem of the external code.
>
> Aljoscha
>
> On 11.02.20 16:26, Gyula Fóra wrote:
> > Hi All!
> >
> > I have made a prototype that simply adds a getPipeline() method to the
> > JobClient interface. Then I could easily implement the Atlas hook using the
> > JobListener interface. I simply check if the Pipeline is instanceof StreamGraph
> > and do the logic there.
> >
> > I think this is so far the cleanest approach and I much prefer it
> > compared to working on the JobGraph directly, which would expose even more
> > messy internals.
> >
> > Unfortunately this change alone is not enough for the integration, as we
> > need to make sure that all Sources/Sinks that we want to integrate with Atlas
> > publicly expose some of their properties:
> >
> >  - Kafka source/sink:
> >    - Kafka props
> >    - Topic(s) - this is tricky for sinks
> >  - FS source/sink:
> >    - Hadoop props
> >    - Base path for StreamingFileSink
> >    - Path for ContinuousMonitoringSource
> >
> > Most of these are straightforward changes; the only question is what we
> > want to register in Atlas from the available connectors. Ideally users
> > could also somehow register their own Atlas metadata for custom sources and
> > sinks; we could probably introduce an interface for that in Atlas.
> >
> > Cheers,
> > Gyula
> >
> > On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> >> Maybe we could improve the Pipeline interface in the long run, but as a
> >> temporary solution the JobClient could expose a getPipeline() method.
> >>
> >> That way the implementation of the JobListener could check if it's a
> >> StreamGraph or a Plan.
> >>
> >> How bad does that sound?
> >>
> >> Gyula
> >>
> >> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>
> >>> Hi Aljoscha!
> >>>
> >>> That's a valid concern, but we should try to figure something out; many
> >>> users need this before they can use Flink.
> >>>
> >>> I think the closest thing we have right now is the StreamGraph. In
> >>> contrast with the JobGraph, the StreamGraph is pretty nice from a metadata
> >>> perspective :D
> >>> The big downside of exposing the StreamGraph is that we don't have it in
> >>> batch. On the other hand we could expose the JobGraph, but then the
> >>> integration component would still have to do the heavy lifting for batch-
> >>> and stream-specific operators and UDFs.
> >>>
> >>> Instead of exposing either the StreamGraph or the JobGraph, we could come up
> >>> with a metadata-like representation for the users, but that would be like
> >>> implementing the Atlas integration itself without Atlas dependencies :D
> >>>
> >>> As a comparison point, this is how it works in Storm:
> >>> Every operator (spout/bolt) stores a config map (string->string) with
> >>> all the metadata, such as the operator class and the operator-specific configs.
> >>> The Atlas hook works on this map.
> >>> This is very fragile and depends on a lot of internals. Kind of like
> >>> exposing the JobGraph but much worse. I think we can do better.
> >>>
> >>> Gyula
> >>>
> >>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>
> >>>> If we need it, we can probably beef up the JobListener to allow
> >>>> accessing some information about the whole graph or sources and sinks.
> >>>> My only concern right now is that we don't have a stable interface for
> >>>> our job graphs/pipelines right now.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On 06.02.20 23:00, Gyula Fóra wrote:
> >>>>> Hi Jeff & Till!
> >>>>>
> >>>>> Thanks for the feedback, this is exactly the discussion I was looking for.
> >>>>> The JobListener looks very promising if we can expose the JobGraph somehow
> >>>>> (correct me if I am wrong, but it is not accessible at the moment).
> >>>>>
> >>>>> I did not know about this feature; that's why I added my JobSubmission hook,
> >>>>> which was pretty similar but only exposed the JobGraph. In general I like
> >>>>> the listener better and I would not like to add anything extra if we can
> >>>>> avoid it.
> >>>>>
> >>>>> Actually the bigger part of the integration work that will need more
> >>>>> changes in Flink will be regarding the accessibility of sources/sinks from
> >>>>> the JobGraph and their specific properties. For instance, at the moment the
> >>>>> Kafka sources and sinks do not expose anything publicly, such as topics,
> >>>>> Kafka configs, etc. The same goes for other data connectors that we need to
> >>>>> integrate in the long run. I guess there will be a separate thread on this
> >>>>> once we iron out the initial integration points :)
> >>>>>
> >>>>> I will try to play around with the JobListener interface tomorrow and see
> >>>>> if I can extend it to meet our needs.
> >>>>>
> >>>>> Cheers,
> >>>>> Gyula
> >>>>>
> >>>>> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Gyula,
> >>>>>>
> >>>>>> Flink 1.10 introduced the JobListener, which is invoked after job submission
> >>>>>> and completion. Maybe we can add an API on JobClient to get the info you
> >>>>>> need for the Atlas integration.
> >>>>>>
> >>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
> >>>>>>
> >>>>>> On Wed, Feb 5, 2020 at 7:48 PM Gyula Fóra <gyf...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi all!
> >>>>>>>
> >>>>>>> We have started some preliminary work on the Flink - Atlas integration at
> >>>>>>> Cloudera. It seems that the integration will require some new hook
> >>>>>>> interfaces at the jobgraph generation and submission phases, so I figured I
> >>>>>>> would open a discussion thread with my initial ideas to get some early
> >>>>>>> feedback.
> >>>>>>>
> >>>>>>> *Minimal background*
> >>>>>>> Very simply put, Apache Atlas is a data governance framework that stores
> >>>>>>> metadata for our data and processing logic to track ownership, lineage, etc.
> >>>>>>> It is already integrated with systems like HDFS, Kafka, Hive and many
> >>>>>>> others.
> >>>>>>>
> >>>>>>> Adding Flink integration would mean that we can track the input/output data
> >>>>>>> of our Flink jobs, their owners, and how different Flink jobs are connected
> >>>>>>> to each other through the data they produce (lineage). This seems to be a
> >>>>>>> very big deal for a lot of companies :)
> >>>>>>>
> >>>>>>> *Flink - Atlas integration in a nutshell*
> >>>>>>> In order to integrate with Atlas we basically need 2 things:
> >>>>>>> - Flink entity definitions
> >>>>>>> - Flink Atlas hook
> >>>>>>>
> >>>>>>> The entity definition is the easy part.
> >>>>>>> It is a JSON file that contains the
> >>>>>>> objects (entities) that we want to store for any given Flink job. As a
> >>>>>>> starter we could have a single FlinkApplication entity that has a set of
> >>>>>>> inputs and outputs. These inputs/outputs are other Atlas entities that are
> >>>>>>> already defined, such as a Kafka topic or an HBase table.
> >>>>>>>
> >>>>>>> The Flink Atlas hook will be the logic that creates the entity instance and
> >>>>>>> uploads it to Atlas when we start a new Flink job. This is the part where
> >>>>>>> we implement the core logic.
> >>>>>>>
> >>>>>>> *Job submission hook*
> >>>>>>> In order to implement the Atlas hook we need a place where we can inspect
> >>>>>>> the pipeline, and create and send the metadata when the job starts. When we
> >>>>>>> create the FlinkApplication entity we need to be able to easily determine
> >>>>>>> the sources and sinks (and their properties) of the pipeline.
> >>>>>>>
> >>>>>>> Unfortunately there is no JobSubmission hook in Flink that could execute
> >>>>>>> this logic, and even if there was one, there is a mismatch of abstraction
> >>>>>>> levels needed to implement the integration.
> >>>>>>> We could imagine a JobSubmission hook executed in the JobManager runner as
> >>>>>>> this:
> >>>>>>>
> >>>>>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);
> >>>>>>>
> >>>>>>> This is nice, but the JobGraph makes it super difficult to extract sources
> >>>>>>> and UDFs to create the metadata entity. The Atlas entity, however, could be
> >>>>>>> easily created from the StreamGraph object (used to represent the logical
> >>>>>>> flow) before the JobGraph is generated. To get around this limitation we
> >>>>>>> could add a JobGraphGeneratorHook interface:
> >>>>>>>
> >>>>>>> void preProcess(StreamGraph streamGraph);
> >>>>>>> void postProcess(JobGraph jobGraph);
> >>>>>>>
> >>>>>>> We could then generate the Atlas entity in the preProcess step and add a
> >>>>>>> job submission hook in the postProcess step that will simply send the
> >>>>>>> already baked-in entity.
> >>>>>>>
> >>>>>>> *This kinda works but...*
> >>>>>>> The approach outlined above seems to work and we have built a POC using it.
> >>>>>>> Unfortunately it is far from nice, as it exposes non-public APIs such as
> >>>>>>> the StreamGraph. Also, it feels a bit weird to have 2 hooks instead of one.
> >>>>>>>
> >>>>>>> It would be much nicer if we could somehow go back from the JobGraph to the
> >>>>>>> StreamGraph or at least have an easy way to access source/sink UDFs.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Gyula
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Best Regards
> >>>>>>
> >>>>>> Jeff Zhang