Hi all!

Thank you for your patience!

We have created a small design document for the change proposal detailing
the minimal required changes in Flink for the initial version of the Atlas
integration.

You can find the document here:
https://docs.google.com/document/d/1wSgzPdhcwt-SlNBBqL-Zb7g8fY6bN8JwHEg7GCdsBG8/edit?usp=sharing

It would be great if you could check it out and comment on it.
If we agree on the next steps, I will start opening JIRA tickets and PRs with
the proposed changes.

The document links to an already working Atlas hook prototype (and the
accompanying Flink fork). The links for those are also here:
Flink: https://github.com/gyfora/flink/tree/atlas-changes
Atlas: https://github.com/gyfora/atlas/tree/flink-bridge
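
For anyone skimming the thread, the listener-based approach discussed in the
quoted messages below can be sketched roughly as follows. This is only an
illustration and not the prototype code: Pipeline, StreamGraph, Plan, JobClient
and JobListener here are simplified local stand-ins so the snippet compiles
without Flink, getPipeline() is the accessor proposed in this thread, and the
sourceNames/sinkNames fields and the string "entity" are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins for the Flink types, NOT the real Flink classes.
interface Pipeline {}

final class StreamGraph implements Pipeline {
    // Hypothetical fields: the real StreamGraph exposes its nodes differently.
    final List<String> sourceNames = new ArrayList<>();
    final List<String> sinkNames = new ArrayList<>();
}

// Stand-in for the batch pipeline representation.
final class Plan implements Pipeline {}

interface JobClient {
    // The accessor proposed in this thread.
    Pipeline getPipeline();
}

// Simplified: only the submission callback is modelled here.
interface JobListener {
    void onJobSubmitted(JobClient jobClient, Throwable throwable);
}

final class AtlasHookSketch implements JobListener {
    // Stands in for the call that would upload an entity to Atlas.
    final List<String> sentEntities = new ArrayList<>();

    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        if (throwable != null || jobClient == null) {
            return; // submission failed, nothing to register
        }
        Pipeline pipeline = jobClient.getPipeline();
        if (pipeline instanceof StreamGraph) {
            StreamGraph graph = (StreamGraph) pipeline;
            sentEntities.add("FlinkApplication{inputs=" + graph.sourceNames
                    + ", outputs=" + graph.sinkNames + "}");
        }
        // A batch Plan is ignored in this sketch.
    }

    public static void main(String[] args) {
        StreamGraph graph = new StreamGraph();
        graph.sourceNames.add("kafka:orders");
        graph.sinkNames.add("fs:/data/out");

        AtlasHookSketch hook = new AtlasHookSketch();
        hook.onJobSubmitted(() -> graph, null);
        System.out.println(hook.sentEntities);
    }
}
```

The instanceof check keeps the listener independent of the pipeline type, at
the cost of depending on StreamGraph internals, which is the trade-off
discussed in the replies below.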

Thank you!
Gyula

On Thu, Feb 13, 2020 at 4:43 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Thanks for the feedback Aljoscha!
>
> I have a POC ready with the Flink changes + the Atlas hook implementation.
> I will try to push this to a public repo tomorrow and we can discuss
> further based on that!
>
> Gyula
>
> On Thu, Feb 13, 2020, 15:26 Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> I think exposing the Pipeline should be ok. Using the internal
>> StreamGraph might be problematic because this might change/break but
>> that's a problem of the external code.
>>
>> Aljoscha
>>
>> On 11.02.20 16:26, Gyula Fóra wrote:
>> > Hi All!
>> >
>> > I have made a prototype that simply adds a getPipeline() method to the
>> > JobClient interface. Then I could easily implement the Atlas hook using
>> > the JobListener interface. I simply check whether the Pipeline is an
>> > instance of StreamGraph and do the logic there.
>> >
>> > I think this is so far the cleanest approach, and I much prefer this
>> > to working on the JobGraph directly, which would expose even more
>> > messy internals.
>> >
>> > Unfortunately this change alone is not enough for the integration, as we
>> > need to make sure that all Sources/Sinks that we want to integrate with
>> > Atlas publicly expose some of their properties:
>> >
>> >     - Kafka source/sink:
>> >        - Kafka props
>> >        - Topic(s) - this is tricky for sinks
>> >     - FS source/sink:
>> >        - Hadoop props
>> >        - Base path for StreamingFileSink
>> >        - Path for ContinuousMonitoringSource
>> >
>> > Most of these are straightforward changes; the only question is what we
>> > want to register in Atlas from the available connectors. Ideally users
>> > could also somehow register their own Atlas metadata for custom sources
>> > and sinks; we could probably introduce an interface for that in Atlas.
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Fri, Feb 7, 2020 at 10:37 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >
>> >> Maybe we could improve the Pipeline interface in the long run, but as a
>> >> temporary solution the JobClient could expose a getPipeline() method.
>> >>
>> >> That way the implementation of the JobListener could check if it's a
>> >> StreamGraph or a Plan.
>> >>
>> >> How bad does that sound?
>> >>
>> >> Gyula
>> >>
>> >> On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >>
>> >>> Hi Aljoscha!
>> >>>
>> >>> That's a valid concern, but we should try to figure something out; many
>> >>> users need this before they can use Flink.
>> >>>
>> >>> I think the closest thing we have right now is the StreamGraph. In
>> >>> contrast with the JobGraph, the StreamGraph is pretty nice from a
>> >>> metadata perspective :D
>> >>> The big downside of exposing the StreamGraph is that we don't have it
>> >>> in batch. On the other hand, we could expose the JobGraph, but then the
>> >>> integration component would still have to do the heavy lifting for
>> >>> batch- and stream-specific operators and UDFs.
>> >>>
>> >>> Instead of exposing either the StreamGraph or the JobGraph, we could come
>> >>> up with a metadata-like representation for the users, but that would be
>> >>> like implementing the Atlas integration itself without Atlas dependencies :D
>> >>>
>> >>> As a comparison point, this is how it works in Storm:
>> >>> Every operator (spout/bolt) stores a config map (string->string) with
>> >>> all the metadata, such as the operator class and the operator-specific
>> >>> configs. The Atlas hook works on this map.
>> >>> This is very fragile and depends on a lot of internals. Kind of like
>> >>> exposing the JobGraph but much worse. I think we can do better.
>> >>>
>> >>> Gyula
>> >>>
>> >>> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>> >>>
>> >>>> If we need it, we can probably beef up the JobListener to allow
>> >>>> accessing some information about the whole graph or the sources and
>> >>>> sinks. My only concern is that we don't have a stable interface for
>> >>>> our job graphs/pipelines right now.
>> >>>>
>> >>>> Best,
>> >>>> Aljoscha
>> >>>>
>> >>>> On 06.02.20 23:00, Gyula Fóra wrote:
>> >>>>> Hi Jeff & Till!
>> >>>>>
>> >>>>> Thanks for the feedback, this is exactly the discussion I was looking
>> >>>>> for. The JobListener looks very promising if we can expose the JobGraph
>> >>>>> somehow (correct me if I am wrong, but it is not accessible at the
>> >>>>> moment).
>> >>>>>
>> >>>>> I did not know about this feature; that's why I added my JobSubmission
>> >>>>> hook, which was pretty similar but only exposed the JobGraph. In general
>> >>>>> I like the listener better, and I would not like to add anything extra
>> >>>>> if we can avoid it.
>> >>>>>
>> >>>>> Actually, the bigger part of the integration work that will need more
>> >>>>> changes in Flink concerns the accessibility of sources/sinks from the
>> >>>>> JobGraph and their specific properties. For instance, at the moment the
>> >>>>> Kafka sources and sinks do not expose anything publicly, such as topics,
>> >>>>> Kafka configs, etc. The same goes for other data connectors that we need
>> >>>>> to integrate in the long run. I guess there will be a separate thread
>> >>>>> on this once we iron out the initial integration points :)
>> >>>>>
>> >>>>> I will try to play around with the JobListener interface tomorrow and
>> >>>>> see if I can extend it to meet our needs.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Gyula
>> >>>>>
>> >>>>> On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zjf...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hi Gyula,
>> >>>>>>
>> >>>>>> Flink 1.10 introduced JobListener, which is invoked after job
>> >>>>>> submission and after the job finishes. Maybe we can add an API on
>> >>>>>> JobClient to get the info you need for the Atlas integration.
>> >>>>>>
>> >>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
>> >>>>>>
>> >>>>>>
>> >>>>>> Gyula Fóra <gyf...@apache.org> wrote on Wed, Feb 5, 2020 at 7:48 PM:
>> >>>>>>
>> >>>>>>> Hi all!
>> >>>>>>>
>> >>>>>>> We have started some preliminary work on the Flink - Atlas
>> >>>>>>> integration at Cloudera. It seems that the integration will require
>> >>>>>>> some new hook interfaces at the JobGraph generation and submission
>> >>>>>>> phases, so I figured I would open a discussion thread with my initial
>> >>>>>>> ideas to get some early feedback.
>> >>>>>>>
>> >>>>>>> *Minimal background*
>> >>>>>>> Very simply put, Apache Atlas is a data governance framework that
>> >>>>>>> stores metadata for our data and processing logic to track ownership,
>> >>>>>>> lineage, etc. It is already integrated with systems like HDFS, Kafka,
>> >>>>>>> Hive and many others.
>> >>>>>>>
>> >>>>>>> Adding Flink integration would mean that we can track the input and
>> >>>>>>> output data of our Flink jobs, their owners, and how different Flink
>> >>>>>>> jobs are connected to each other through the data they produce
>> >>>>>>> (lineage). This seems to be a very big deal for a lot of companies :)
>> >>>>>>>
>> >>>>>>> *Flink - Atlas integration in a nutshell*
>> >>>>>>> In order to integrate with Atlas we basically need two things:
>> >>>>>>>    - Flink entity definitions
>> >>>>>>>    - Flink Atlas hook
>> >>>>>>>
>> >>>>>>> The entity definition is the easy part. It is a JSON document that
>> >>>>>>> contains the objects (entities) that we want to store for any given
>> >>>>>>> Flink job. As a starter we could have a single FlinkApplication entity
>> >>>>>>> that has a set of inputs and outputs. These inputs/outputs are other
>> >>>>>>> Atlas entities that are already defined, such as a Kafka topic or an
>> >>>>>>> HBase table.
>> >>>>>>>
>> >>>>>>> The Flink Atlas hook will be the logic that creates the entity
>> >>>>>>> instance and uploads it to Atlas when we start a new Flink job. This
>> >>>>>>> is the part where we implement the core logic.
>> >>>>>>>
>> >>>>>>> *Job submission hook*
>> >>>>>>> In order to implement the Atlas hook we need a place where we can
>> >>>>>>> inspect the pipeline, then create and send the metadata when the job
>> >>>>>>> starts. When we create the FlinkApplication entity we need to be able
>> >>>>>>> to easily determine the sources and sinks (and their properties) of
>> >>>>>>> the pipeline.
>> >>>>>>>
>> >>>>>>> Unfortunately there is no JobSubmission hook in Flink that could
>> >>>>>>> execute this logic, and even if there were one, there is a mismatch in
>> >>>>>>> the abstraction levels needed to implement the integration.
>> >>>>>>> We could imagine a JobSubmission hook executed in the JobManager
>> >>>>>>> runner like this:
>> >>>>>>>
>> >>>>>>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);
>> >>>>>>>
>> >>>>>>> This is nice, but the JobGraph makes it super difficult to extract
>> >>>>>>> sources and UDFs to create the metadata entity. The Atlas entity,
>> >>>>>>> however, could be easily created from the StreamGraph object (used to
>> >>>>>>> represent the logical flow) before the JobGraph is generated. To work
>> >>>>>>> around this limitation we could add a JobGraphGeneratorHook interface:
>> >>>>>>>
>> >>>>>>> void preProcess(StreamGraph streamGraph);
>> >>>>>>> void postProcess(JobGraph jobGraph);
>> >>>>>>>
>> >>>>>>> We could then generate the Atlas entity in the preProcess step and
>> >>>>>>> add a job submission hook in the postProcess step that will simply
>> >>>>>>> send the already baked-in entity.
>> >>>>>>>
>> >>>>>>> *This kinda works but...*
>> >>>>>>> The approach outlined above seems to work and we have built a POC
>> >>>>>>> using it. Unfortunately it is far from nice, as it exposes non-public
>> >>>>>>> APIs such as the StreamGraph. Also it feels a bit weird to have 2 hooks
>> >>>>>>> instead of one.
>> >>>>>>>
>> >>>>>>> It would be much nicer if we could somehow go back from the JobGraph
>> >>>>>>> to the StreamGraph, or at least have an easy way to access source/sink
>> >>>>>>> UDFs.
>> >>>>>>>
>> >>>>>>> What do you think?
>> >>>>>>>
>> >>>>>>> Cheers,
>> >>>>>>> Gyula
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> --
>> >>>>>> Best Regards
>> >>>>>>
>> >>>>>> Jeff Zhang
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >
>>
>
