Thanks for the feedback, Aljoscha!

I have a POC ready with the Flink changes + the Atlas hook implementation.
I will try to push this to a public repo tomorrow and we can discuss
further based on that!

Gyula

On Thu, Feb 13, 2020, 15:26 Aljoscha Krettek <aljos...@apache.org> wrote:

> I think exposing the Pipeline should be ok. Using the internal
> StreamGraph might be problematic because it might change/break, but
> that's a problem for the external code.
>
> Aljoscha
>
> On 11.02.20 16:26, Gyula Fóra wrote:
> > Hi All!
> >
> > I have made a prototype that simply adds a getPipeline() method to the
> > JobClient interface. With that I could easily implement the Atlas hook
> > using the JobListener interface: I simply check whether the Pipeline is
> > an instance of StreamGraph and do the logic there.
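> >
> > Roughly like this (just a sketch; AtlasJobListener and registerInAtlas
> > are made-up names, and getPipeline() is the proposed new method):
> >
> > public class AtlasJobListener implements JobListener {
> >
> >     @Override
> >     public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
> >         if (throwable != null) {
> >             return; // submission failed, nothing to register
> >         }
> >         Pipeline pipeline = jobClient.getPipeline(); // proposed addition
> >         if (pipeline instanceof StreamGraph) {
> >             registerInAtlas((StreamGraph) pipeline);
> >         }
> >     }
> >
> >     @Override
> >     public void onJobExecuted(JobExecutionResult result, Throwable t) {
> >         // nothing to do for Atlas on job completion
> >     }
> >
> >     private void registerInAtlas(StreamGraph graph) {
> >         // build and send the Atlas entities from the graph
> >     }
> > }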
> >
> > I think this is the cleanest approach so far and I much prefer it to
> > working on the JobGraph directly, which would expose even more messy
> > internals.
> >
> > Unfortunately this change alone is not enough for the integration, as we
> > also need to make sure that all sources/sinks that we want to integrate
> > with Atlas publicly expose some of their properties:
> >
> >     - Kafka source/sink:
> >        - Kafka props
> >        - Topic(s) - this is tricky for sinks
> >     - FS source/sink:
> >        - Hadoop props
> >        - Base path for StreamingFileSink
> >        - Path for ContinuousMonitoringSource
> >
> > Most of these are straightforward changes; the only question is what we
> > want to register in Atlas from the available connectors. Ideally users
> > could also somehow register their own Atlas metadata for custom sources
> > and sinks; we could probably introduce an interface for that, as
> > sketched below.
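> >
> > For the custom sources and sinks I could imagine something like this
> > (made-up sketch; the interface and method names are placeholders):
> >
> > /** Implemented by sources/sinks that want to describe themselves to Atlas. */
> > public interface AtlasMetadataProvider {
> >     /** Key/value properties to attach to the job's Atlas entity. */
> >     Map<String, String> getAtlasMetadata();
> > }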
> >
> > Cheers,
> > Gyula
> >
> > On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> >> Maybe we could improve the Pipeline interface in the long run, but as a
> >> temporary solution the JobClient could expose a getPipeline() method.
> >>
> >> That way the implementation of the JobListener could check whether it
> >> is a StreamGraph or a Plan.
> >>
> >> How bad does that sound?
> >>
> >> Gyula
> >>
> >> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gyula.f...@gmail.com>
> wrote:
> >>
> >>> Hi Aljoscha!
> >>>
> >>> That's a valid concern, but we should try to figure something out;
> >>> many users need this before they can use Flink.
> >>>
> >>> I think the closest thing we have right now is the StreamGraph. In
> >>> contrast with the JobGraph, the StreamGraph is pretty nice from a
> >>> metadata perspective :D
> >>> The big downside of exposing the StreamGraph is that we don't have it
> >>> in batch. On the other hand, we could expose the JobGraph, but then the
> >>> integration component would still have to do the heavy lifting for
> >>> batch- and stream-specific operators and UDFs.
> >>>
> >>> Instead of exposing either the StreamGraph or the JobGraph, we could
> >>> come up with a metadata-like representation for the users, but that
> >>> would be like implementing the Atlas integration itself without the
> >>> Atlas dependencies :D
> >>>
> >>> As a comparison point, this is how it works in Storm:
> >>> Every operator (spout/bolt) stores a config map (string->string) with
> >>> all the metadata, such as the operator class and the operator-specific
> >>> configs. The Atlas hook works on this map.
> >>> This is very fragile and depends on a lot of internals. Kind of like
> >>> exposing the JobGraph, but much worse. I think we can do better.
> >>>
> >>> Gyula
> >>>
> >>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <aljos...@apache.org>
> >>> wrote:
> >>>
> >>>> If we need it, we can probably beef up the JobListener to allow
> >>>> accessing some information about the whole graph or sources and sinks.
> >>>> My only concern is that we don't have a stable interface for our job
> >>>> graphs/pipelines right now.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On 06.02.20 23:00, Gyula Fóra wrote:
> >>>>> Hi Jeff & Till!
> >>>>>
> >>>>> Thanks for the feedback; this is exactly the discussion I was looking
> >>>>> for. The JobListener looks very promising if we can expose the
> >>>>> JobGraph somehow (correct me if I am wrong, but it is not accessible
> >>>>> at the moment).
> >>>>>
> >>>>> I did not know about this feature; that's why I added my
> >>>>> JobSubmission hook, which was pretty similar but only exposed the
> >>>>> JobGraph. In general I like the listener better and would not like
> >>>>> to add anything extra if we can avoid it.
> >>>>>
> >>>>> Actually, the bigger part of the integration work that will need
> >>>>> changes in Flink concerns the accessibility of sources/sinks from the
> >>>>> JobGraph and their specific properties. For instance, at the moment
> >>>>> the Kafka sources and sinks do not publicly expose anything, such as
> >>>>> topics, Kafka configs, etc. The same goes for other data connectors
> >>>>> that we need to integrate in the long run. I guess there will be a
> >>>>> separate thread on this once we iron out the initial integration
> >>>>> points :)
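> >>>>>
> >>>>> For Kafka, this could be as simple as adding getters (sketch only;
> >>>>> these methods do not exist today):
> >>>>>
> >>>>> // hypothetical additions to the Kafka source/sink classes
> >>>>> public Properties getKafkaProperties() { return properties; }
> >>>>> public List<String> getTopics() { return topics; }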
> >>>>>
> >>>>> I will try to play around with the JobListener interface tomorrow
> >>>>> and see if I can extend it to meet our needs.
> >>>>>
> >>>>> Cheers,
> >>>>> Gyula
> >>>>>
> >>>>> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Gyula,
> >>>>>>
> >>>>>> Flink 1.10 introduced the JobListener, which is invoked after job
> >>>>>> submission and completion. Maybe we can add an API on JobClient to
> >>>>>> get the info you need for Atlas integration.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Feb 5, 2020 at 7:48 PM, Gyula Fóra <gyf...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi all!
> >>>>>>>
> >>>>>>> We have started some preliminary work on the Flink - Atlas
> >>>>>>> integration at Cloudera. It seems that the integration will require
> >>>>>>> some new hook interfaces at the JobGraph generation and submission
> >>>>>>> phases, so I figured I would open a discussion thread with my
> >>>>>>> initial ideas to get some early feedback.
> >>>>>>>
> >>>>>>> *Minimal background*
> >>>>>>> Very simply put, Apache Atlas is a data governance framework that
> >>>>>>> stores metadata for our data and processing logic to track
> >>>>>>> ownership, lineage, etc. It is already integrated with systems like
> >>>>>>> HDFS, Kafka, Hive and many others.
> >>>>>>>
> >>>>>>> Adding Flink integration would mean that we could track the
> >>>>>>> input/output data of our Flink jobs, their owners, and how
> >>>>>>> different Flink jobs are connected to each other through the data
> >>>>>>> they produce (lineage). This seems to be a very big deal for a lot
> >>>>>>> of companies :)
> >>>>>>>
> >>>>>>> *Flink - Atlas integration in a nutshell*
> >>>>>>> In order to integrate with Atlas we basically need 2 things:
> >>>>>>>    - Flink entity definitions
> >>>>>>>    - Flink Atlas hook
> >>>>>>>
> >>>>>>> The entity definition is the easy part. It is a JSON file that
> >>>>>>> contains the objects (entities) that we want to store for any given
> >>>>>>> Flink job. As a starter we could have a single FlinkApplication
> >>>>>>> entity that has a set of inputs and outputs. These inputs/outputs
> >>>>>>> are other Atlas entities that are already defined, such as a Kafka
> >>>>>>> topic or an HBase table.
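> >>>>>>>
> >>>>>>> As a first cut the type definition could be as small as this
> >>>>>>> (made-up sketch; Atlas' built-in Process supertype already carries
> >>>>>>> the inputs/outputs attributes):
> >>>>>>>
> >>>>>>> {
> >>>>>>>   "entityDefs": [
> >>>>>>>     {
> >>>>>>>       "name": "flink_application",
> >>>>>>>       "superTypes": ["Process"],
> >>>>>>>       "serviceType": "flink",
> >>>>>>>       "attributeDefs": [
> >>>>>>>         { "name": "jobId", "typeName": "string",
> >>>>>>>           "cardinality": "SINGLE", "isOptional": false }
> >>>>>>>       ]
> >>>>>>>     }
> >>>>>>>   ]
> >>>>>>> }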
> >>>>>>>
> >>>>>>> The Flink Atlas hook will be the logic that creates the entity
> >>>>>>> instance and uploads it to Atlas when we start a new Flink job.
> >>>>>>> This is the part where we implement the core logic.
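> >>>>>>>
> >>>>>>> With the Atlas client classes, building the instance itself is
> >>>>>>> simple (sketch, assuming the type above is registered; jobName and
> >>>>>>> the input/output id lists are placeholders):
> >>>>>>>
> >>>>>>> AtlasEntity flinkApp = new AtlasEntity("flink_application");
> >>>>>>> flinkApp.setAttribute("name", jobName);
> >>>>>>> flinkApp.setAttribute("inputs", inputIds);   // e.g. Kafka topic entities
> >>>>>>> flinkApp.setAttribute("outputs", outputIds); // e.g. HBase table entities
> >>>>>>> // then publish it through the Atlas hook notification mechanism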
> >>>>>>>
> >>>>>>> *Job submission hook*
> >>>>>>> In order to implement the Atlas hook we need a place where we can
> >>>>>>> inspect the pipeline and create and send the metadata when the job
> >>>>>>> starts. When we create the FlinkApplication entity we need to be
> >>>>>>> able to easily determine the sources and sinks (and their
> >>>>>>> properties) of the pipeline.
> >>>>>>>
> >>>>>>> Unfortunately there is no JobSubmission hook in Flink that could
> >>>>>>> execute this logic, and even if there were one, there would be a
> >>>>>>> mismatch between the abstraction levels needed to implement the
> >>>>>>> integration.
> >>>>>>> We could imagine a JobSubmission hook executed in the JobManager
> >>>>>>> runner like this:
> >>>>>>>
> >>>>>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> >>>>>>> configuration);
> >>>>>>>
> >>>>>>> This is nice, but the JobGraph makes it super difficult to extract
> >>>>>>> sources and UDFs to create the metadata entity. The Atlas entity,
> >>>>>>> however, could easily be created from the StreamGraph object (used
> >>>>>>> to represent the logical flow) before the JobGraph is generated. To
> >>>>>>> work around this limitation we could add a JobGraphGeneratorHook
> >>>>>>> interface:
> >>>>>>>
> >>>>>>> void preProcess(StreamGraph streamGraph);
> >>>>>>> void postProcess(JobGraph jobGraph);
> >>>>>>>
> >>>>>>> We could then generate the Atlas entity in the preProcess step and
> >>>>>>> add a job submission hook in the postProcess step that simply sends
> >>>>>>> the already baked entity, as in the sketch below.
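> >>>>>>>
> >>>>>>> Roughly (sketch only; the StreamGraph accessors are internal API,
> >>>>>>> and createFlinkApplicationEntity/notifyAtlas are made-up helpers):
> >>>>>>>
> >>>>>>> public class AtlasHookImpl implements JobGraphGeneratorHook {
> >>>>>>>
> >>>>>>>     // prepared in preProcess, sent in postProcess
> >>>>>>>     private AtlasEntity flinkApp;
> >>>>>>>
> >>>>>>>     public void preProcess(StreamGraph streamGraph) {
> >>>>>>>         // walk streamGraph.getSourceIDs()/getSinkIDs() to collect
> >>>>>>>         // the inputs/outputs and build the entity
> >>>>>>>         flinkApp = createFlinkApplicationEntity(streamGraph);
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     public void postProcess(JobGraph jobGraph) {
> >>>>>>>         flinkApp.setAttribute("id", jobGraph.getJobID().toString());
> >>>>>>>         notifyAtlas(flinkApp); // send the already baked entity
> >>>>>>>     }
> >>>>>>> }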
> >>>>>>>
> >>>>>>> *This kinda works but...*
> >>>>>>> The approach outlined above seems to work, and we have built a POC
> >>>>>>> using it. Unfortunately it is far from nice, as it exposes
> >>>>>>> non-public APIs such as the StreamGraph. Also, it feels a bit weird
> >>>>>>> to have 2 hooks instead of one.
> >>>>>>>
> >>>>>>> It would be much nicer if we could somehow go back from the
> >>>>>>> JobGraph to the StreamGraph, or at least have an easy way to access
> >>>>>>> the source/sink UDFs.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Gyula
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Best Regards
> >>>>>>
> >>>>>> Jeff Zhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >
>
