Hi Ron,

Thanks for your advice. I've updated the current FLIP[1]: I renamed the interface and removed the default implementation. As we discussed, if a sink has not implemented the `SupportsTargetColumnWriting` ability, its target columns will be compared when deciding on sink reuse. This ensures that the sink reuse feature can still be safely enabled by default.
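The reuse rule described above can be sketched in plain Java. This is a simplified model, not planner code. Here `SupportsTargetColumnWriting` is a local marker interface with no methods; the real interface and its methods are defined in the FLIP. `SinkNode`, `digest`, `groupForReuse` and the table names are hypothetical. The digest keeps the target columns unless the sink declares the ability. As a result, INSERT statements with different column lists share a sink only when that sink handles target columns itself.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SinkReuseSketch {

    // Hypothetical local marker standing in for the ability discussed in this thread.
    // It is NOT Flink's interface and deliberately declares no methods.
    interface SupportsTargetColumnWriting {}

    // Hypothetical sinks: one that does not handle target columns (JDBC-like in the
    // thread's example) and one that handles them itself (Paimon-like, once revised).
    static class PlainSink {}

    static class TargetColumnAwareSink implements SupportsTargetColumnWriting {}

    // One INSERT statement: target table, its sink, and the columns it writes.
    record SinkNode(String table, Object sink, List<String> targetColumns) {}

    // Reuse digest: target columns are part of the digest unless the sink
    // declares that it handles target columns itself.
    static String digest(SinkNode node) {
        boolean sinkHandlesTargetColumns = node.sink() instanceof SupportsTargetColumnWriting;
        String columns = sinkHandlesTargetColumns ? "*" : String.valueOf(node.targetColumns());
        return node.table() + "|" + columns;
    }

    // Groups sink nodes by digest; each group could then share a single sink.
    static Map<String, List<SinkNode>> groupForReuse(List<SinkNode> nodes) {
        Map<String, List<SinkNode>> groups = new LinkedHashMap<>();
        for (SinkNode node : nodes) {
            groups.computeIfAbsent(digest(node), key -> new ArrayList<>()).add(node);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<SinkNode> nodes = List.of(
                new SinkNode("jdbc_t", new PlainSink(), List.of("id", "a")),
                new SinkNode("jdbc_t", new PlainSink(), List.of("id", "b")),
                new SinkNode("paimon_t", new TargetColumnAwareSink(), List.of("id", "a")),
                new SinkNode("paimon_t", new TargetColumnAwareSink(), List.of("id", "b")));
        // The two JDBC-like INSERTs keep separate digests; the two Paimon-like
        // INSERTs collapse into one group.
        groupForReuse(nodes).forEach((key, group) ->
                System.out.println(key + " -> " + group.size() + " statement(s)"));
    }
}
```

With the inputs in `main`, the JDBC-like statements end up in two groups and the Paimon-like statements in one. A planner digest typically covers more than the table name and column list. Those other properties are left out here to keep the sketch short.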
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner

Best,
Xiangyu Feng

Ron Liu <ron9....@gmail.com> wrote on Wed, Feb 19, 2025 at 10:25:
> Hi Xiangyu,
>
> Thanks for your reply, the updates LGTM overall.
>
> 1. Regarding the naming of the interface, what do you think about calling it SupportsTargetColumnWriting? Here I would like to emphasize the support for partial column writing, and I personally think the naming can be aligned with SupportsWritingMetadata.
>
> 2. Regarding the interface methods, is it necessary to provide a default implementation? Do most of the stores support partial column writing?
>
> Best,
> Ron

Cong Cheng <congchengch...@gmail.com> wrote on Tue, Feb 18, 2025 at 16:12:
> Hi Xiangyu,
>
>> Introduce a new sink ability interface named `SupportsTargetColumnUpdate`; this interface will tell the planner whether the sink has considered the target columns information in its implementation;
>
> I think it makes a lot of sense, +1 for this ability.
>
> Sorry to all that I sent the draft of the content twice; something was wrong with the Enter key on my keyboard.
>
> Best,
> Cong Cheng

xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 18, 2025 at 15:06:
> Hi Kevin,
>
> Thanks for your valuable suggestion. I've made a few changes to the current FLIP[1].
>
> 1. Introduce a new sink ability interface named `SupportsTargetColumnUpdate`; this interface will tell the planner whether the sink has considered the target columns information in its implementation.
>
> 2. This ability will return true by default, for safety.
>
> 3. When generating the node digest for sink reuse, the digest will ignore the target column info only when this ability returns false. This will require extra work for specific sinks.
>
> By applying these changes, we can safely enable the sink reuse feature by default, even for sinks like JDBC. And for sinks like Paimon, we can also reuse the sink node across multiple partial-update streams with different target columns by revising the Paimon sink to implement this interface.
>
> Looking forward to hearing back from you on these updates.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner

Kevin Cheng <congchengch...@gmail.com> wrote on Fri, Feb 14, 2025 at 16:13:
> Hi Xiangyu and Ron,
>
> I agree that sink reuse can be enabled by default from the Flink planner's perspective. But the sink connector should inform the planner whether it can reuse sinks that have different target columns.
>
> Take the JDBC sink as an example: in partial-update scenarios, the JDBC sink needs to know the target sink (or update) columns of every record. However, the target column info is discarded in the current FLIP design.
>
> Best,
> Xiangyu

xiangyu feng <xiangyu...@gmail.com> wrote on Fri, Feb 14, 2025 at 13:51:
> Hi Ron,
>
> On second thought, given that sink reuse is a long-awaited feature with significant benefits for users, I agree to enable it from the start. Similar features such as `table.optimizer.reuse-sub-plan-enabled` and `table.optimizer.reuse-source-enabled` are also enabled by default. From this point of view, sink reuse should be the same.
>
> The FLIP[1] has been revised accordingly. Thanks for the suggestion.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>
> Regards,
> Xiangyu

Ron Liu <ron9....@gmail.com> wrote on Fri, Feb 14, 2025 at 12:10:
> Hi, Xiangyu
>
>> I prefer to set the default value of this option to false in the first place. Setting it to true might introduce unexpected behavior for users operating existing jobs. Maybe we should introduce this feature first and discuss enabling it by default in a separate thread. WDYT?
>
> 1. What unexpected behaviors do you think this might introduce? Sink nodes are generally stateless, so my intuitive understanding is that sink reuse will not introduce any state compatibility issues.
>
> 2. Since sink reuse benefits users, why not enable this feature by default on the first day it is introduced? If your concern is potential unhandled corner cases in the implementation, I consider those to be bugs. We should prioritize fixing them rather than blocking the default enablement of this optimization.
>
> 3. If we don't enable it by default now, when should we? What specific milestones or actions are needed during the waiting period? Your concerns about unintended behaviors would still exist even if we enable it later. Why delay resolving this in a separate discussion instead of finalizing it here?
>
> 4. From our internal practice, users want to enjoy the benefits of this feature by default.
>
> Best,
> Ron

xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 13, 2025 at 15:57:
> Hi Ron,
>
> Thanks for the quick response.
>
> - Should the default value be true for the newly introduced option `table.optimizer.reuse-sink-enabled`?
>
> I prefer to set the default value of this option to false in the first place. Setting it to true might introduce unexpected behavior for users operating existing jobs. Maybe we should introduce this feature first and discuss enabling it by default in a separate thread. WDYT?
>
> - Have you considered the technical implementation options, and are they feasible?
>
> Yes, we have already implemented a POC internally. It works well.
>
> Looking forward to your feedback.
>
> Best,
> Xiangyu

Ron Liu <ron9....@gmail.com> wrote on Thu, Feb 13, 2025 at 14:55:
> Hi, Xiangyu
>
> Thank you for proposing this FLIP, it's great work and looks very useful for users.
>
> I have the following two questions regarding the content of the FLIP:
> 1. Since sink reuse is very useful, should the default value be true for the newly introduced option `table.optimizer.reuse-sink-enabled`, and should the engine enable this optimization by default?
>    Currently, for source reuse, the default value of the `sql.optimizer.reuse.table-source.enabled` option is also true, so users get it without any configuration. I therefore think the engine should turn on the sink reuse optimization by default.
> 2. Regarding the sink digest, you mentioned disregarding the sink target columns. I think this is a very good suggestion, and very useful if it can be done. I have a question: have you considered the technical implementation options, and are they feasible?
>
> Best,
> Ron

xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 13, 2025 at 12:56:
> Hi all,
>
> Thank you all for the comments.
>
> If there are no further comments, I will open the voting thread in 3 days.
>
> Regards,
> Xiangyu

xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 14:17:
> Link for the Paimon LocalMerge Operator[1]
>
> [1] https://paimon.apache.org/docs/master/maintenance/write-performance/#local-merging

xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 14:03:
> Following up on the above,
>
> "And for SinkWriter, the data structure to be processed should be fixed."
>
> I'm not sure why the data structure of SinkWriter should be fixed. Can you elaborate on the scenario here?
>
> "Is there a node or an operator to fill in the inconsistent fields of RowData that are passed from different Sources?"
>
> By `filling in the inconsistent fields from different sources`, do you mean implementations like the LocalMerge Operator[1] for Paimon? IMHO, this should not be included in sink reuse. The merging behavior of multiple sources should be handled inside the sink.
>
> Regards,
> Xiangyu Feng

xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 13:46:
> Hi Yanquan,
>
> Thanks for the reply. IIUC, the schema of the CatalogTable should contain all target columns for the sources. If it does not, the planner should raise a SQL validation exception.
>
> Regards,
> Xiangyu Feng

Yanquan Lv <decq12y...@gmail.com> wrote on Mon, Feb 10, 2025 at 16:25:
> Hi, Xiangyu. Thanks for driving this.
>
> I have a question to confirm:
> Considering the case where different Sources use different columns[1], will the Schema of the CatalogTable[2] contain all target columns for the Sources?
> And for SinkWriter, the data structure to be processed should be fixed.
> Is there a node or an operator to fill in the inconsistent fields of RowData that are passed from different Sources?
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#planning

xiangyu feng <xiangyu...@gmail.com> wrote on Feb 6, 2025 at 17:06:
> Hi devs,
>
> I'm opening this thread to discuss FLIP-506: Support Reuse Multiple Table Sinks in Planner[1].
>
> Currently, if users want to partial-update a downstream table from multiple source tables in one datastream, they have to manually union all source tables and add lots of "cast(null as string) as xxx" in Flink SQL. This makes the SQL hard to use and maintain.
>
> After discussing with Weijie Guo, we think that supporting the reuse of sink nodes in the planner can greatly improve usability in this case.
>
> Therefore, we propose adding a new option, `table.optimizer.reuse-sink-enabled`, to support this feature. More details can be found in the FLIP.
>
> Looking forward to your feedback, thanks.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>
> Best regards,
> Xiangyu Feng
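The manual workaround described in the opening message can be modeled in plain Java to show what the `cast(null as string) as xxx` padding does. Each source produces only some of the columns, so every row has to be widened to the full sink schema with nulls before the union. This is a hypothetical illustration, not Flink code, and the schema and column names are made up. As discussed in the thread, how the sink merges those nulls (for example, by treating them as "not updated") is left to the sink itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class UnionPaddingSketch {

    // Widens a partial row (column name -> value) to the full sink schema.
    // Missing columns become null, like CAST(NULL AS STRING) AS col in the SELECT list.
    static List<Object> padToSchema(List<String> schema, Map<String, Object> partialRow) {
        List<Object> row = new ArrayList<>(schema.size());
        for (String column : schema) {
            row.add(partialRow.get(column)); // Map.get returns null for absent keys
        }
        return row;
    }

    // UNION ALL of several partial-update sources after padding each row.
    static List<List<Object>> unionAll(List<String> schema, List<Map<String, Object>> rows) {
        List<List<Object>> result = new ArrayList<>();
        for (Map<String, Object> partialRow : rows) {
            result.add(padToSchema(schema, partialRow));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> schema = List.of("id", "a", "b");
        Map<String, Object> fromSource1 = Map.of("id", 1, "a", "x"); // only knows column a
        Map<String, Object> fromSource2 = Map.of("id", 1, "b", "y"); // only knows column b
        System.out.println(unionAll(schema, List.of(fromSource1, fromSource2)));
        // [[1, x, null], [1, null, y]]
    }
}
```

With sink reuse as proposed, each source could instead be written with its own INSERT and column list, and the planner would take care of sharing the sink.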