Maybe we could improve the Pipeline interface in the long run, but as a temporary solution the JobClient could expose a getPipeline() method.
That way the implementation of the JobListener could check whether it is a StreamGraph or a Plan.
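On the listener side it could look something like this (only a sketch: getPipeline() is the hypothetical new accessor, the rest is the existing 1.10 API as far as I remember):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class AtlasJobListener implements JobListener {

    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        if (throwable != null || jobClient == null) {
            return; // submission failed, nothing to report to Atlas
        }
        // hypothetical accessor discussed above, it does not exist today
        Pipeline pipeline = jobClient.getPipeline();
        if (pipeline instanceof StreamGraph) {
            // streaming job: read sources/sinks from the StreamGraph nodes
        } else if (pipeline instanceof Plan) {
            // batch job: read the data sources/sinks from the Plan
        }
    }

    @Override
    public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
        // not needed for creating the Atlas entity
    }
}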
How bad does that sound?

Gyula

On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Aljoscha!

That's a valid concern, but we should try to figure something out; many users need this before they can use Flink.

I think the closest thing we have right now is the StreamGraph. In contrast with the JobGraph, the StreamGraph is pretty nice from a metadata perspective :D
The big downside of exposing the StreamGraph is that we don't have it in batch. On the other hand we could expose the JobGraph, but then the integration component would still have to do the heavy lifting for batch and stream specific operators and UDFs.

Instead of exposing either the StreamGraph or the JobGraph, we could come up with a metadata-like representation for the users, but that would be like implementing the Atlas integration itself without Atlas dependencies :D

As a comparison point, this is how it works in Storm:
Every operator (spout/bolt) stores a config map (string->string) with all the metadata, such as the operator class and the operator specific configs. The Atlas hook works on this map.
This is very fragile and depends on a lot of internals. Kind of like exposing the JobGraph, but much worse. I think we can do better.

Gyula

On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:

If we need it, we can probably beef up the JobListener to allow accessing some information about the whole graph or sources and sinks. My only concern is that we don't have a stable interface for our job graphs/pipelines right now.

Best,
Aljoscha

On 06.02.20 23:00, Gyula Fóra wrote:

Hi Jeff & Till!

Thanks for the feedback, this is exactly the discussion I was looking for.
The JobListener looks very promising if we can expose the JobGraph somehow (correct me if I am wrong, but it is not accessible at the moment).

I did not know about this feature; that's why I added my JobSubmission hook, which was pretty similar but only exposed the JobGraph. In general I like the listener better, and I would not like to add anything extra if we can avoid it.

Actually, the bigger part of the integration work that will need more changes in Flink will be regarding the accessibility of sources/sinks from the JobGraph and their specific properties. For instance, at the moment the Kafka sources and sinks do not expose anything publicly, such as topics, Kafka configs, etc. The same goes for other data connectors that we need to integrate in the long run. I guess there will be a separate thread on this once we iron out the initial integration points :)

I will try to play around with the JobListener interface tomorrow and see if I can extend it to meet our needs.

Cheers,
Gyula

On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Gyula,

Flink 1.10 introduced JobListener, which is invoked after job submission and after the job finishes. Maybe we can add an API on JobClient to get the info you need for the Atlas integration.

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
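Hooking a listener in already works roughly like this today (just a sketch, based on the 1.10 registerJobListener API; the listener method signatures are from the interface linked above):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ListenerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the listener is called back on submission and on completion/failure
        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                if (jobClient != null) {
                    // the JobClient exposes the job id, status, savepoints etc., but not the pipeline
                    System.out.println("Submitted job " + jobClient.getJobID());
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // called once the job has finished (or failed)
            }
        });
        // build the pipeline and call env.execute(...) as usual
    }
}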
On Wed, Feb 5, 2020 at 7:48 PM, Gyula Fóra <gyf...@apache.org> wrote:

Hi all!

We have started some preliminary work on the Flink - Atlas integration at Cloudera. It seems that the integration will require some new hook interfaces at the JobGraph generation and submission phases, so I figured I would open a discussion thread with my initial ideas to get some early feedback.

*Minimal background*
Very simply put, Apache Atlas is a data governance framework that stores metadata for our data and processing logic to track ownership, lineage, etc. It is already integrated with systems like HDFS, Kafka, Hive and many others.

Adding Flink integration would mean that we can track the input/output data of our Flink jobs, their owners, and how different Flink jobs are connected to each other through the data they produce (lineage). This seems to be a very big deal for a lot of companies :)

*Flink - Atlas integration in a nutshell*
In order to integrate with Atlas we basically need 2 things:
- Flink entity definitions
- Flink Atlas hook

The entity definition is the easy part. It is a JSON file that contains the objects (entities) that we want to store for any given Flink job. As a starter we could have a single FlinkApplication entity that has a set of inputs and outputs. These inputs/outputs are other Atlas entities that are already defined, such as a Kafka topic or an HBase table.

The Flink Atlas hook will be the logic that creates the entity instance and uploads it to Atlas when we start a new Flink job. This is the part where we implement the core logic.

*Job submission hook*
In order to implement the Atlas hook we need a place where we can inspect the pipeline, and create and send the metadata when the job starts. When we create the FlinkApplication entity we need to be able to easily determine the sources and sinks (and their properties) of the pipeline.

Unfortunately there is no JobSubmission hook in Flink that could execute this logic, and even if there were one, there is a mismatch of abstraction levels needed to implement the integration.
We could imagine a JobSubmission hook executed in the JobManager runner like this:

void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);

This is nice, but the JobGraph makes it super difficult to extract sources and UDFs to create the metadata entity. The Atlas entity, however, could easily be created from the StreamGraph object (used to represent the logical flow) before the JobGraph is generated. To get around this limitation we could add a JobGraphGeneratorHook interface:

void preProcess(StreamGraph streamGraph);
void postProcess(JobGraph jobGraph);

We could then generate the Atlas entity in the preProcess step and add a job submission hook in the postProcess step that will simply send the already baked entity.
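Roughly like this (just a sketch: the JobGraphGeneratorHook interface does not exist in Flink, FlinkApplicationEntity and AtlasClient are made-up placeholders, and the StreamGraph accessors are quoted from memory):

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.StreamGraph;

// proposed hook interface, not part of Flink today
interface JobGraphGeneratorHook {
    void preProcess(StreamGraph streamGraph);
    void postProcess(JobGraph jobGraph);
}

// Atlas hook built on top of it; FlinkApplicationEntity and AtlasClient are placeholders
public class AtlasHook implements JobGraphGeneratorHook {

    private FlinkApplicationEntity entity;

    @Override
    public void preProcess(StreamGraph streamGraph) {
        // the logical graph still knows its sources/sinks and their operators
        entity = new FlinkApplicationEntity(streamGraph.getJobName());
        for (Integer sourceId : streamGraph.getSourceIDs()) {
            entity.addInput(streamGraph.getStreamNode(sourceId).getOperatorName());
        }
        for (Integer sinkId : streamGraph.getSinkIDs()) {
            entity.addOutput(streamGraph.getStreamNode(sinkId).getOperatorName());
        }
    }

    @Override
    public void postProcess(JobGraph jobGraph) {
        // the entity is already baked in, just attach the job id and upload it
        entity.setJobId(jobGraph.getJobID());
        AtlasClient.upload(entity);
    }
}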
*This kinda works but...*
The approach outlined above seems to work, and we have built a POC using it. Unfortunately it is far from nice, as it exposes non-public APIs such as the StreamGraph. Also, it feels a bit weird to have 2 hooks instead of one.

It would be much nicer if we could somehow go back from the JobGraph to the StreamGraph, or at least have an easy way to access the source/sink UDFs.

What do you think?

Cheers,
Gyula

--
Best Regards

Jeff Zhang