Hi all,

After offline discussion with Lincoln, Weijie and Ron, we decided to make a few changes to the FLIP[1] to make the newly introduced connector ability easier to understand.

1. Migrate the target columns from `DynamicTableSink.Context` to a new connector ability, `SupportsTargetColumnWriting`.

Currently, the target columns are placed in the context of `DynamicTableSink` and are handed to the sink one-way. The planner is not aware of whether the sink has used this information. To make the planner aware of the usage of target column writing, target column writing should be migrated from the sink context to the connector abilities. Connector developers should use the new interface `SupportsTargetColumnWriting` to apply target column writing. The API `context#getTargetColumns` will be marked with @Deprecated in 2.1 and removed in 2.2 or later.

2. The digest of the sink node will include the target columns from `DynamicTableSink.Context` at first; they will be dropped from the digest once the `getTargetColumns` API is removed. If the sink implements the new ability, the planner will compare the `TargetColumnWritingSpec` instead for sink reuse.

Thank you all for the suggestions! I'd like to hear any further comments. If there are none, I will continue the voting thread by the end of this week.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner

Regards,
Xiangyu Feng

weijie guo <guoweijieres...@gmail.com> wrote on Tue, Feb 25, 2025 at 10:12:

> Hi, Xiangyu
>
> Thanks for the update.
> +1 for the new interface name.
>
> Best regards,
>
> Weijie
>
>
> Ron Liu <ron9....@gmail.com> wrote on Mon, Feb 24, 2025 at 19:25:
>
>> Hi, Xiangyu
>>
>> Thanks for pinging me, the interface name looks good to me.
>>
>>
>> Best,
>> Ron
>>
>> xiangyu feng <xiangyu...@gmail.com> wrote on Mon, Feb 24, 2025 at 17:59:
>>
>>> Hi Lincoln,
>>>
>>> Thanks for the suggestions.
>>>
>>> -- 1st, for the sql example in the Motivation part, why is cast nulls
>>> included in the select clause after union all multiple inputs?
>>>
>>> I did misplace the CAST NULLs; this has been revised in the FLIP[1].
>>>
>>> -- 2nd, simplifying the multi partial-insert example, what would the
>>> equivalent sql look like to the user after applying the optimization
>>> provided by this FLIP?
>>> ```
>>> INSERT INTO sink(pk, f1) SELECT ... FROM table1;
>>> INSERT INTO sink(pk, f2) SELECT ... FROM table2;
>>> INSERT INTO sink(pk, f3) SELECT ... FROM table3;
>>> ```
>>>
>>> Actually, this is already the simplified SQL. The optimization is that,
>>> with this SQL, the user can submit a single unified datastream job that
>>> writes to the sink table without worrying about concurrency issues.
>>>
>>> -- 3rd, for sink digest description in proposed changes, IIUC, it should
>>> be `b` for `not include` and `c` for `include`?
>>>
>>> I think the previous name of the interface, `SupportsTargetColumnWriting`,
>>> was ambiguous. The name originally meant that the target columns have been
>>> **used** in the sink, so the sink digest should **include** this
>>> information.
>>> On second thought, I've renamed this interface to
>>> `SupportsTargetColumnReusing` in the FLIP[1] to remove this ambiguity. When
>>> this ability is enabled, the sink supports reuse across different target
>>> columns, which means the sink digest should **not include** this
>>> information.
>>>
>>> @Lincoln, looking forward to your response.
>>>
>>> Also, @Ron Liu @Weijie Guo, I would like to hear more from you about this
>>> new interface name.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>
>>> Regards,
>>> Xiangyu Feng
>>>
>>>
>>> Lincoln Lee <lincoln.8...@gmail.com> wrote on Sun, Feb 23, 2025 at 22:43:
>>>
>>>> Hi xiangyu,
>>>>
>>>> Sorry for my late reply! I have some questions about the FLIP:
>>>>
>>>> 1st, for the SQL example in the Motivation part, why are the cast nulls
>>>> included in the select clause after the union all of multiple inputs?
>>>> Related to the partial-insert example later, should the cast nulls be in
>>>> the select clause inside the union all?
>>>> ```
>>>> -- Flink SQL
>>>> INSERT INTO sink
>>>> SELECT
>>>>   id1,
>>>>   CAST(NULL AS STRING) AS f1,
>>>>   CAST(NULL AS STRING) AS f2,
>>>>   CAST(NULL AS STRING) AS f3,
>>>>   CAST(NULL AS STRING) AS f4,
>>>>   CAST(NULL AS STRING) AS f5,
>>>>   CAST(NULL AS STRING) AS f6,
>>>>   CAST(NULL AS STRING) AS f7,
>>>>   CAST(NULL AS STRING) AS f8,
>>>>   CAST(NULL AS STRING) AS f9,
>>>>   ... ...
>>>> FROM (
>>>>   SELECT ... ...
>>>>   FROM table1
>>>>   UNION ALL
>>>>   SELECT ... ...
>>>>   FROM table2
>>>> ```
>>>>
>>>> 2nd, simplifying the multi partial-insert example, what would the
>>>> equivalent SQL look like to the user after applying the optimization
>>>> provided by this FLIP?
>>>> ```
>>>> INSERT INTO sink(pk, f1) SELECT ... FROM table1;
>>>> INSERT INTO sink(pk, f2) SELECT ... FROM table2;
>>>> INSERT INTO sink(pk, f3) SELECT ... FROM table3;
>>>> ```
>>>>
>>>> 3rd, for the sink digest description in the proposed changes, IIUC, it
>>>> should be `b` for `not include` and `c` for `include`?
>>>> ```
>>>>
>>>> Factors *considered* for sink node digest depend on circumstances:
>>>>
>>>> 1. sink target columns
>>>>    1. The sink node digest will *include* the target columns if the sink
>>>>       has not implemented the target column writing ability interface.
>>>>    2. The sink node digest will *include* the target columns when the
>>>>       sink has enabled the target column writing ability.
>>>>    3. The sink node digest will *not include* the target columns when
>>>>       the sink has not enabled the target column writing ability.
>>>>
>>>> ```
>>>>
>>>>
>>>> Best,
>>>> Lincoln Lee
>>>>
>>>>
>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 20, 2025 at 09:41:
>>>>
>>>> > Hi all,
>>>> >
>>>> > Thank you all for the comments.
>>>> >
>>>> > If there is no further comment, I will open the voting thread in 3 days.
>>>> >
>>>> > Regards,
>>>> > Xiangyu
>>>> >
>>>> >
>>>> > Ron Liu <ron9....@gmail.com> wrote on Wed, Feb 19, 2025 at 17:58:
>>>> >
>>>> > > Hi, Xiangyu
>>>> > >
>>>> > > Thanks for the updates, LGTM
>>>> > >
>>>> > > Best,
>>>> > > Ron
>>>> > >
>>>> > > xiangyu feng <xiangyu...@gmail.com> wrote on Wed, Feb 19, 2025 at 17:13:
>>>> > >
>>>> > >> Hi Ron,
>>>> > >>
>>>> > >> Thanks for your advice. I've made the changes to the current FLIP[1],
>>>> > >> including renaming the interface and removing the default
>>>> > >> implementation. As we have discussed, the target columns will be
>>>> > >> compared in sink reuse if the sink has not implemented the
>>>> > >> `SupportsTargetColumnWriting` ability. This will make sure the sink
>>>> > >> reuse feature can still be safely enabled by default.
>>>> > >>
>>>> > >> [1]
>>>> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>> > >>
>>>> > >> Best,
>>>> > >> Xiangyu Feng
>>>> > >>
>>>> > >> Ron Liu <ron9....@gmail.com> wrote on Wed, Feb 19, 2025 at 10:25:
>>>> > >>
>>>> > >>> Hi Xiangyu,
>>>> > >>>
>>>> > >>> Thanks for your reply, the updates LGTM overall.
>>>> > >>>
>>>> > >>> 1. Regarding the naming of the interface, what do you think about
>>>> > >>> calling it SupportsTargetColumnWriting? Here I would like to
>>>> > >>> emphasize the support for partial column writing, and I personally
>>>> > >>> think the naming can be aligned with SupportsWritingMetadata.
>>>> > >>>
>>>> > >>> 2. Regarding the interface methods, is it necessary to provide a
>>>> > >>> default implementation? Do most of the stores support partial column
>>>> > >>> writing?
>>>> > >>>
>>>> > >>>
>>>> > >>> Best,
>>>> > >>> Ron
>>>> > >>>
>>>> > >>> Cong Cheng <congchengch...@gmail.com> wrote on Tue, Feb 18, 2025 at 16:12:
>>>> > >>>
>>>> > >>>> Hi Xiangyu,
>>>> > >>>>
>>>> > >>>> > Introduce a new sink ability interface named
>>>> > >>>> > `SupportsTargetColumnUpdate`, this interface will tell the
>>>> > >>>> > planner if the sink has considered the target columns
>>>> > >>>> > information in its implementation;
>>>> > >>>>
>>>> > >>>>
>>>> > >>>> I think it makes a lot of sense, +1 for this ability.
>>>> > >>>>
>>>> > >>>> Sorry to all that I sent the draft of this message twice; something
>>>> > >>>> is wrong with the Enter key on my keyboard.
>>>> > >>>>
>>>> > >>>> Best,
>>>> > >>>> Cong Cheng
>>>> > >>>>
>>>> > >>>>
>>>> > >>>> xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 18, 2025 at 15:06:
>>>> > >>>>
>>>> > >>>> > Hi Kevin,
>>>> > >>>> >
>>>> > >>>> > Thanks for your valuable suggestion. I've made a few changes to
>>>> > >>>> > the current FLIP[1].
>>>> > >>>> >
>>>> > >>>> > 1. Introduce a new sink ability interface named
>>>> > >>>> > `SupportsTargetColumnUpdate`. This interface will tell the
>>>> > >>>> > planner whether the sink has considered the target columns
>>>> > >>>> > information in its implementation;
>>>> > >>>> >
>>>> > >>>> > 2. This ability will return true by default for safety;
>>>> > >>>> >
>>>> > >>>> > 3. When generating the node digest for sink reuse, the digest will
>>>> > >>>> > only ignore the target column info when this ability returns
>>>> > >>>> > false. This will require extra work for specific sinks.
>>>> > >>>> >
>>>> > >>>> > By applying these changes, we can safely enable the sink reuse
>>>> > >>>> > feature by default even for sinks like JDBC.
>>>> > >>>> > And for sinks like Paimon, we can also reuse the sink node across
>>>> > >>>> > multiple partial-update streams with different target columns by
>>>> > >>>> > revising the Paimon sink to implement this interface.
>>>> > >>>> >
>>>> > >>>> > Looking forward to your feedback on these updates.
>>>> > >>>> >
>>>> > >>>> > [1]
>>>> > >>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>> > >>>> >
>>>> > >>>> >
>>>> > >>>> > Kevin Cheng <congchengch...@gmail.com> wrote on Fri, Feb 14, 2025 at 16:13:
>>>> > >>>> >
>>>> > >>>> >> Hi Xiangyu and Ron,
>>>> > >>>> >>
>>>> > >>>> >> I agree that sink reuse can be enabled by default from the Flink
>>>> > >>>> >> planner's perspective. But the sink connector should inform the
>>>> > >>>> >> planner whether it can reuse sinks that have different target
>>>> > >>>> >> columns.
>>>> > >>>> >>
>>>> > >>>> >> Take the JDBC sink as an example: under partial-update
>>>> > >>>> >> circumstances, the JDBC sink needs to know the target sink or
>>>> > >>>> >> update columns of every record. However, the target columns info
>>>> > >>>> >> is discarded in the current FLIP design.
>>>> > >>>> >>
>>>> > >>>> >> Best,
>>>> > >>>> >> Xiangyu
>>>> > >>>> >>
>>>> > >>>> >> xiangyu feng <xiangyu...@gmail.com> wrote on Fri, Feb 14, 2025 at 13:51:
>>>> > >>>> >>
>>>> > >>>> >> > Hi Ron,
>>>> > >>>> >> >
>>>> > >>>> >> > On second thought, considering that sink reuse is a
>>>> > >>>> >> > long-awaited feature with significant benefits for users, I
>>>> > >>>> >> > agree to enable it by default from the start. Similar features
>>>> > >>>> >> > like `table.optimizer.reuse-sub-plan-enabled` and
>>>> > >>>> >> > `table.optimizer.reuse-source-enabled` are also enabled by
>>>> > >>>> >> > default. From this point of view, sink reuse should be the same.
>>>> > >>>> >> >
>>>> > >>>> >> > The FLIP[1] has been revised accordingly. Thanks for the
>>>> > >>>> >> > suggestion.
>>>> > >>>> >> >
>>>> > >>>> >> > [1]
>>>> > >>>> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>> > >>>> >> >
>>>> > >>>> >> > Regards,
>>>> > >>>> >> > Xiangyu
>>>> > >>>> >> >
>>>> > >>>> >> >
>>>> > >>>> >> > Ron Liu <ron9....@gmail.com> wrote on Fri, Feb 14, 2025 at 12:10:
>>>> > >>>> >> >
>>>> > >>>> >> > > Hi, Xiangyu
>>>> > >>>> >> > >
>>>> > >>>> >> > > >>> I prefer to set the default value of this option as
>>>> > >>>> >> > > >>> `false` in the first place. Setting as true might
>>>> > >>>> >> > > >>> introduce unexpected behavior for users when operating
>>>> > >>>> >> > > >>> existing jobs. Maybe we should introduce this feature at
>>>> > >>>> >> > > >>> first and discuss enabling this feature as default in a
>>>> > >>>> >> > > >>> separated thread. WDYT?
>>>> > >>>> >> > >
>>>> > >>>> >> > > 1. What unexpected behaviors do you think this might
>>>> > >>>> >> > > introduce? For Sink nodes, which are generally stateless, I
>>>> > >>>> >> > > intuitively understand that no state compatibility issues
>>>> > >>>> >> > > will be introduced after Sink reuse.
>>>> > >>>> >> > >
>>>> > >>>> >> > > 2. Since Sink reuse benefits users, why not enable this
>>>> > >>>> >> > > feature by default on the first day it is introduced? If your
>>>> > >>>> >> > > concern is potential unhandled corner cases in the
>>>> > >>>> >> > > implementation, I consider those to be bugs. We should
>>>> > >>>> >> > > prioritize fixing them rather than blocking the default
>>>> > >>>> >> > > enablement of this optimization.
>>>> > >>>> >> > >
>>>> > >>>> >> > > 3. If we don't enable it by default now, when should we? What
>>>> > >>>> >> > > specific milestones or actions are needed during the waiting
>>>> > >>>> >> > > period? Your concerns about unintended behaviors would still
>>>> > >>>> >> > > exist even if we enable it later. Why delay resolving this in
>>>> > >>>> >> > > a separate discussion instead of finalizing it here?
>>>> > >>>> >> > >
>>>> > >>>> >> > > 4. From our internal practice, users still want to enjoy the
>>>> > >>>> >> > > benefits of this feature by default.
>>>> > >>>> >> > >
>>>> > >>>> >> > >
>>>> > >>>> >> > > Best,
>>>> > >>>> >> > > Ron
>>>> > >>>> >> > >
>>>> > >>>> >> > > xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 13, 2025 at 15:57:
>>>> > >>>> >> > >
>>>> > >>>> >> > > > Hi Ron,
>>>> > >>>> >> > > >
>>>> > >>>> >> > > > Thanks for the quick response.
>>>> > >>>> >> > > >
>>>> > >>>> >> > > > - should the default value be true for the newly introduced
>>>> > >>>> >> > > > option `table.optimizer.reuse-sink-enabled`?
>>>> > >>>> >> > > >
>>>> > >>>> >> > > > I prefer to set the default value of this option to `false`
>>>> > >>>> >> > > > at first. Setting it to true might introduce unexpected
>>>> > >>>> >> > > > behavior for users operating existing jobs. Maybe we should
>>>> > >>>> >> > > > introduce this feature first and discuss enabling it by
>>>> > >>>> >> > > > default in a separate thread. WDYT?
>>>> > >>>> >> > > >
>>>> > >>>> >> > > > - have you considered the technical implementation options
>>>> > >>>> >> > > > and are they feasible?
>>>> > >>>> >> > > >
>>>> > >>>> >> > > > Yes, we have already implemented a POC internally. It works
>>>> > >>>> >> > > > well.
>>>> > >>>> >> > > >
>>>> > >>>> >> > > > Looking forward to your feedback.
>>>> > >>>> >> > > >
>>>> > >>>> >> > > > Best,
>>>> > >>>> >> > > > Xiangyu
>>>> > >>>> >> > > >
>>>> > >>>> >> > > > Ron Liu <ron9....@gmail.com> wrote on Thu, Feb 13, 2025 at 14:55:
>>>> > >>>> >> > > >
>>>> > >>>> >> > > > > Hi, Xiangyu
>>>> > >>>> >> > > > >
>>>> > >>>> >> > > > > Thank you for proposing this FLIP, it's great work and
>>>> > >>>> >> > > > > looks very useful for users.
>>>> > >>>> >> > > > >
>>>> > >>>> >> > > > > I have the following two questions regarding the content
>>>> > >>>> >> > > > > of the FLIP:
>>>> > >>>> >> > > > > 1. Since sink reuse is very useful, should the default
>>>> > >>>> >> > > > > value be true for the newly introduced option
>>>> > >>>> >> > > > > `table.optimizer.reuse-sink-enabled`, and should the
>>>> > >>>> >> > > > > engine enable this optimization by default? Currently for
>>>> > >>>> >> > > > > source reuse, the default value of the
>>>> > >>>> >> > > > > `sql.optimizer.reuse.table-source.enabled` option is also
>>>> > >>>> >> > > > > true, which requires no user intervention by default, so I
>>>> > >>>> >> > > > > think the engine should turn on the Sink reuse
>>>> > >>>> >> > > > > optimization by default.
>>>> > >>>> >> > > > > 2. Regarding Sink Digest, you mentioned disregarding the
>>>> > >>>> >> > > > > sink target column, which I think is a very good
>>>> > >>>> >> > > > > suggestion, and very useful if it can be done.
>>>> > >>>> >> > > > > I have a question: have you considered the technical
>>>> > >>>> >> > > > > implementation options and are they feasible?
>>>> > >>>> >> > > > >
>>>> > >>>> >> > > > > Best,
>>>> > >>>> >> > > > > Ron
>>>> > >>>> >> > > > >
>>>> > >>>> >> > > > > xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 13, 2025 at 12:56:
>>>> > >>>> >> > > > >
>>>> > >>>> >> > > > > > Hi all,
>>>> > >>>> >> > > > > >
>>>> > >>>> >> > > > > > Thank you all for the comments.
>>>> > >>>> >> > > > > >
>>>> > >>>> >> > > > > > If there is no further comment, I will open the voting
>>>> > >>>> >> > > > > > thread in 3 days.
>>>> > >>>> >> > > > > >
>>>> > >>>> >> > > > > > Regards,
>>>> > >>>> >> > > > > > Xiangyu
>>>> > >>>> >> > > > > >
>>>> > >>>> >> > > > > > xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 14:17:
>>>> > >>>> >> > > > > >
>>>> > >>>> >> > > > > > > Link for Paimon LocalMerge Operator[1]
>>>> > >>>> >> > > > > > >
>>>> > >>>> >> > > > > > > [1]
>>>> > >>>> >> > > > > > > https://paimon.apache.org/docs/master/maintenance/write-performance/#local-merging
>>>> > >>>> >> > > > > > >
>>>> > >>>> >> > > > > > > xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 14:03:
>>>> > >>>> >> > > > > > >
>>>> > >>>> >> > > > > > >> Following up on the above,
>>>> > >>>> >> > > > > > >>
>>>> > >>>> >> > > > > > >> "And for SinkWriter, the data structure to be
>>>> > >>>> >> > > > > > >> processed should be fixed."
>>>> > >>>> >> > > > > > >>
>>>> > >>>> >> > > > > > >> I'm not sure why the data structure of SinkWriter
>>>> > >>>> >> > > > > > >> should be fixed. Can you elaborate on the scenario
>>>> > >>>> >> > > > > > >> here?
>>>> > >>>> >> > > > > > >>
>>>> > >>>> >> > > > > > >> "Is there a node or an operator to fill in the
>>>> > >>>> >> > > > > > >> inconsistent field of Rowdata that passed from
>>>> > >>>> >> > > > > > >> different Sources?"
>>>> > >>>> >> > > > > > >>
>>>> > >>>> >> > > > > > >> By `filling in the inconsistent field from different
>>>> > >>>> >> > > > > > >> sources`, do you refer to implementations like the
>>>> > >>>> >> > > > > > >> LocalMerge Operator [1] for Paimon? IMHO, this should
>>>> > >>>> >> > > > > > >> not be included in Sink Reuse. The merging behavior
>>>> > >>>> >> > > > > > >> of multiple sources should be handled inside the
>>>> > >>>> >> > > > > > >> sink.
>>>> > >>>> >> > > > > > >>
>>>> > >>>> >> > > > > > >> Regards,
>>>> > >>>> >> > > > > > >> Xiangyu Feng
>>>> > >>>> >> > > > > > >>
>>>> > >>>> >> > > > > > >> xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 13:46:
>>>> > >>>> >> > > > > > >>
>>>> > >>>> >> > > > > > >>> Hi Yanquan,
>>>> > >>>> >> > > > > > >>>
>>>> > >>>> >> > > > > > >>> Thanks for the reply. IIUC, the schema of
>>>> > >>>> >> > > > > > >>> CatalogTable should contain all target columns for
>>>> > >>>> >> > > > > > >>> the sources. If not, a SQL validation exception
>>>> > >>>> >> > > > > > >>> should be raised by the planner.
>>>> > >>>> >> > > > > > >>>
>>>> > >>>> >> > > > > > >>> Regards,
>>>> > >>>> >> > > > > > >>> Xiangyu Feng
>>>> > >>>> >> > > > > > >>>
>>>> > >>>> >> > > > > > >>>
>>>> > >>>> >> > > > > > >>> Yanquan Lv <decq12y...@gmail.com> wrote on Mon, Feb 10, 2025 at 16:25:
>>>> > >>>> >> > > > > > >>>
>>>> > >>>> >> > > > > > >>>> Hi, Xiangyu. Thanks for driving this.
>>>> > >>>> >> > > > > > >>>>
>>>> > >>>> >> > > > > > >>>> I have a question to confirm:
>>>> > >>>> >> > > > > > >>>> Considering the case where different Sources use
>>>> > >>>> >> > > > > > >>>> different columns[1], will the Schema of
>>>> > >>>> >> > > > > > >>>> CatalogTable[2] contain all target columns for the
>>>> > >>>> >> > > > > > >>>> Sources?
>>>> > >>>> >> > > > > > >>>> And for SinkWriter, the data structure to be
>>>> > >>>> >> > > > > > >>>> processed should be fixed. Is there a node or an
>>>> > >>>> >> > > > > > >>>> operator to fill in the inconsistent fields of
>>>> > >>>> >> > > > > > >>>> RowData passed from different Sources?
>>>> > >>>> >> > > > > > >>>>
>>>> > >>>> >> > > > > > >>>> [1]
>>>> > >>>> >> > > > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>> > >>>> >> > > > > > >>>> [2]
>>>> > >>>> >> > > > > > >>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#planning
>>>> > >>>> >> > > > > > >>>>
>>>> > >>>> >> > > > > > >>>>
>>>> > >>>> >> > > > > > >>>> > On Feb 6, 2025, at 17:06, xiangyu feng <xiangyu...@gmail.com> wrote:
>>>> > >>>> >> > > > > > >>>> >
>>>> > >>>> >> > > > > > >>>> > Hi devs,
>>>> > >>>> >> > > > > > >>>> >
>>>> > >>>> >> > > > > > >>>> > I'm opening this thread to discuss FLIP-506:
>>>> > >>>> >> > > > > > >>>> > Support Reuse Multiple Table Sinks in Planner[1].
>>>> > >>>> >> > > > > > >>>> >
>>>> > >>>> >> > > > > > >>>> > Currently, if users want to partial-update a
>>>> > >>>> >> > > > > > >>>> > downstream table from multiple source tables in
>>>> > >>>> >> > > > > > >>>> > one datastream, they have to manually union all
>>>> > >>>> >> > > > > > >>>> > source tables and add lots of "cast(null as
>>>> > >>>> >> > > > > > >>>> > string) as xxx" in Flink SQL. This makes the SQL
>>>> > >>>> >> > > > > > >>>> > hard to use and maintain.
>>>> > >>>> >> > > > > > >>>> >
>>>> > >>>> >> > > > > > >>>> > After discussing with Weijie Guo, we think that
>>>> > >>>> >> > > > > > >>>> > by supporting the reuse of sink nodes in the
>>>> > >>>> >> > > > > > >>>> > planner, usability can be greatly improved in
>>>> > >>>> >> > > > > > >>>> > this case.
>>>> > >>>> >> > > > > > >>>> >
>>>> > >>>> >> > > > > > >>>> > Therefore, we propose to add a new option,
>>>> > >>>> >> > > > > > >>>> > *`table.optimizer.reuse-sink-enabled`*, to
>>>> > >>>> >> > > > > > >>>> > support this feature. More details can be found
>>>> > >>>> >> > > > > > >>>> > in the FLIP.
>>>> > >>>> >> > > > > > >>>> >
>>>> > >>>> >> > > > > > >>>> > Looking forward to your feedback, thanks.
>>>> > >>>> >> > > > > > >>>> >
>>>> > >>>> >> > > > > > >>>> > [1]
>>>> > >>>> >> > > > > > >>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>> > >>>> >> > > > > > >>>> >
>>>> > >>>> >> > > > > > >>>> > Best regards,
>>>> > >>>> >> > > > > > >>>> > Xiangyu Feng
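
For reference, the digest question debated throughout this thread can be modeled in a few lines. This is only a sketch in Python, not Flink planner code: `SinkNode`, `sink_digest`, `group_reusable` and the digest string format are hypothetical names and assumptions made for illustration, and the real sink node digest considers more factors than the two shown here. It uses the three partial inserts from the example above and shows that they stay separate when the target columns are part of the digest, and collapse into one reusable sink when they are not.

```python
from collections import defaultdict
from typing import List, NamedTuple, Tuple


class SinkNode(NamedTuple):
    """Simplified stand-in for a planner sink node (hypothetical)."""
    table: str                       # sink table identifier
    target_columns: Tuple[str, ...]  # columns listed in INSERT INTO sink(...)


def sink_digest(node: SinkNode, include_target_columns: bool) -> str:
    # Assumed digest format, for illustration only.
    if include_target_columns:
        return f"{node.table}|targets={','.join(node.target_columns)}"
    return node.table


def group_reusable(nodes: List[SinkNode],
                   include_target_columns: bool) -> List[List[SinkNode]]:
    # Sinks with equal digests land in one group, i.e. share one sink node.
    groups = defaultdict(list)
    for node in nodes:
        groups[sink_digest(node, include_target_columns)].append(node)
    return list(groups.values())


# INSERT INTO sink(pk, f1) ...; INSERT INTO sink(pk, f2) ...; INSERT INTO sink(pk, f3) ...
sinks = [
    SinkNode("sink", ("pk", "f1")),
    SinkNode("sink", ("pk", "f2")),
    SinkNode("sink", ("pk", "f3")),
]

separate = group_reusable(sinks, include_target_columns=True)
merged = group_reusable(sinks, include_target_columns=False)
print(len(separate), len(merged))  # prints: 3 1
```

Whether the target columns belong in the digest is exactly what the proposed connector ability is meant to tell the planner; the boolean flag here stands in for that decision.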