Hi Ron,

Thx for ur advice.  I've made the changes to current FLIP[1] including
renaming the interface and remove the default implementation. As we have
discussed, the target columns will be compared in sink reuse if the sink
has not implemented the `SupportsTargetColumnWriting` ability. This will
make sure the sink reuse feature can still be safely enabled by default.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner

Best,
Xiangyu Feng

Ron Liu <ron9....@gmail.com> 于2025年2月19日周三 10:25写道:

> Hi Xiangyu,
>
> Thanks for your reply, the updates LGTM  overall.
>
> 1. Regarding the naming of the interface, what do you think about calling
> it SupportsTargetColumnWriting? Here I would like to emphasize the support
> for partial column writing, and I personally think the naming can be
> aligned with SupportsWritingMetadata.
>
> 2. Regarding the interface methods, is it necessary to provide a default
> implementation, do most of the stores support partial column writing?
>
>
> Best,
> Ron
>
> Cong Cheng <congchengch...@gmail.com> 于2025年2月18日周二 16:12写道:
>
>> Hi Xiangyu,
>>
>> Introduce a new sink ability interface named `SupportsTargetColumnUpdate`,
>> > this interface will tell the planner if the sink has considered the
>> target
>> > columns information in its implementation;
>>
>>
>> I think it makes a lot of sense, +1 for this ability.
>>
>> Sorry to all that I sended the draft of the content twice, something
>> wrong with the enter of my keyboard.
>>
>> Best,
>> Cong Cheng
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> xiangyu feng <xiangyu...@gmail.com> 于2025年2月18日周二 15:06写道:
>>
>> > Hi Kevin,
>> >
>> > Thx for ur valuable suggestion. I've made a few changes to current
>> FLIP[1].
>> >
>> > 1, Introduce a new sink ability interface named
>> > `SupportsTargetColumnUpdate`, this interface will tell the planner if
>> the
>> > sink has considered the target columns information in its
>> implementation;
>> >
>> > 2, This ability will return true by default for safety consideration;
>> >
>> > 3, When generating node digest for sink reuse, the digest will only
>> ignore
>> > the target column infos when this ability return false. This will
>> require
>> > extra work for specific sink.
>> >
>> > By applying these changes, we can safely enable the sink reuse feature
>> by
>> > default even for sinks like JDBC . And for sinks like Paimon, we can
>> also
>> > reuse the sink node across multiple partial-update streams with
>> different
>> > target columns by revising paimon sink to implement this interface.
>> >
>> > Glad to hear you back for these updates.
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>> >
>> >
>> >
>> > Kevin Cheng <congchengch...@gmail.com> 于2025年2月14日周五 16:13写道:
>> >
>> >> Hi Xiangyu and Ron,
>> >>
>> >> I agree that sink reuse can be enabled by default from Flink Planner
>> >> perspective. But the planner should be informed by Sink Connector that
>> >> whether the planner can reuse different sink with different target
>> >> columns.
>> >>
>> >> Take JBDC sink as an example, under partial update circumstances, the
>> JDBC
>> >> needs to know the target sink or update columns of every record.
>> However,
>> >> the target columns info is discarded in current FLIP design.
>> >>
>> >> Best,
>> >> Xiangyu
>> >>
>> >> xiangyu feng <xiangyu...@gmail.com> 于2025年2月14日周五 13:51写道:
>> >>
>> >> > Hi Ron,
>> >> >
>> >> > After second thought, taking sink reuse as a long awaited feature
>> >> > with significant benefits for users, I agree to enable this in the
>> first
>> >> > place.  Similar features like
>> `table.optimizer.reuse-sub-plan-enabled`
>> >> and
>> >> > `table.optimizer.reuse-source-enabled` are also enabled by default.
>> From
>> >> > this point of view, sink reuse should be the same.
>> >> >
>> >> > The Flip[1] has been revised accordingly. Thx for suggestion.
>> >> >
>> >> > [1]
>> >> >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>> >> >
>> >> > Regards,
>> >> > Xiangyu
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > Ron Liu <ron9....@gmail.com> 于2025年2月14日周五 12:10写道:
>> >> >
>> >> > > Hi, Xiangyu
>> >> > >
>> >> > > >>> I prefer to set the default value of this option as'false in
>> the
>> >> > first
>> >> > > place.  Setting as true might introduce unexpected behavior for
>> users
>> >> > when
>> >> > > operating existing jobs. Maybe we should introduce this feature at
>> >> first
>> >> > > and discuss enabling this feature as default in a separated thread.
>> >> WDYT?
>> >> > >
>> >> > > 1. What unexpected behaviors do you think this might introduce?
>> For
>> >> Sink
>> >> > > nodes, which are generally stateless, I intuitively understand
>> that no
>> >> > > state compatibility issues will be introduced after Sink reuse.
>> >> > >
>> >> > > 2. Since Sink reuse benefits users, why not enable this feature by
>> >> > default
>> >> > > on the first day it is introduced? If your concern is potential
>> >> unhandled
>> >> > > corner cases in the implementation, I consider those to be bugs. We
>> >> > should
>> >> > > prioritize fixing them rather than blocking the default enablement
>> of
>> >> > this
>> >> > > optimization.
>> >> > >
>> >> > > 3. If we don't enable it by default now, when should we? What
>> specific
>> >> > > milestones or actions are needed during the waiting period?  Your
>> >> > concerns
>> >> > > about unintended behaviors would still exist even if we enable it
>> >> later.
>> >> > > Why delay resolving this in a separate discussion instead of
>> >> finalizing
>> >> > it
>> >> > > here?
>> >> > >
>> >> > > 4. From our internal practice, users still want to enjoy the
>> benefits
>> >> of
>> >> > > this feature by default.
>> >> > >
>> >> > >
>> >> > > Best,
>> >> > > Ron
>> >> > >
>> >> > > xiangyu feng <xiangyu...@gmail.com> 于2025年2月13日周四 15:57写道:
>> >> > >
>> >> > > >  Hi Ron,
>> >> > > >
>> >> > > > Thx for quick response.
>> >> > > >
>> >> > > > - should the default value be true for the newly introduced
>> option
>> >> > > > `table.optimizer.reuse-sink-enabled`?
>> >> > > >
>> >> > > > I prefer to set the default value of this option as'false in the
>> >> first
>> >> > > > place.  Setting as true might introduce unexpected behavior for
>> >> users
>> >> > > when
>> >> > > > operating existing jobs. Maybe we should introduce this feature
>> at
>> >> > first
>> >> > > > and discuss enabling this feature as default in a separated
>> thread.
>> >> > WDYT?
>> >> > > >
>> >> > > > - have you considered the technical implementation options and
>> are
>> >> they
>> >> > > > feasible?
>> >> > > >
>> >> > > > Yes, we have already implemented the POC internally. It works
>> well.
>> >> > > >
>> >> > > > Looking forward for your feedback.
>> >> > > >
>> >> > > > Best,
>> >> > > > Xiangyu
>> >> > > >
>> >> > > > Ron Liu <ron9....@gmail.com> 于2025年2月13日周四 14:55写道:
>> >> > > >
>> >> > > > > Hi, Xiangyu
>> >> > > > >
>> >> > > > > Thank you for proposing this FLIP, it's great work and looks
>> very
>> >> > > useful
>> >> > > > > for users.
>> >> > > > >
>> >> > > > > I have the following two questions regarding the content of the
>> >> FLIP:
>> >> > > > > 1. Since sink reuse is very useful, should the default value be
>> >> true
>> >> > > for
>> >> > > > > the newly introduced option
>> `table.optimizer.reuse-sink-enabled`,
>> >> and
>> >> > > > > should the engine enable this optimization by default.
>> Currently
>> >> for
>> >> > > > source
>> >> > > > > reuse, the default value of
>> >> > `sql.optimizer.reuse.table-source.enabled`
>> >> > > > > option is also true, which does not require user access by
>> >> default,
>> >> > so
>> >> > > I
>> >> > > > > think the engine should turn on Sink reuse optimization by
>> >> default.
>> >> > > > > 2. Regarding Sink Digest, you mentioned disregarding the sink
>> >> target
>> >> > > > > column, which I think is a very good suggestion, and very
>> useful
>> >> if
>> >> > it
>> >> > > > can
>> >> > > > > be done. I have a question: have you considered the technical
>> >> > > > > implementation options and are they feasible?
>> >> > > > >
>> >> > > > > Best,
>> >> > > > > Ron
>> >> > > > >
>> >> > > > > xiangyu feng <xiangyu...@gmail.com> 于2025年2月13日周四 12:56写道:
>> >> > > > >
>> >> > > > > > Hi all,
>> >> > > > > >
>> >> > > > > > Thank you all for the comments.
>> >> > > > > >
>> >> > > > > > If there is no further comment, I will open the voting thread
>> >> in 3
>> >> > > > days.
>> >> > > > > >
>> >> > > > > > Regards,
>> >> > > > > > Xiangyu
>> >> > > > > >
>> >> > > > > > xiangyu feng <xiangyu...@gmail.com> 于2025年2月11日周二 14:17写道:
>> >> > > > > >
>> >> > > > > > > Link for Paimon LocalMerge Operator[1]
>> >> > > > > > >
>> >> > > > > > > [1]
>> >> > > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> https://paimon.apache.org/docs/master/maintenance/write-performance/#local-merging
>> >> > > > > > >
>> >> > > > > > > xiangyu feng <xiangyu...@gmail.com> 于2025年2月11日周二 14:03写道:
>> >> > > > > > >
>> >> > > > > > >> Follow the above,
>> >> > > > > > >>
>> >> > > > > > >> "And for SinkWriter, the data structure to be processed
>> >> should
>> >> > be
>> >> > > > > > fixed."
>> >> > > > > > >>
>> >> > > > > > >> I'm not very sure why the data structure of SinkWriter
>> >> should be
>> >> > > > > fixed.
>> >> > > > > > >> Can you elaborate the scenario here?
>> >> > > > > > >>
>> >> > > > > > >>  "Is there a node or an operator to fill in the
>> inconsistent
>> >> > field
>> >> > > > of
>> >> > > > > > >> Rowdata that passed from different Sources?"
>> >> > > > > > >>
>> >> > > > > > >> By `filling in the inconsistent field from different
>> >> sources`,
>> >> > do
>> >> > > > you
>> >> > > > > > >> refer to implementations like the LocalMerge Operator [1]
>> for
>> >> > > > Paimon?
>> >> > > > > > IMHO,
>> >> > > > > > >> this should not be included in the Sink Reuse. The merging
>> >> > > behavior
>> >> > > > of
>> >> > > > > > >> multiple sources should be considered inside of the sink.
>> >> > > > > > >>
>> >> > > > > > >> Regards,
>> >> > > > > > >> Xiangyu Feng
>> >> > > > > > >>
>> >> > > > > > >> xiangyu feng <xiangyu...@gmail.com> 于2025年2月11日周二
>> 13:46写道:
>> >> > > > > > >>
>> >> > > > > > >>> Hi Yanquan,
>> >> > > > > > >>>
>> >> > > > > > >>> Thx for reply. IIUC, the schema of CatalogTable should
>> >> contain
>> >> > > all
>> >> > > > > > >>> target columns for sources. If not, a SQL validation
>> >> exception
>> >> > > > should
>> >> > > > > > be
>> >> > > > > > >>> raised for planner.
>> >> > > > > > >>>
>> >> > > > > > >>> Regards,
>> >> > > > > > >>> Xiangyu Feng
>> >> > > > > > >>>
>> >> > > > > > >>>
>> >> > > > > > >>>
>> >> > > > > > >>> Yanquan Lv <decq12y...@gmail.com> 于2025年2月10日周一 16:25写道:
>> >> > > > > > >>>
>> >> > > > > > >>>> Hi, Xiangyu. Thanks for driving this.
>> >> > > > > > >>>>
>> >> > > > > > >>>> I have a question to confirm:
>> >> > > > > > >>>> Considering the case that different Sources use
>> different
>> >> > > > > columns[1],
>> >> > > > > > >>>> will the Schema of CatalogTable[2] contain all target
>> >> columns
>> >> > > for
>> >> > > > > > Sources?
>> >> > > > > > >>>> And for SinkWriter, the data structure to be processed
>> >> should
>> >> > be
>> >> > > > > > fixed.
>> >> > > > > > >>>> Is there a node or an operator to fill in the
>> inconsistent
>> >> > field
>> >> > > > of
>> >> > > > > > Rowdata
>> >> > > > > > >>>> that passed from different Sources?
>> >> > > > > > >>>>
>> >> > > > > > >>>> [1]
>> >> > > > > > >>>>
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>> >> > > > > > >>>> [2]
>> >> > > > > > >>>>
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#planning
>> >> > > > > > >>>>
>> >> > > > > > >>>>
>> >> > > > > > >>>>
>> >> > > > > > >>>> > 2025年2月6日 17:06,xiangyu feng <xiangyu...@gmail.com>
>> 写道:
>> >> > > > > > >>>> >
>> >> > > > > > >>>> > Hi devs,
>> >> > > > > > >>>> >
>> >> > > > > > >>>> > I'm opening this thread to discuss FLIP-506: Support
>> >> Reuse
>> >> > > > > Multiple
>> >> > > > > > >>>> Table
>> >> > > > > > >>>> > Sinks in Planner[1].
>> >> > > > > > >>>> >
>> >> > > > > > >>>> > Currently if users want to partial-update a downstream
>> >> table
>> >> > > > from
>> >> > > > > > >>>> multiple
>> >> > > > > > >>>> > source tables in one datastream, they would have to
>> >> manually
>> >> > > > union
>> >> > > > > > all
>> >> > > > > > >>>> > source tables and add lots of "cast(null as string) as
>> >> xxx"
>> >> > in
>> >> > > > > Flink
>> >> > > > > > >>>> SQL.
>> >> > > > > > >>>> > This will make the SQL here hard to use and maintain.
>> >> > > > > > >>>> >
>> >> > > > > > >>>> > After discussing with Weijie Guo, we think that by
>> >> > supporting
>> >> > > > > reuse
>> >> > > > > > >>>> sink
>> >> > > > > > >>>> > nodes in planner, the usability can be greatly
>> improved
>> >> in
>> >> > > this
>> >> > > > > > case.
>> >> > > > > > >>>> >
>> >> > > > > > >>>> > Therefore, we propose to add a new option
>> >> > > > > > >>>> > *`table.optimizer.reuse-sink-enabled`* here to support
>> >> this
>> >> > > > > feature.
>> >> > > > > > >>>> More
>> >> > > > > > >>>> > details can be found in the FLIP.
>> >> > > > > > >>>> >
>> >> > > > > > >>>> > Looking forward to your feedback, thanks.
>> >> > > > > > >>>> >
>> >> > > > > > >>>> > [1]
>> >> > > > > > >>>> >
>> >> > > > > > >>>>
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner
>> >> > > > > > >>>> >
>> >> > > > > > >>>> > Best regards,
>> >> > > > > > >>>> > Xiangyu Feng
>> >> > > > > > >>>>
>> >> > > > > > >>>>
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >
>>
>

Reply via email to