Hi Lincoln,

Thanks for the suggestions.
-- 1st, for the SQL example in the Motivation part, why are the CAST NULLs included in the SELECT clause after the UNION ALL of multiple inputs?

I had indeed misplaced the CAST NULLs; this has been fixed in the FLIP [1].

-- 2nd, simplifying the multi partial-insert example, what would the equivalent SQL look like to the user after applying the optimization provided by this FLIP?

```
INSERT INTO sink(pk, f1) SELECT ... FROM table1;
INSERT INTO sink(pk, f2) SELECT ... FROM table2;
INSERT INTO sink(pk, f3) SELECT ... FROM table3;
```

Actually, this is already the simplified SQL. The optimization is that, with this SQL, the user can submit a single unified job that writes to the sink table without worrying about concurrency issues.

-- 3rd, for the sink digest description in Proposed Changes, IIUC, it should be `b` for `not include` and `c` for `include`?

I think the previous interface name, `SupportsTargetColumnWriting`, was ambiguous. It was originally meant to say that the target columns are **used** by the sink, so the sink digest should **include** this information. On second thought, I've renamed the interface to `SupportsTargetColumnReusing` in the FLIP [1] to remove this ambiguity. When this ability is enabled, the sink can be reused across different target columns, which means the sink digest should **not include** this information.

@Lincoln, looking forward to your response. Also, @Ron Liu @Weijie Guo, I would like to hear your thoughts on the new interface name.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner

Regards,
Xiangyu Feng

Lincoln Lee <lincoln.8...@gmail.com> wrote on Sun, Feb 23, 2025 at 22:43:

> Hi xiangyu,
>
> Sorry for my late reply! I have some questions about the FLIP:
>
> 1st, for the SQL example in the Motivation part, why are the CAST NULLs included in the SELECT clause after the UNION ALL of multiple inputs?
> Related to the partial-insert example later, should the CAST NULLs be in the SELECT clause inside the UNION ALL?
> ```
> -- Flink SQL
> INSERT INTO sink
> SELECT
>   id1,
>   CAST(NULL AS STRING) AS f1,
>   CAST(NULL AS STRING) AS f2,
>   CAST(NULL AS STRING) AS f3,
>   CAST(NULL AS STRING) AS f4,
>   CAST(NULL AS STRING) AS f5,
>   CAST(NULL AS STRING) AS f6,
>   CAST(NULL AS STRING) AS f7,
>   CAST(NULL AS STRING) AS f8,
>   CAST(NULL AS STRING) AS f9,
>   ... ...
> FROM (
>   SELECT ... ...
>   FROM table1
>   UNION ALL
>   SELECT ... ...
>   FROM table2
> )
> ```
>
> 2nd, simplifying the multi partial-insert example, what would the equivalent SQL look like to the user after applying the optimization provided by this FLIP?
> ```
> INSERT INTO sink(pk, f1) SELECT ... FROM table1;
> INSERT INTO sink(pk, f2) SELECT ... FROM table2;
> INSERT INTO sink(pk, f3) SELECT ... FROM table3;
> ```
>
> 3rd, for the sink digest description in Proposed Changes, IIUC, it should be `b` for `not include` and `c` for `include`?
> ```
> Factors *considered* for the sink node digest depend on the circumstances:
>
> 1. sink target columns
>    a. The sink node digest will *include* the target columns if the sink has not implemented the target column writing ability interface.
>    b. The sink node digest will *include* the target columns when the sink has enabled the target column writing ability.
>    c. The sink node digest will *not include* the target columns when the sink has not enabled the target column writing ability.
> ```
>
> Best,
> Lincoln Lee
>
> xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 20, 2025 at 09:41:
>
>> Hi all,
>>
>> Thank you all for the comments.
>>
>> If there is no further comment, I will open the voting thread in 3 days.
>>
>> Regards,
>> Xiangyu
>>
>> Ron Liu <ron9....@gmail.com> wrote on Wed, Feb 19, 2025 at 17:58:
>>
>>> Hi, Xiangyu
>>>
>>> Thanks for the updates, LGTM.
>>>
>>> Best,
>>> Ron
>>>
>>> xiangyu feng <xiangyu...@gmail.com> wrote on Wed, Feb 19, 2025 at 17:13:
>>>
>>>> Hi Ron,
>>>>
>>>> Thanks for your advice. I've made the changes to the current FLIP [1], including renaming the interface and removing the default implementation. As we discussed, the target columns will be compared during sink reuse if the sink has not implemented the `SupportsTargetColumnWriting` ability. This ensures that the sink reuse feature can still be safely enabled by default.
>>>>
>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>>
>>>> Best,
>>>> Xiangyu Feng
>>>>
>>>> Ron Liu <ron9....@gmail.com> wrote on Wed, Feb 19, 2025 at 10:25:
>>>>
>>>>> Hi Xiangyu,
>>>>>
>>>>> Thanks for your reply, the updates LGTM overall.
>>>>>
>>>>> 1. Regarding the naming of the interface, what do you think about calling it SupportsTargetColumnWriting? Here I would like to emphasize the support for partial column writing, and I personally think the name can be aligned with SupportsWritingMetadata.
>>>>>
>>>>> 2. Regarding the interface methods, is it necessary to provide a default implementation? Do most stores support partial column writing?
>>>>>
>>>>> Best,
>>>>> Ron
>>>>>
>>>>> Cong Cheng <congchengch...@gmail.com> wrote on Tue, Feb 18, 2025 at 16:12:
>>>>>
>>>>>> Hi Xiangyu,
>>>>>>
>>>>>>> Introduce a new sink ability interface named `SupportsTargetColumnUpdate`; this interface will tell the planner whether the sink has taken the target column information into account in its implementation;
>>>>>>
>>>>>> I think it makes a lot of sense, +1 for this ability.
>>>>>>
>>>>>> Sorry to all for sending the draft twice; something was wrong with the Enter key on my keyboard.
>>>>>>
>>>>>> Best,
>>>>>> Cong Cheng
>>>>>>
>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 18, 2025 at 15:06:
>>>>>>
>>>>>>> Hi Kevin,
>>>>>>>
>>>>>>> Thanks for your valuable suggestion. I've made a few changes to the current FLIP [1]:
>>>>>>>
>>>>>>> 1. Introduce a new sink ability interface named `SupportsTargetColumnUpdate`; this interface will tell the planner whether the sink has taken the target column information into account in its implementation;
>>>>>>>
>>>>>>> 2. This ability will return true by default for safety;
>>>>>>>
>>>>>>> 3. When generating the node digest for sink reuse, the digest will ignore the target column info only when this ability returns false. This requires extra work in each specific sink.
>>>>>>>
>>>>>>> With these changes, we can safely enable the sink reuse feature by default, even for sinks like JDBC. And for sinks like Paimon, we can also reuse the sink node across multiple partial-update streams with different target columns by revising the Paimon sink to implement this interface.
>>>>>>>
>>>>>>> Looking forward to your feedback on these updates.
>>>>>>>
>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>>>>>
>>>>>>> Kevin Cheng <congchengch...@gmail.com> wrote on Fri, Feb 14, 2025 at 16:13:
>>>>>>>
>>>>>>>> Hi Xiangyu and Ron,
>>>>>>>>
>>>>>>>> I agree that sink reuse can be enabled by default from the Flink planner's perspective.
>>>>>>>> But the planner should be informed by the sink connector whether it can reuse sinks with different target columns.
>>>>>>>>
>>>>>>>> Take the JDBC sink as an example: under partial updates, the JDBC sink needs to know the target or update columns of every record. However, the target column info is discarded in the current FLIP design.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Xiangyu
>>>>>>>>
>>>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Fri, Feb 14, 2025 at 13:51:
>>>>>>>>
>>>>>>>>> Hi Ron,
>>>>>>>>>
>>>>>>>>> On second thought, since sink reuse is a long-awaited feature with significant benefits for users, I agree to enable it by default from the start. Similar features like `table.optimizer.reuse-sub-plan-enabled` and `table.optimizer.reuse-source-enabled` are also enabled by default. From this point of view, sink reuse should be the same.
>>>>>>>>>
>>>>>>>>> The FLIP [1] has been revised accordingly. Thanks for the suggestion.
>>>>>>>>>
>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Xiangyu
>>>>>>>>>
>>>>>>>>> Ron Liu <ron9....@gmail.com> wrote on Fri, Feb 14, 2025 at 12:10:
>>>>>>>>>
>>>>>>>>>> Hi, Xiangyu
>>>>>>>>>>
>>>>>>>>>>> I prefer to set the default value of this option to 'false' at first. Setting it to true might introduce unexpected behavior for users operating existing jobs.
>>>>>>>>>>> Maybe we should introduce this feature first and discuss enabling it by default in a separate thread. WDYT?
>>>>>>>>>>
>>>>>>>>>> 1. What unexpected behaviors do you think this might introduce? Sink nodes are generally stateless, so my intuition is that sink reuse will not introduce any state compatibility issues.
>>>>>>>>>>
>>>>>>>>>> 2. Since sink reuse benefits users, why not enable this feature by default from the day it is introduced? If your concern is potential unhandled corner cases in the implementation, I consider those to be bugs. We should prioritize fixing them rather than blocking the default enablement of this optimization.
>>>>>>>>>>
>>>>>>>>>> 3. If we don't enable it by default now, when should we? What specific milestones or actions are needed during the waiting period? Your concerns about unintended behaviors would still exist even if we enabled it later. Why defer this to a separate discussion instead of finalizing it here?
>>>>>>>>>>
>>>>>>>>>> 4. In our internal practice, users want to enjoy the benefits of this feature by default.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Ron
>>>>>>>>>>
>>>>>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 13, 2025 at 15:57:
>>>>>>>>>>
>>>>>>>>>>> Hi Ron,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the quick response.
>>>>>>>>>>>
>>>>>>>>>>> - should the default value be true for the newly introduced option `table.optimizer.reuse-sink-enabled`?
>>>>>>>>>>>
>>>>>>>>>>> I prefer to set the default value of this option to 'false' at first. Setting it to true might introduce unexpected behavior for users operating existing jobs. Maybe we should introduce this feature first and discuss enabling it by default in a separate thread. WDYT?
>>>>>>>>>>>
>>>>>>>>>>> - have you considered the technical implementation options and are they feasible?
>>>>>>>>>>>
>>>>>>>>>>> Yes, we have already implemented a POC internally, and it works well.
>>>>>>>>>>>
>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Xiangyu
>>>>>>>>>>>
>>>>>>>>>>> Ron Liu <ron9....@gmail.com> wrote on Thu, Feb 13, 2025 at 14:55:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, Xiangyu
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for proposing this FLIP; it's great work and looks very useful for users.
>>>>>>>>>>>>
>>>>>>>>>>>> I have the following two questions regarding the content of the FLIP:
>>>>>>>>>>>> 1. Since sink reuse is very useful, should the default value be true for the newly introduced option `table.optimizer.reuse-sink-enabled`, and should the engine enable this optimization by default?
>>>>>>>>>>>> Currently, for source reuse, the default value of the `sql.optimizer.reuse.table-source.enabled` option is also true, so users do not need to opt in; I therefore think the engine should turn on the sink reuse optimization by default.
>>>>>>>>>>>> 2. Regarding the sink digest, you mentioned disregarding the sink target columns, which I think is a very good suggestion and would be very useful if it can be done. My question is: have you considered the technical implementation options, and are they feasible?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Ron
>>>>>>>>>>>>
>>>>>>>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Thu, Feb 13, 2025 at 12:56:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you all for the comments.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If there is no further comment, I will open the voting thread in 3 days.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Xiangyu
>>>>>>>>>>>>>
>>>>>>>>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 14:17:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Link for the Paimon LocalMerge operator [1]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://paimon.apache.org/docs/master/maintenance/write-performance/#local-merging
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 14:03:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Following up on the above,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> "And for SinkWriter, the data structure to be processed should be fixed."
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm not sure why the data structure of SinkWriter should be fixed. Can you elaborate on the scenario?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> "Is there a node or an operator to fill in the inconsistent fields of RowData passed from different Sources?"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> By `filling in the inconsistent fields from different sources`, do you mean implementations like the LocalMerge operator [1] for Paimon? IMHO, this should not be part of sink reuse. The merging behavior of multiple sources should be handled inside the sink.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Xiangyu Feng
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> xiangyu feng <xiangyu...@gmail.com> wrote on Tue, Feb 11, 2025 at 13:46:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Yanquan,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the reply. IIUC, the schema of the CatalogTable should contain all target columns used by the sources. If not, the planner should raise a SQL validation exception.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Xiangyu Feng
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yanquan Lv <decq12y...@gmail.com> wrote on Mon, Feb 10, 2025 at 16:25:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi, Xiangyu. Thanks for driving this.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a question to confirm:
>>>>>>>>>>>>>>>>> Considering the case where different sources use different columns [1], will the schema of the CatalogTable [2] contain all target columns for the sources?
>>>>>>>>>>>>>>>>> And for SinkWriter, the data structure to be processed should be fixed. Is there a node or an operator to fill in the inconsistent fields of RowData passed from different Sources?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>>>>>>>>>>>>>>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#planning
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Feb 6, 2025, at 17:06, xiangyu feng <xiangyu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi devs,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'm opening this thread to discuss FLIP-506: Support Reuse Multiple Table Sinks in Planner [1].
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Currently, if users want to partially update a downstream table from multiple source tables in a single job, they have to manually UNION ALL the source tables and add lots of "cast(null as string) as xxx" in Flink SQL. This makes the SQL hard to use and maintain.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> After discussing with Weijie Guo, we think that supporting sink node reuse in the planner can greatly improve usability in this case.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Therefore, we propose to add a new option *`table.optimizer.reuse-sink-enabled`* to support this feature. More details can be found in the FLIP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Looking forward to your feedback, thanks.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>>>> Xiangyu Feng
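
For illustration, here is a minimal Flink SQL sketch of how the multi partial-insert example from this thread might be submitted as one job. It assumes that `sink`, `table1`, `table2` and `table3` are already registered, that each source exposes `pk` plus the one column it updates (hypothetical column names), and that `table.optimizer.reuse-sink-enabled` behaves as proposed in FLIP-506. Under the design discussed above, the three inserts would only share a single sink node if the sink connector implements the target-column ability (`SupportsTargetColumnReusing` in the latest revision of the FLIP, so the name may still change); otherwise the differing target columns stay in the sink digest and the sinks are not merged.

```sql
-- Sink reuse option proposed in FLIP-506; per this thread it is expected to be
-- enabled by default, so setting it here is only for clarity.
SET 'table.optimizer.reuse-sink-enabled' = 'true';

-- A statement set compiles all INSERTs into a single job, which gives the
-- planner the chance to reuse one sink node for all three partial inserts.
-- Column names pk, f1, f2, f3 are hypothetical.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO sink(pk, f1) SELECT pk, f1 FROM table1;
  INSERT INTO sink(pk, f2) SELECT pk, f2 FROM table2;
  INSERT INTO sink(pk, f3) SELECT pk, f3 FROM table3;
END;
```

Submitting the three INSERT statements one at a time would instead create three separate jobs, and sink reuse, being a planner optimization within one plan, could not apply across them.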