Hi Feng,

Thanks for your input.

> 1. we can add a lineage interface like `supportReportLineage`

That's a very good idea, thank you. It lets users report lineage for
existing connectors in DataStream jobs without any additional work. I will
add this interface to the FLIP later; please help review it then, thanks.

> 2. it is relatively easy to obtain column lineage through Calcite MetaQuery API

It is helpful if Calcite already keeps some column lineage in its metadata.
I think we can discuss this and make a proposal in the column lineage FLIP.

Best,
Shammon FY

On Wednesday, June 28, 2023, Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi Shammon
> Thank you for proposing this FLIP. I think the Flink Job lineage is a very
> useful feature.
> I have few question:
>
> 1. For DataStream Jobs, users need to set up lineage relationships when
> building DAGs for their custom sources and sinks.
> However, for some common connectors such as Kafka Connector and JDBC
> Connector, we can add a lineage interface like `supportReportLineage`, so
> that these connectors can implement it.
> This way, in the scenario of DataStream Jobs, lineages can be automatically
> reported. What do you think?
>
>
> 2. From the current design, it seems that we need to analyze column lineage
> through pipeline. As far as I know, it is relatively easy to obtain column
> lineage through Calcite MetaQuery API.
> Would you consider using this approach? Or do we need to implement another
> parsing process based on the pipeline?
> ```
> RelMetadataQuery metadataQuery = relNode.getCluster().getMetadataQuery();
> metadataQuery.getColumnOrigins(inputRel, i);
> ```
> Best,
> Feng
>
>
> On Sun, Jun 25, 2023 at 8:06 PM Shammon FY <zjur...@gmail.com> wrote:
>
> > Hi yuxia and Yun,
> >
> > Thanks for your input.
> >
> > For yuxia:
> > > 1: What kinds of JobStatus will the `JobExecutionStatusEven` including?
> >
> > At present, we only need to notify the listener when a job goes to
> > termination, but I think it makes sense to add generic `oldStatus` and
> > `newStatus` in the listener and users can update the job state in their
> > service as needed.
> >
> > > 2: I'm really confused about the `config()` included in `LineageEntity`,
> > > where is it from and what is it for ?
> >
> > The `config` in `LineageEntity` is used for users to get options for source
> > and sink connectors. As the examples in the FLIP, users can add
> > server/group/topic information in the config for kafka and create lineage
> > entities for `DataStream` jobs, then the listeners can get this information
> > to identify the same connector in different jobs. Otherwise, the `config`
> > in `TableLineageEntity` will be the same as `getOptions` in
> > `CatalogBaseTable`.
> >
> > > 3: Regardless whether `inputChangelogMode` in `TableSinkLineageEntity` is
> > > needed or not, since `TableSinkLineageEntity` contains
> > > `inputChangelogMode`, why `TableSourceLineageEntity` don't contain
> > > changelogmode?
> >
> > At present, we do not actually use the changelog mode. It can be deleted,
> > and I have updated FLIP.
> >
> > > Btw, since there're a lot interfaces proposed, I think it'll be better to
> > > give an example about how to implement a listener in this FLIP to make us
> > > know better about the interfaces.
> >
> > I have added the example in the FLIP and the related interfaces and
> > examples are in branch [1].
> >
> > For Yun:
> > > I have one more question on the lookup-join dim tables, it seems this
> > > FLIP does not touch them, and will them become part of the
> > > List<LineageEntity> sources() or adding another interface?
> >
> > You're right, currently lookup join dim tables were not considered in the
> > 'proposed changed' section of this FLIP.
> > But the interface for lineage is universal and we can give
> > `TableLookupSourceLineageEntity` which implements
> > `TableSourceLineageEntity` in the future without modifying the public
> > interface.
> >
> > > By the way, if you want to focus on job lineage instead of data column
> > > lineage in this FLIP, why we must introduce so many column-lineage related
> > > interface here?
> >
> > The lineage information in SQL jobs includes table lineage and column
> > lineage. Although SQL jobs currently do not support column lineage, we
> > would like to support this in the next step. So we have comprehensively
> > considered the table lineage and column lineage interfaces here, and
> > defined these two interfaces together clearly
> >
> >
> > [1]
> > https://github.com/FangYongs/flink/commit/d4bfe57e7a5315b790e79b8acef8b11e82c9187c
> >
> > Best,
> > Shammon FY
> >
> >
> > On Sun, Jun 25, 2023 at 4:17 PM Yun Tang <myas...@live.com> wrote:
> >
> > > Hi Shammon,
> > >
> > > I like the idea in general and it will help to analysis the job lineages
> > > no matter FlinkSQL or Flink jar jobs in production environments.
> > >
> > > For Qingsheng's concern, I'd like the name of JobType more than
> > > RuntimeExecutionMode, as the latter one is not easy to understand for
> > > users.
> > >
> > > I have one more question on the lookup-join dim tables, it seems this FLIP
> > > does not touch them, and will them become part of the List<LineageEntity>
> > > sources() or adding another interface?
> > >
> > > By the way, if you want to focus on job lineage instead of data column
> > > lineage in this FLIP, why we must introduce so many column-lineage related
> > > interface here?
> > >
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Shammon FY <zjur...@gmail.com>
> > > Sent: Sunday, June 25, 2023 16:13
> > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-314: Support Customized Job Lineage Listener
> > >
> > > Hi Qingsheng,
> > >
> > > Thanks for your valuable feedback.
> > >
> > > > 1. Is there any specific use case to expose the batch / streaming info to
> > > > listeners or meta services?
> > >
> > > I agree with you that Flink is evolving towards batch-streaming
> > > unification, but the lifecycle of them is different. If a job processes a
> > > bound dataset, it will end after completing the data processing, otherwise,
> > > it will run for a long time. In our scenario, we will regularly schedule
> > > some Flink jobs to process bound dataset and update some job information to
> > > the lineage information for the "batch" jobs such as scheduled timestamp,
> > > execution duration when jobs are finished, which is different from
> > > "streaming" jobs. Currently Flink uses `RuntimeExecutionMode` and
> > > `existsUnboundedSource` in `StreamingGraph` and `StreamingGraphGenerator`
> > > to determine `JobType` and disjoin jobs. We can mark `JobType` as
> > > `PublicEvolving` or use `RuntimeExecutionMode` and a boolean flag, what do
> > > you think of it?
> > >
> > > > 2. it’s better to be more specific here to tell users what information
> > > > they could expect to see here, instead of just a “job configuration” as
> > > > described in JavaDoc.
> > >
> > > Thanks and I have updated the doc in FLIP.
> > >
> > > > 3. About the IO executor in JobStatusChangedListenerFactory.Context.
> > >
> > > I have updated the docs for io executor in
> > > `JobStatusChangedListenerFactory.Context`, it is a regular thread pool and
> > > executes submitted tasks in parallel.
> > > Users can submit tasks to the executor which ensures that the submitted
> > > task can be executed before the job exits.
> > >
> > > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > > > LineageEntity.
> > >
> > > In the initial idea, the `LineageRelationEntity` is used for `DataStream`
> > > to set additional lineage information besides source. For example, there
> > > are table and column lineages in SQL jobs. When we build a `DataStream` job
> > > with table source and sink, we can add table lineage in the following
> > > method.
> > > ```
> > > public class DataStreamSink {
> > >     public DataStreamSink setLineageSources(LineageEntity ... sources);
> > > }
> > > ```
> > > But we can not set column lineage for the above sink, and for the sake of
> > > universality, we do not want to add a method similar to
> > > `addLineageColumn(...)` in `DataStreamSink`. So I put this information into
> > > LineageRelationEntity so that SQL and DataStream jobs can be consistent.
> > > But as you mentioned, this approach does indeed lead to ambiguity and
> > > complexity. So my current idea is to add the `setLineageRelation` method in
> > > `DataStreamSink` directly without `LineageRelationEntity`, I have updated
> > > the FLIP and please help to review it again, thanks.
> > >
> > > > 5. I can’t find the definition of CatalogContext in the current code base
> > > > and Flink, which appears in the TableLineageEntity.
> > >
> > > CatalogContext is defined in FLIP-294 and I have updated the FLIP
> > >
> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> > > > (the “override” is quite confusing). I’m wondering if these are necessary
> > > > for meta services, as they are actually concepts defined in the runtime
> > > > level of Flink Table / SQL.
> > >
> > > The information in `TableSinkLineageEntity` such as `ModifyType`,
> > > `ChangelogMode` and `override` are mainly used for verification and
> > > display. For example, Flink currently supports `INSERT`/`DELETE` and
> > > `UPDATE`, we only want to report and update lineage for `INSERT` jobs in
> > > our streaming & batch ETL, and display the `override` information on the
> > > UI.
> > >
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Tue, Jun 20, 2023 at 6:19 PM Qingsheng Ren <re...@apache.org> wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > Thanks for starting this FLIP! Data lineage is a very important topic,
> > > > which has been missing for a long time in Flink. I have some questions
> > > > about the FLIP.
> > > >
> > > > About events and listeners:
> > > >
> > > > 1. I’m not sure if it is necessary to expose JobType to in
> > > > JobCreatedEvent. This is an internal class in flink-runtime, and I think
> > > > the correct API should be RuntimeExecutionMode. Furthermore, I think the
> > > > boundary of batch and streaming becomes much more vague as Flink is
> > > > evolving towards batch-streaming unification, so I’m concerned about
> > > > exposing JobType as a public API. Is there any specific use case to
> > > > expose the batch / streaming info to listeners or meta services?
> > > >
> > > > 2. Currently JobCreatedEvent gives a Configuration, which is quite
> > > > ambiguous. To be honest the configuration is quite a mess in Flink, so
> > > > maybe it’s better to be more specific here to tell users what information
> > > > they could expect to see here, instead of just a “job configuration” as
> > > > described in JavaDoc.
> > > >
> > > > 3. JobStatusChangedListenerFactory.Context provides an IO executor.
> > > > I think more information should be provided here, such as which thread
> > > > model this executor could promise, and whether the user should care about
> > > > concurrency issues. Otherwise I prefer not to give such an utility that
> > > > no one dares to use safely, and leave it to users to choose their
> > > > implementation.
> > > >
> > > > About lineage:
> > > >
> > > > 4. I don’t quite get the LineageRelationEntity, which is just a list of
> > > > LineageEntity. Could you elaborate more on this class? From my naive
> > > > imagination, the lineage is shaped as a DAG, where vertices are sources
> > > > and sinks (LineageEntity) and edges are connections between them
> > > > (LineageRelation), so it is a bit confusing for a name mixing these two
> > > > concepts.
> > > >
> > > > 5. I can’t find the definition of CatalogContext in the current code base
> > > > and Flink, which appears in the TableLineageEntity.
> > > >
> > > > 6. TableSinkLineageEntity exposes ModifyType, ChangelogMode and a boolean
> > > > (the “override” is quite confusing). I’m wondering if these are necessary
> > > > for meta services, as they are actually concepts defined in the runtime
> > > > level of Flink Table / SQL.
> > > >
> > > > Best,
> > > > Qingsheng
> > > >
> > > > On Tue, Jun 20, 2023 at 2:31 PM Shammon FY <zjur...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > Is there any comment or feedback for this FLIP? Hope to hear from you,
> > > > > thanks
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > >
> > > > > On Tue, Jun 6, 2023 at 8:22 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > I would like to start a discussion on FLIP-314: Support Customized Job
> > > > > > Lineage Listener[1] which is the next stage of FLIP-294 [2].
> > > > > > Flink streaming and batch jobs create lineage dependency between source
> > > > > > and sink, users can manage their data and jobs according to this lineage
> > > > > > information. For example, when there is a delay in Flink ETL or data,
> > > > > > users can easily trace the problematic jobs and affected data. On the
> > > > > > other hand, when users need to correct data or debug, they can perform
> > > > > > operations based on lineage too.
> > > > > >
> > > > > > In FLIP-314 we want to introduce lineage related interfaces for Flink and
> > > > > > users can create customized job status listeners. When job status changes,
> > > > > > users can get job status and information to add, update or delete lineage.
> > > > > >
> > > > > > Looking forward to your feedback, thanks.
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
> > > > > > [2]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY