Hi Xiangyu,

> Introduce a new sink ability interface named `SupportsTargetColumnUpdate`; this interface will tell the planner whether the sink has considered the target column information in its implementation;

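The three FLIP changes Xiangyu lists in the reply below amount to a small digest rule, sketched here in Java (17+). Only the interface name `SupportsTargetColumnUpdate` comes from the thread. The method name `supportsTargetColumnUpdate()`, the `SinkNode` record, the `SinkDigests` helper, the `JdbcLikeSink`/`PaimonLikeSink` stand-ins and the digest string format are hypothetical. The sketch also assumes that a sink which does not implement the interface is treated as returning `true`, per point 2. It is an illustration, not the Flink planner's actual digest code.

```java
import java.util.Arrays;

// Interface name taken from the thread; the method name is a hypothetical choice.
interface SupportsTargetColumnUpdate {
    boolean supportsTargetColumnUpdate();
}

// Hypothetical stand-in for a sink such as JDBC that does not implement the ability.
final class JdbcLikeSink {}

// Hypothetical stand-in for a sink such as Paimon that declares it ignores target columns.
final class PaimonLikeSink implements SupportsTargetColumnUpdate {
    @Override
    public boolean supportsTargetColumnUpdate() {
        return false; // assumed: partial-update merging happens inside the sink
    }
}

// Hypothetical planner-side view of the sink of one INSERT statement.
record SinkNode(String table, int[] targetColumns, Object sink) {}

final class SinkDigests {
    private SinkDigests() {}

    // Point 2: without the ability, assume the sink relies on target columns (safe default).
    static boolean considersTargetColumns(Object sink) {
        if (sink instanceof SupportsTargetColumnUpdate ability) {
            return ability.supportsTargetColumnUpdate();
        }
        return true;
    }

    // Point 3: target columns are left out of the digest only when the ability returns false.
    static String digest(SinkNode node) {
        StringBuilder sb = new StringBuilder("Sink(table=").append(node.table());
        if (considersTargetColumns(node.sink())) {
            sb.append(", targetColumns=").append(Arrays.toString(node.targetColumns()));
        }
        return sb.append(')').toString();
    }

    // Equal digests mean the planner may merge the two sink nodes into one.
    static boolean canReuse(SinkNode a, SinkNode b) {
        return digest(a).equals(digest(b));
    }
}
```

With these stand-ins, two inserts into `orders` with target columns `[0, 1]` and `[0, 2]` keep separate sink nodes when the sink is JDBC-like, but collapse to the single digest `Sink(table=orders)` when the sink is Paimon-like.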
xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 18, 2025 at 15:06:
> Hi Kevin,
>
> Thanks for your valuable suggestion. I've made a few changes to the current FLIP[1]:
>
> 1. Introduce a new sink ability interface named `SupportsTargetColumnUpdate`; this interface will tell the planner whether the sink has considered the target column information in its implementation;
>
> 2. This ability will return true by default, for safety;
>
> 3. When generating the node digest for sink reuse, the digest will ignore the target column information only when this ability returns false. This will require extra work in specific sinks.
>
> By applying these changes, we can safely enable the sink reuse feature by default, even for sinks like JDBC. And for sinks like Paimon, we can also reuse the sink node across multiple partial-update streams with different target columns by revising the Paimon sink to implement this interface.
>
> Looking forward to hearing back from you on these updates.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>
> Kevin Cheng <congchengch...@gmail.com> wrote on Fri, Feb 14, 2025 at 16:13:
>> Hi Xiangyu and Ron,
>>
>> I agree that sink reuse can be enabled by default from the Flink planner's perspective. But the sink connector should inform the planner whether it can reuse sinks with different target columns.
>>
>> Take the JDBC sink as an example: under partial update, the JDBC sink needs to know the target (update) columns of every record. However, the target column information is discarded in the current FLIP design.
>>
>> Best,
>> Xiangyu
>>
>> xiangyu feng <xiangyu...@gmail.com> wrote on Fri, Feb 14, 2025 at 13:51:
>>> Hi Ron,
>>>
>>> On second thought, since sink reuse is a long-awaited feature with significant benefits for users, I agree to enable it by default from the start.
>>> Similar features like `table.optimizer.reuse-sub-plan-enabled` and `table.optimizer.reuse-source-enabled` are also enabled by default. From this point of view, sink reuse should be the same.
>>>
>>> The FLIP[1] has been revised accordingly. Thanks for the suggestion.
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>
>>> Regards,
>>> Xiangyu
>>>
>>> Ron Liu <ron9....@gmail.com> wrote on Fri, Feb 14, 2025 at 12:10:
>>>> Hi, Xiangyu
>>>>
>>>> "I prefer to set the default value of this option to `false` at first. Setting it to `true` might introduce unexpected behavior for users operating existing jobs. Maybe we should introduce this feature first and discuss enabling it by default in a separate thread. WDYT?"
>>>>
>>>> 1. What unexpected behaviors do you think this might introduce? Sink nodes are generally stateless, so my intuition is that sink reuse will not introduce any state compatibility issues.
>>>>
>>>> 2. Since sink reuse benefits users, why not enable this feature by default from the day it is introduced? If your concern is potential unhandled corner cases in the implementation, I consider those to be bugs. We should prioritize fixing them rather than blocking the default enablement of this optimization.
>>>>
>>>> 3. If we don't enable it by default now, when should we? What specific milestones or actions are needed during the waiting period? Your concerns about unintended behaviors would still exist even if we enabled it later. Why defer this to a separate discussion instead of settling it here?
>>>>
>>>> 4. From our internal practice, users still want to enjoy the benefits of this feature by default.
>>>>
>>>> Best,
>>>> Ron
>>>>
>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 13, 2025 at 15:57:
>>>>> Hi Ron,
>>>>>
>>>>> Thanks for the quick response.
>>>>>
>>>>> - Should the default value be true for the newly introduced option `table.optimizer.reuse-sink-enabled`?
>>>>>
>>>>> I prefer to set the default value of this option to `false` at first. Setting it to `true` might introduce unexpected behavior for users operating existing jobs. Maybe we should introduce this feature first and discuss enabling it by default in a separate thread. WDYT?
>>>>>
>>>>> - Have you considered the technical implementation options, and are they feasible?
>>>>>
>>>>> Yes, we have already implemented a POC internally, and it works well.
>>>>>
>>>>> Looking forward to your feedback.
>>>>>
>>>>> Best,
>>>>> Xiangyu
>>>>>
>>>>> Ron Liu <ron9....@gmail.com> wrote on Thu, Feb 13, 2025 at 14:55:
>>>>>> Hi, Xiangyu
>>>>>>
>>>>>> Thank you for proposing this FLIP; it's great work and looks very useful for users.
>>>>>>
>>>>>> I have the following two questions regarding the content of the FLIP:
>>>>>>
>>>>>> 1. Since sink reuse is very useful, should the default value of the newly introduced option `table.optimizer.reuse-sink-enabled` be true, with the engine enabling this optimization by default? Currently for source reuse, the default value of the `sql.optimizer.reuse.table-source.enabled` option is also true, so users don't need to opt in, and I think the engine should likewise turn on sink reuse optimization by default.
>>>>>>
>>>>>> 2. Regarding the sink digest, you mentioned disregarding the sink target columns, which I think is a very good suggestion and would be very useful if it can be done. My question is: have you considered the technical implementation options, and are they feasible?
>>>>>>
>>>>>> Best,
>>>>>> Ron
>>>>>>
>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 13, 2025 at 12:56:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Thank you all for the comments.
>>>>>>>
>>>>>>> If there are no further comments, I will open the voting thread in 3 days.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Xiangyu
>>>>>>>
>>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 14:17:
>>>>>>>> Link for the Paimon LocalMerge operator[1]
>>>>>>>>
>>>>>>>> [1] https://paimon.apache.org/docs/master/maintenance/write-performance/#local-merging
>>>>>>>>
>>>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 14:03:
>>>>>>>>> Following up on the above:
>>>>>>>>>
>>>>>>>>> "And for the SinkWriter, the data structure to be processed should be fixed."
>>>>>>>>>
>>>>>>>>> I'm not sure why the data structure of the SinkWriter should be fixed. Can you elaborate on the scenario here?
>>>>>>>>>
>>>>>>>>> "Is there a node or an operator to fill in the inconsistent fields of RowData passed from different Sources?"
>>>>>>>>>
>>>>>>>>> By "filling in the inconsistent fields from different sources", do you refer to implementations like the LocalMerge operator[1] for Paimon? IMHO, this should not be included in sink reuse.
>>>>>>>>> The merging behavior of multiple sources should be handled inside the sink.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Xiangyu Feng
>>>>>>>>>
>>>>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 13:46:
>>>>>>>>>> Hi Yanquan,
>>>>>>>>>>
>>>>>>>>>> Thanks for the reply. IIUC, the schema of the CatalogTable should contain all target columns for the sources. If not, the planner should raise a SQL validation exception.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Xiangyu Feng
>>>>>>>>>>
>>>>>>>>>> Yanquan Lv <decq12y...@gmail.com> wrote on Mon, Feb 10, 2025 at 16:25:
>>>>>>>>>>> Hi, Xiangyu. Thanks for driving this.
>>>>>>>>>>>
>>>>>>>>>>> I have a question to confirm:
>>>>>>>>>>> Considering the case where different sources use different columns[1], will the schema of the CatalogTable[2] contain all target columns for the sources?
>>>>>>>>>>> And for the SinkWriter, the data structure to be processed should be fixed. Is there a node or an operator to fill in the inconsistent fields of RowData passed from different Sources?
>>>>>>>>>>>
>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>>>>>>>>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#planning
>>>>>>>>>>>
>>>>>>>>>>> On Feb 6, 2025, at 17:06, xiangyu feng <xiangyu...@gmail.com> wrote:
>>>>>>>>>>>> Hi devs,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm opening this thread to discuss FLIP-506: Support Reuse Multiple Table Sinks in Planner[1].
>>>>>>>>>>>>
>>>>>>>>>>>> Currently, if users want to partial-update a downstream table from multiple source tables in one data stream, they have to manually union all source tables and add lots of "cast(null as string) as xxx" in Flink SQL. This makes the SQL hard to use and maintain.
>>>>>>>>>>>>
>>>>>>>>>>>> After discussing with Weijie Guo, we think that supporting the reuse of sink nodes in the planner can greatly improve usability in this case.
>>>>>>>>>>>>
>>>>>>>>>>>> Therefore, we propose adding a new option *`table.optimizer.reuse-sink-enabled`* to support this feature. More details can be found in the FLIP.
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your feedback, thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>>>>>>>>>>
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>> Xiangyu Feng
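The kickoff message proposes letting the planner, rather than the user, merge several partial-update INSERTs into one sink, and the exchange with Yanquan adds that every target column must exist in the sink table's schema (otherwise SQL validation fails) while any merging of the sources belongs inside the sink. The Java (17+) sketch below illustrates those steps under simplifying assumptions: `PartialInsert`, `padToSchema`, `unionForReusedSink` and `mergeByKey` are hypothetical names, rows are plain `Object[]` instead of Flink's `RowData`, column 0 is assumed to be the primary key, and the merge rule (non-null fields win, NULLs never overwrite) is a simplified take on Paimon-style partial update. It is not how the Flink planner or the Paimon sink is actually implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one "INSERT INTO sink (target columns) SELECT ..." statement.
record PartialInsert(List<String> targetColumns, List<Object[]> rows) {}

final class SinkReuseSketch {
    private SinkReuseSketch() {}

    // Pads each row to the full sink schema with NULL for non-target columns, which is
    // what hand-written "cast(null as string) as xxx" projections achieve today.
    static List<Object[]> padToSchema(List<String> sinkSchema, PartialInsert insert) {
        int[] positions = new int[insert.targetColumns().size()];
        for (int i = 0; i < positions.length; i++) {
            String column = insert.targetColumns().get(i);
            positions[i] = sinkSchema.indexOf(column);
            if (positions[i] < 0) {
                // Stands in for the SQL validation error mentioned in the thread.
                throw new IllegalArgumentException("Unknown target column: " + column);
            }
        }
        List<Object[]> padded = new ArrayList<>();
        for (Object[] row : insert.rows()) {
            Object[] full = new Object[sinkSchema.size()]; // every field starts as NULL
            for (int i = 0; i < positions.length; i++) {
                full[positions[i]] = row[i];
            }
            padded.add(full);
        }
        return padded;
    }

    // Unions all padded inputs into the single stream consumed by one reused sink.
    static List<Object[]> unionForReusedSink(List<String> sinkSchema, List<PartialInsert> inserts) {
        List<Object[]> union = new ArrayList<>();
        for (PartialInsert insert : inserts) {
            union.addAll(padToSchema(sinkSchema, insert));
        }
        return union;
    }

    // Sink-side merge keyed on column 0: later non-null fields overwrite earlier ones,
    // and NULLs never overwrite (a simplified partial-update merge).
    static Map<Object, Object[]> mergeByKey(List<Object[]> rows) {
        Map<Object, Object[]> merged = new LinkedHashMap<>();
        for (Object[] row : rows) {
            Object[] current = merged.computeIfAbsent(row[0], k -> new Object[row.length]);
            for (int i = 0; i < row.length; i++) {
                if (row[i] != null) {
                    current[i] = row[i];
                }
            }
        }
        return merged;
    }
}
```

For example, with the sink schema `(id, name, price)`, an insert of `(id, name) = (1, 'apple')` and another of `(id, price) = (1, 9.5)` are padded to `[1, apple, NULL]` and `[1, NULL, 9.5]`, unioned into one stream, and merged by the sink into `[1, apple, 9.5]`. An insert that targets a column missing from the schema, such as `qty`, is rejected before any rows are produced.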