Thanks, Shammon, for driving this FLIP. I have some comments on the updated FLIP.
1. It confuses me that the LineageEdge interface contains multiple sources but only one sink. That relation looks like a graph rather than an edge in the topology.

2. The TableColumnSourceLineageVertex interface is not clear to me: why is it designed to return multiple columns? Is there a case like ColumnAOfSrcT1 -> columnAofSinkT1, ColumnBOfSrcT1 -> columnAofSinkT1? Minor: the table() method name could be improved, as it returns a TableLineageVertex instead of a Table.

3. The SupportsLineageVertex interface for DataStream sources and sinks does not feel natural to me. The underlying logic is that a source/sink can optionally provide lineage information, so how about changing it to LineageVertexProvider { LineageVertex getLineageVertex(); }?

4. I'm hesitant to introduce a setLineageVertex method for DataStreamSource and DataStreamSink because:
(a) Lineage is different from the parallelism of a source/sink: it should be unique and well-defined for a given source/sink, so I prefer to expose only one interface to users.
(b) Which costs users more: building a SourceLineageVertex, or implementing a LineageVertexProvider?
Minor: "SourceLineageEdge LineageEdge;" looks like a typo?

5. DataStreamSink#setLineageEdge is not clear to me now. With it, a sink could hold any LineageEdges, even edges whose target is not the sink itself, right? That seems incorrect. My intuition is that the lineage of all pipelines in a DataStream application should belong to the StreamExecutionEnvironment, so introducing a setLineageEdge method on StreamExecutionEnvironment would be better than the current proposal.

Best,
Leonard

> On Jul 5, 2023, at 5:26 PM, Shammon FY <zjur...@gmail.com> wrote:
>
> Hi Jing,
>
> Thanks for your feedback.
>
>> 1. TableColumnLineageRelation#sinkColumn() should return TableColumnLineageEntity instead of String, right?
>
> The `sinkColumn()` will return `String`, which is the column name in the sink connector.
> I found that the name `TableColumnLineageEntity` may cause ambiguity, so I have renamed it to `TableColumnSourceLineageEntity`. In my mind, `TableColumnLineageRelation` represents the lineage of each sink column, and each column may be computed from multiple sources and columns. I use `TableColumnSourceLineageEntity` to manage each source and its columns for the sink column, so `TableColumnLineageRelation` holds a sink column name and a list of `TableColumnSourceLineageEntity`.
>
>> 2. Since LineageRelation already contains all the information needed to build the lineage between sources and sink, do we still need to set the LineageEntity in the source?
>
> The lineage interface of `DataStream` is very flexible. We have added `setLineageEntity` to the source to constrain and verify user behavior, ensuring that users do not add non-existent sources as lineage.
>
>> 3. About the "Entity" and "Relation" naming, I was confused too, like Qingsheng mentioned. How about LineageVertex, LineageEdge, and LineageEdges, which contains multiple LineageEdge?
>
> We referred to `Atlas` for the lineage naming: it uses `Entity` and `Relation` to represent lineage relationships, and another metadata service, `Datahub`, uses `DataSet` to represent the entity. I think `Entity` and `Relation` are nicer names for lineage. What do you think?
>
> Best,
> Shammon FY
>
>
> On Thu, Jun 29, 2023 at 4:21 AM Jing Ge <j...@ververica.com.invalid> wrote:
>
>> Hi Shammon,
>>
>> Thanks for your proposal. After reading the FLIP, I'd like to ask some questions to make sure we are on the same page. Thanks!
>>
>> 1. TableColumnLineageRelation#sinkColumn() should return TableColumnLineageEntity instead of String, right?
>>
>> 2. Since LineageRelation already contains all the information needed to build the lineage between sources and sink, do we still need to set the LineageEntity in the source?
>>
>> 3. About the "Entity" and "Relation" naming, I was confused too, like Qingsheng mentioned.
>> How about LineageVertex, LineageEdge, and LineageEdges, which contains multiple LineageEdge? E.g. multiple sources joined into one sink, or edges of columns from one or different tables, etc.
>>
>> Best regards,
>> Jing
>>
>> On Sun, Jun 25, 2023 at 2:06 PM Shammon FY <zjur...@gmail.com> wrote:
>>
>>> Hi yuxia and Yun,
>>>
>>> Thanks for your input.
>>>
>>> For yuxia:
>>>> 1: What kinds of JobStatus will the `JobExecutionStatusEvent` include?
>>>
>>> At present, we only need to notify the listener when a job terminates, but I think it makes sense to add generic `oldStatus` and `newStatus` to the listener so that users can update the job state in their service as needed.
>>>
>>>> 2: I'm really confused about the `config()` included in `LineageEntity`. Where is it from and what is it for?
>>>
>>> The `config` in `LineageEntity` lets users get options for source and sink connectors. As in the examples in the FLIP, users can add server/group/topic information to the config for Kafka and create lineage entities for `DataStream` jobs; listeners can then use this information to identify the same connector across different jobs. Meanwhile, the `config` in `TableLineageEntity` will be the same as `getOptions` in `CatalogBaseTable`.
>>>
>>>> 3: Regardless of whether `inputChangelogMode` in `TableSinkLineageEntity` is needed, since `TableSinkLineageEntity` contains `inputChangelogMode`, why doesn't `TableSourceLineageEntity` contain a changelog mode?
>>>
>>> At present, we do not actually use the changelog mode. It can be deleted, and I have updated the FLIP.
>>>
>>>> Btw, since a lot of interfaces are proposed, I think it would be better to give an example of how to implement a listener in this FLIP, to help us understand the interfaces better.
>>>
>>> I have added the example to the FLIP, and the related interfaces and examples are in branch [1].
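The answer to yuxia above describes two mechanisms: the listener receives an old and a new job status, and a `DataStream` job can put Kafka server/topic information into an entity's `config` so that a listener can recognize the same connector across jobs. The following Java sketch illustrates both under stated assumptions: every type in it (`SketchJobStatus`, `SketchLineageEntity`, `SketchJobStatusEvent`, `KafkaLineageListenerSketch`) is a hypothetical stand-in, not the FLIP-314 API, and the config keys `bootstrap.servers` and `topic` are assumed for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the types discussed in this thread; the real
// FLIP-314 interfaces were still being revised, so all shapes are assumptions.
enum SketchJobStatus { CREATED, RUNNING, FINISHED, FAILED, CANCELED }

record SketchLineageEntity(Map<String, String> config) {}

record SketchJobStatusEvent(
        SketchJobStatus oldStatus,
        SketchJobStatus newStatus,
        List<SketchLineageEntity> sources,
        List<SketchLineageEntity> sinks) {}

final class KafkaLineageListenerSketch {
    // Stand-in for an external metadata service: "source -> sink" pairs reported so far.
    final List<String> reportedEdges = new ArrayList<>();

    /** Job-independent id built from the assumed config keys "bootstrap.servers" and "topic". */
    static String connectorId(SketchLineageEntity entity) {
        Map<String, String> config = entity.config();
        return "kafka://" + config.get("bootstrap.servers") + "/" + config.get("topic");
    }

    /** Reports lineage once, on the transition into FINISHED; other transitions are ignored. */
    void onEvent(SketchJobStatusEvent event) {
        // Comparing oldStatus and newStatus lets the listener react to the transition itself.
        boolean justFinished = event.newStatus() == SketchJobStatus.FINISHED
                && event.oldStatus() != SketchJobStatus.FINISHED;
        if (!justFinished) {
            return;
        }
        for (SketchLineageEntity source : event.sources()) {
            for (SketchLineageEntity sink : event.sinks()) {
                reportedEdges.add(connectorId(source) + " -> " + connectorId(sink));
            }
        }
    }
}
```

With this shape, two jobs that read the same topic from the same cluster resolve to the same id no matter which job reports them. The consumer group is deliberately left out of the id, on the assumption that different jobs reading one topic should still map to one dataset.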
>>>
>>> For Yun:
>>>> I have one more question about lookup-join dimension tables. It seems this FLIP does not touch them; will they become part of List<LineageEntity> sources(), or will another interface be added?
>>>
>>> You're right, lookup-join dimension tables are currently not considered in the "Proposed Changes" section of this FLIP. But the lineage interface is universal, and we can add a `TableLookupSourceLineageEntity` that implements `TableSourceLineageEntity` in the future without modifying the public interface.
>>>
>>>> By the way, if you want to focus on job lineage instead of data column lineage in this FLIP, why must we introduce so many column-lineage related interfaces here?
>>>
>>> The lineage information in SQL jobs includes table lineage and column lineage. Although SQL jobs do not currently support column lineage, we would like to support it in the next step. So we have considered the table lineage and column lineage interfaces comprehensively here, and defined these two interfaces together clearly.
>>>
>>>
>>> [1] https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
>>>
>>> Best,
>>> Shammon FY
>>>
>>>
>>> On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <myas...@live.com> wrote:
>>>
>>>> Hi Shammon,
>>>>
>>>> I like the idea in general, and it will help to analyze job lineage in production environments for both Flink SQL and Flink jar jobs.
>>>>
>>>> Regarding Qingsheng's concern, I prefer the name JobType over RuntimeExecutionMode, as the latter is not easy for users to understand.
>>>>
>>>> I have one more question about lookup-join dimension tables. It seems this FLIP does not touch them; will they become part of List<LineageEntity> sources(), or will another interface be added?
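Shammon's reply to this question suggests that lookup-join dimension tables could later be covered by a `TableLookupSourceLineageEntity` subtype of `TableSourceLineageEntity`, without changing the public interface. A minimal Java sketch of why such an additive subtype stays backward compatible, assuming hypothetical minimal shapes for these interfaces (only the type names come from the thread; the `tableName()` accessor and the `LookupAwareReporter` helper are illustrative inventions, not FLIP-314 API):

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical minimal shapes; only the type names come from this thread.
interface LineageEntity {}

interface TableSourceLineageEntity extends LineageEntity {
    String tableName(); // assumed accessor, for illustration only
}

// Added later: code that consumes List<LineageEntity> keeps compiling unchanged.
interface TableLookupSourceLineageEntity extends TableSourceLineageEntity {}

final class LookupAwareReporter {
    /** Labels each table source as a regular scan source or a lookup (dimension) source. */
    static String describe(List<LineageEntity> sources) {
        return sources.stream()
                .filter(TableSourceLineageEntity.class::isInstance)
                .map(TableSourceLineageEntity.class::cast)
                .map(s -> (s instanceof TableLookupSourceLineageEntity ? "lookup:" : "scan:") + s.tableName())
                .collect(Collectors.joining(","));
    }
}
```

A listener written before the subtype exists still sees a dimension table as an ordinary `TableSourceLineageEntity`, while newer listeners can single it out with an `instanceof` check; that property is what would let the subtype be added without touching `sources()`.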
>>>>
>>>> By the way, if you want to focus on job lineage instead of data column lineage in this FLIP, why must we introduce so many column-lineage related interfaces here?
>>>>
>>>>
>>>> Best
>>>> Yun Tang
>>>> ________________________________
>>>> From: Shammon FY <zjur...@gmail.com>
>>>> Sent: Sunday, June 25, 2023 16:13
>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>>> Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
>>>>
>>>> Hi Qingsheng,
>>>>
>>>> Thanks for your valuable feedback.
>>>>
>>>>> 1. Is there any specific use case for exposing the batch / streaming info to listeners or meta services?
>>>>
>>>> I agree with you that Flink is evolving towards batch-streaming unification, but the lifecycles of the two are different. If a job processes a bounded dataset, it ends after completing the data processing; otherwise, it runs for a long time. In our scenario, we regularly schedule some Flink jobs to process bounded datasets, and when these "batch" jobs finish we update their lineage information with job details such as the scheduled timestamp and execution duration, which differs from "streaming" jobs. Currently Flink uses `RuntimeExecutionMode` and `existsUnboundedSource` in `StreamGraph` and `StreamGraphGenerator` to determine the `JobType` and distinguish jobs. We can mark `JobType` as `PublicEvolving`, or use `RuntimeExecutionMode` plus a boolean flag. What do you think?
>>>>
>>>>> 2. It's better to be more specific here to tell users what information they could expect to see, instead of just a "job configuration" as described in the JavaDoc.
>>>>
>>>> Thanks, I have updated the doc in the FLIP.
>>>>
>>>>> 3. About the IO executor in JobStatusChangedListenerFactory.Context.
>>>>
>>>> I have updated the docs for the IO executor in `JobStatusChangedListenerFactory.Context`. It is a regular thread pool that executes submitted tasks in parallel. Users can submit tasks to the executor, which ensures that the submitted tasks are executed before the job exits.
>>>>
>>>>> 4. I don't quite get the LineageRelationEntity, which is just a list of LineageEntity.
>>>>
>>>> In the initial idea, `LineageRelationEntity` was used by `DataStream` jobs to set additional lineage information besides the sources. For example, there are table and column lineages in SQL jobs. When we build a `DataStream` job with a table source and sink, we can add table lineage with the following method.
>>>> ```
>>>> public class DataStreamSink {
>>>>     public DataStreamSink setLineageSources(LineageEntity... sources);
>>>> }
>>>> ```
>>>> But we cannot set column lineage for the above sink, and for the sake of universality, we do not want to add a method similar to `addLineageColumn(...)` to `DataStreamSink`. So I put this information into `LineageRelationEntity` so that SQL and DataStream jobs can be consistent. But as you mentioned, this approach does lead to ambiguity and complexity. So my current idea is to add a `setLineageRelation` method to `DataStreamSink` directly, without `LineageRelationEntity`. I have updated the FLIP; please help review it again, thanks.
>>>>
>>>>> 5. I can't find the definition of CatalogContext, which appears in TableLineageEntity, in the current Flink code base.
>>>>
>>>> CatalogContext is defined in FLIP-294, and I have updated the FLIP.
>>>>
>>>>> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean (the "override" is quite confusing).
>>>>> I'm wondering whether these are necessary for meta services, as they are actually concepts defined at the runtime level of Flink Table / SQL.
>>>>
>>>> The information in `TableSinkLineageEntity`, such as `ModifyType`, `ChangelogMode` and `override`, is mainly used for verification and display. For example, Flink currently supports `INSERT`, `DELETE` and `UPDATE`; in our streaming & batch ETL we only want to report and update lineage for `INSERT` jobs, and display the `override` information on the UI.
>>>>
>>>>
>>>> Best,
>>>> Shammon FY
>>>>
>>>>
>>>> On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:
>>>>
>>>>> Hi Shammon,
>>>>>
>>>>> Thanks for starting this FLIP! Data lineage is a very important topic that has been missing in Flink for a long time. I have some questions about the FLIP.
>>>>>
>>>>> About events and listeners:
>>>>>
>>>>> 1. I'm not sure it is necessary to expose JobType in JobCreatedEvent. This is an internal class in flink-runtime, and I think the correct API should be RuntimeExecutionMode. Furthermore, I think the boundary between batch and streaming is becoming much more blurred as Flink evolves towards batch-streaming unification, so I'm concerned about exposing JobType as a public API. Is there any specific use case for exposing the batch / streaming info to listeners or meta services?
>>>>>
>>>>> 2. Currently JobCreatedEvent gives a Configuration, which is quite ambiguous. To be honest, configuration is quite a mess in Flink, so maybe it's better to be more specific here to tell users what information they could expect to see, instead of just a "job configuration" as described in the JavaDoc.
>>>>>
>>>>> 3. JobStatusChangedListenerFactory.Context provides an IO executor.
>>>>> I think more information should be provided here, such as which threading model this executor guarantees, and whether users need to care about concurrency issues. Otherwise, I'd prefer not to provide a utility that no one dares to use, and leave it to users to choose their own implementation.
>>>>>
>>>>> About lineage:
>>>>>
>>>>> 4. I don't quite get the LineageRelationEntity, which is just a list of LineageEntity. Could you elaborate more on this class? As I naively imagine it, lineage is shaped as a DAG, where vertices are sources and sinks (LineageEntity) and edges are the connections between them (LineageRelation), so a name mixing these two concepts is a bit confusing.
>>>>>
>>>>> 5. I can't find the definition of CatalogContext, which appears in TableLineageEntity, in the current Flink code base.
>>>>>
>>>>> 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean (the "override" is quite confusing). I'm wondering whether these are necessary for meta services, as they are actually concepts defined at the runtime level of Flink Table / SQL.
>>>>>
>>>>> Best,
>>>>> Qingsheng
>>>>>
>>>>> On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zjur...@gmail.com> wrote:
>>>>>
>>>>>> Hi devs,
>>>>>>
>>>>>> Is there any comment or feedback on this FLIP? Hope to hear from you, thanks.
>>>>>>
>>>>>> Best,
>>>>>> Shammon FY
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zjur...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi devs,
>>>>>>>
>>>>>>> I would like to start a discussion on FLIP-314: Support Customized Job Lineage Listener [1], which is the next stage of FLIP-294 [2]. Flink streaming and batch jobs create lineage dependencies between sources and sinks, and users can manage their data and jobs according to this lineage information.
>>>>>>> For example, when there is a delay in a Flink ETL job or in the data, users can easily trace the problematic jobs and the affected data. On the other hand, when users need to correct data or debug, they can also perform operations based on lineage.
>>>>>>>
>>>>>>> In FLIP-314 we want to introduce lineage-related interfaces in Flink so that users can create customized job status listeners. When the job status changes, users can get the job status and information to add, update or delete lineage.
>>>>>>>
>>>>>>> Looking forward to your feedback, thanks.
>>>>>>>
>>>>>>>
>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
>>>>>>>
>>>>>>> Best,
>>>>>>> Shammon FY
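Several points in this thread describe one graph model. Qingsheng describes lineage as a DAG whose vertices are sources and sinks and whose edges connect them. Leonard argues that an edge should have a single source and a single sink, and he suggests a `LineageVertexProvider { LineageVertex getLineageVertex(); }` interface. Shammon's kickoff mail motivates lineage with tracing the data affected by a delay. The following minimal Java sketch combines these ideas. All types are hypothetical shapes for illustration, not the FLIP-314 API, and vertex names are assumed to be globally unique dataset identifiers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical shapes, not the FLIP-314 API. A vertex is a named dataset
// (a table, a Kafka topic, ...); the name is assumed to be globally unique.
record LineageVertex(String name) {}

// Leonard's suggestion: a source or sink optionally exposes its vertex.
interface LineageVertexProvider {
    LineageVertex getLineageVertex();
}

// One edge connects exactly one source vertex to one sink vertex.
record LineageEdge(LineageVertex source, LineageVertex sink) {}

// Collects edges in one place; a multi-source join is several edges sharing a sink.
final class LineageGraph {
    private final Map<LineageVertex, List<LineageVertex>> downstream = new LinkedHashMap<>();

    void addEdge(LineageEdge edge) {
        downstream.computeIfAbsent(edge.source(), v -> new ArrayList<>()).add(edge.sink());
    }

    void addEdge(LineageVertexProvider source, LineageVertexProvider sink) {
        addEdge(new LineageEdge(source.getLineageVertex(), sink.getLineageVertex()));
    }

    /** Breadth-first search: every dataset reachable downstream of {@code start}. */
    Set<LineageVertex> affectedBy(LineageVertex start) {
        Set<LineageVertex> seen = new LinkedHashSet<>();
        Deque<LineageVertex> queue = new ArrayDeque<>(List.of(start));
        while (!queue.isEmpty()) {
            for (LineageVertex next : downstream.getOrDefault(queue.poll(), List.of())) {
                if (seen.add(next)) { // enqueue each vertex only once, so cycles terminate
                    queue.add(next);
                }
            }
        }
        return seen;
    }
}
```

Modeling a multi-source join as several single-source edges that share a sink keeps `LineageEdge` a true edge, and tracing the affected data becomes an ordinary breadth-first search. Keeping the edges in one graph object, rather than on individual sinks, follows the spirit of Leonard's suggestion to hold pipeline lineage at the `StreamExecutionEnvironment` level.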