Maybe we could improve the Pipeline interface in the long run, but as a
temporary solution the JobClient could expose a getPipeline() method.

That way the implementation of the JobListener could check whether it's a
StreamGraph or a Plan.
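
A minimal sketch of that idea, assuming a hypothetical getPipeline() on
JobClient (it does not exist today). Pipeline, StreamGraph, Plan, JobClient
and JobListener below are simplified stand-ins rather than the real Flink
classes, and AtlasJobListener is just an illustrative name:

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineListenerSketch {

    // Stand-ins for Flink's Pipeline, StreamGraph and Plan (NOT the real classes).
    interface Pipeline {}
    static class StreamGraph implements Pipeline {}
    static class Plan implements Pipeline {}

    // Stand-in for JobClient; getPipeline() is the hypothetical addition.
    interface JobClient {
        Pipeline getPipeline();
    }

    // Stand-in that mirrors only the submission callback of JobListener.
    interface JobListener {
        void onJobSubmitted(JobClient jobClient, Throwable throwable);
    }

    // Illustrative listener: records which kind of pipeline was submitted.
    static class AtlasJobListener implements JobListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
            // A failed submission carries no usable client or pipeline.
            if (throwable != null || jobClient == null) {
                events.add("submission-failed");
                return;
            }
            Pipeline pipeline = jobClient.getPipeline();
            if (pipeline instanceof StreamGraph) {
                events.add("streaming"); // build the metadata from the StreamGraph
            } else if (pipeline instanceof Plan) {
                events.add("batch");     // build the metadata from the batch Plan
            } else {
                events.add("unknown");
            }
        }
    }

    public static void main(String[] args) {
        AtlasJobListener listener = new AtlasJobListener();
        listener.onJobSubmitted(() -> new StreamGraph(), null);
        listener.onJobSubmitted(() -> new Plan(), null);
        System.out.println(listener.events);
    }
}
```

The instanceof checks are the temporary part: if the Pipeline interface is
improved later, the listener would not need to know the concrete types.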

How bad does that sound?

Gyula

On Fri, Feb 7, 2020 at 10:19 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Aljoscha!
>
> That's a valid concern, but we should try to figure something out; many
> users need this before they can use Flink.
>
> I think the closest thing we have right now is the StreamGraph. In
> contrast with the JobGraph, the StreamGraph is pretty nice from a metadata
> perspective :D
> The big downside of exposing the StreamGraph is that we don't have it in
> batch. On the other hand we could expose the JobGraph but then the
> integration component would still have to do the heavy lifting for batch
> and stream specific operators and UDFs.
>
> Instead of exposing either the StreamGraph or the JobGraph, we could come up
> with a metadata-like representation for the users, but that would be like
> implementing the Atlas integration itself without Atlas dependencies :D
>
> As a comparison point, this is how it works in Storm:
> Every operator (spout/bolt) stores a config map (string->string) with all
> the metadata, such as the operator class and the operator-specific configs.
> The Atlas hook works on this map.
> This is very fragile and depends on a lot of internals. Kind of like
> exposing the JobGraph but much worse. I think we can do better.
>
> Gyula
>
> On Fri, Feb 7, 2020 at 9:55 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> If we need it, we can probably beef up the JobListener to allow
>> accessing some information about the whole graph or sources and sinks.
>> My only concern is that we don't currently have a stable interface for
>> our job graphs/pipelines.
>>
>> Best,
>> Aljoscha
>>
>> On 06.02.20 23:00, Gyula Fóra wrote:
>> > Hi Jeff & Till!
>> >
>> > Thanks for the feedback, this is exactly the discussion I was looking for.
>> > The JobListener looks very promising if we can expose the JobGraph somehow
>> > (correct me if I am wrong, but it is not accessible at the moment).
>> >
>> > I did not know about this feature, which is why I added my JobSubmission
>> > hook, which was pretty similar but only exposed the JobGraph. In general I
>> > like the listener better and I would not like to add anything extra if we
>> > can avoid it.
>> >
>> > Actually, the bigger part of the integration work that will need more
>> > changes in Flink concerns the accessibility of sources/sinks from the
>> > JobGraph and their specific properties. For instance, at the moment the
>> > Kafka sources and sinks do not expose anything publicly, such as topics,
>> > Kafka configs, etc. The same goes for other data connectors that we need
>> > to integrate in the long run. I guess there will be a separate thread on
>> > this once we iron out the initial integration points :)
>> >
>> > I will try to play around with the JobListener interface tomorrow and see
>> > if I can extend it to meet our needs.
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Thu, Feb 6, 2020 at 4:08 PM Jeff Zhang <zjf...@gmail.com> wrote:
>> >
>> >> Hi Gyula,
>> >>
>> >> Flink 1.10 introduced JobListener, which is invoked after job submission
>> >> and after the job finishes. Maybe we can add an API on JobClient to get
>> >> the info you need for the Atlas integration.
>> >>
>> >>
>> >>
>> >> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L46
>> >>
>> >>
>> >> On Wed, Feb 5, 2020 at 7:48 PM Gyula Fóra <gyf...@apache.org> wrote:
>> >>
>> >>> Hi all!
>> >>>
>> >>> We have started some preliminary work on the Flink - Atlas integration
>> >>> at Cloudera. It seems that the integration will require some new hook
>> >>> interfaces at the JobGraph generation and submission phases, so I
>> >>> figured I would open a discussion thread with my initial ideas to get
>> >>> some early feedback.
>> >>>
>> >>> *Minimal background*
>> >>> Very simply put, Apache Atlas is a data governance framework that stores
>> >>> metadata for our data and processing logic to track ownership, lineage,
>> >>> etc. It is already integrated with systems like HDFS, Kafka, Hive and
>> >>> many others.
>> >>>
>> >>> Adding Flink integration would mean that we can track the input and
>> >>> output data of our Flink jobs, their owners, and how different Flink
>> >>> jobs are connected to each other through the data they produce
>> >>> (lineage). This seems to be a very big deal for a lot of companies :)
>> >>>
>> >>> *Flink - Atlas integration in a nutshell*
>> >>> In order to integrate with Atlas we basically need 2 things.
>> >>>   - Flink entity definitions
>> >>>   - Flink Atlas hook
>> >>>
>> >>> The entity definition is the easy part. It is a JSON document that
>> >>> contains the objects (entities) that we want to store for any given
>> >>> Flink job. As a starter we could have a single FlinkApplication entity
>> >>> that has a set of inputs and outputs. These inputs/outputs are other
>> >>> Atlas entities that are already defined, such as a Kafka topic or an
>> >>> HBase table.
>> >>>
>> >>> The Flink Atlas hook will be the logic that creates the entity instance
>> >>> and uploads it to Atlas when we start a new Flink job. This is the part
>> >>> where we implement the core logic.
>> >>>
>> >>> *Job submission hook*
>> >>> In order to implement the Atlas hook we need a place where we can
>> >>> inspect the pipeline, then create and send the metadata when the job
>> >>> starts. When we create the FlinkApplication entity we need to be able
>> >>> to easily determine the sources and sinks (and their properties) of the
>> >>> pipeline.
>> >>>
>> >>> Unfortunately there is no JobSubmission hook in Flink that could execute
>> >>> this logic, and even if there was one, there is a mismatch of
>> >>> abstraction levels needed to implement the integration.
>> >>> We could imagine a JobSubmission hook executed in the JobManager runner
>> >>> like this:
>> >>>
>> >>> void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);
>> >>>
>> >>> This is nice, but the JobGraph makes it super difficult to extract
>> >>> sources and UDFs to create the metadata entity. The Atlas entity,
>> >>> however, could be easily created from the StreamGraph object (used to
>> >>> represent the logical flow) before the JobGraph is generated. To work
>> >>> around this limitation we could add a JobGraphGeneratorHook interface:
>> >>>
>> >>> void preProcess(StreamGraph streamGraph);
>> >>> void postProcess(JobGraph jobGraph);
>> >>>
>> >>> We could then generate the Atlas entity in the preProcess step and add a
>> >>> job submission hook in the postProcess step that will simply send the
>> >>> already baked-in entity.
>> >>>
>> >>> *This kinda works but...*
>> >>> The approach outlined above seems to work and we have built a POC using
>> >>> it. Unfortunately it is far from nice, as it exposes non-public APIs
>> >>> such as the StreamGraph. Also, it feels a bit weird to have 2 hooks
>> >>> instead of one.
>> >>>
>> >>> It would be much nicer if we could somehow go back from JobGraph to
>> >>> StreamGraph or at least have an easy way to access source/sink UDFs.
>> >>>
>> >>> What do you think?
>> >>>
>> >>> Cheers,
>> >>> Gyula
>> >>>
>> >>
>> >>
>> >> --
>> >> Best Regards
>> >>
>> >> Jeff Zhang
>> >>
>> >
>>
>
