Hi Shuai,

Thanks for the update! Regarding the newly introduced configuration, I share the same concern as Benchao and Xuyang.

First of all, in most cases, the fact that users choose to enable the mini-batch configuration indicates that they are aware of the trade-off between throughput and completeness of the changelog.

And if we finally adopt this configuration solely to avoid state incompatibility, does it mean that we will need to introduce a new configuration for every future operator's mini-batch optimization, similar to what we are doing today?

Best,
Jane

On Fri, Jan 12, 2024 at 1:45 PM Xuyang <xyzhong...@163.com> wrote:

> Hi, Xu Shuai. Thanks for driving this FLIP.
>
> The CDC message amplification of cascade joins has always been a problem
> for users. Judging from the Nexmark results, this optimization is very
> meaningful. I just have the same doubt as Benchao: why can't we use
> minibatch join as the default behavior when the user turns on minibatch?
>
> > Although the semantics of the changelog emitted by the Join operator is
> > eventual consistency, the change might not be expected by a downstream
> > of the job that requires the details of the changelog.
>
> I think if a user adds the minibatch options to their job to enable
> minibatch, they should know that Flink will reduce the amount of data sent
> downstream by folding CDC messages as much as possible. In scenarios where
> all details of the CDC records need to be retained, such as jobs that just
> synchronize data from one db to another db, users have no reason to enable
> minibatch.
>
> The only scenario I can think of that requires adding this independent
> minibatch join option is ensuring that the state is compatible between
> multiple versions, but we have not promised users state compatibility
> during cross-version upgrades.
>
> Maybe we need to figure out why the
> 'table.exec.deduplicate.mini-batch.compact-changes-enabled' option needed
> to be added to the deduplicate operator. I think it is the same reason as
> adding a separate parameter to join to control CDC message folding.
>
> --
>
> Best!
> Xuyang
>
> At 2024-01-11 16:19:30, "Benchao Li" <libenc...@apache.org> wrote:
> >> the change might not be expected by a downstream of the job that
> >> requires the details of the changelog
> >
> >Could you elaborate on this a bit? I've never met this kind of
> >requirement before, and I'm curious what scenario requires it.
> >
> >shuai xu <xushuai...@gmail.com> wrote on Thu, Jan 11, 2024 at 13:08:
> >>
> >> Thanks for your response, Benchao.
> >>
> >> Here are my thoughts on the newly added option.
> >> Users' current jobs are running on a version without minibatch join. If
> >> the existing minibatch option is used to enable minibatch join, then
> >> when users' jobs are migrated to the new version, the internal behavior
> >> of the join operation within the jobs will change. Although the
> >> semantics of the changelog emitted by the Join operator is eventual
> >> consistency, the change might not be expected by a downstream of the
> >> job that requires the details of the changelog. This newly added option
> >> is also modeled on
> >> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
> >>
> >> As for the implementation, the new operator shares the state of the
> >> original operator; it merely has an additional minibatch for storing
> >> records to do some optimization. The storage remains consistent, and
> >> there is only a minor modification to the computational logic.
> >>
> >> Best,
> >> Xu Shuai
> >>
> >> > On Jan 10, 2024, at 22:56, Benchao Li <libenc...@apache.org> wrote:
> >> >
> >> > Thanks Shuai for driving this. Mini-batch Join is a very useful
> >> > optimization, +1 for the general idea.
> >> >
> >> > Regarding the configuration
> >> > "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
> >> > necessary. The semantics of the changelog emitted by the Join operator
> >> > is eventual consistency, so there is not much difference between the
> >> > original Join and mini-batch Join in this respect. Besides,
> >> > introducing more options would make things more complex for users and
> >> > harder to understand and maintain, which we should be careful about.
> >> >
> >> > One thing about the implementation: could you make the new operator
> >> > share the same state definition with the original one?
> >> >
> >> > shuai xu <xushuai...@gmail.com> wrote on Wed, Jan 10, 2024 at 21:23:
> >> >>
> >> >> Hi devs,
> >> >>
> >> >> I'd like to start a discussion on FLIP-415: Introduce a new join
> >> >> operator to support minibatch [1].
> >> >>
> >> >> Currently, cascading joins in Flink suffer from record
> >> >> amplification. Every record that the join operator receives triggers
> >> >> a join process. However, if a +I record and a -D record match, they
> >> >> can be folded, which saves two join processes. Besides, a -U/+U pair
> >> >> might produce 4 output records, two of which are redundant, when
> >> >> encountering an outer join.
> >> >>
> >> >> To address this issue, this FLIP introduces a new
> >> >> MiniBatchStreamingJoinOperator to achieve batch processing, which
> >> >> could reduce the number of redundant output messages and avoid
> >> >> unnecessary join processes.
> >> >> A new option is added to control the operator to avoid influencing
> >> >> existing jobs.
> >> >>
> >> >> Please find more details in the FLIP wiki document [1]. Looking
> >> >> forward to your feedback.
> >> >>
> >> >> [1]
> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
> >> >>
> >> >> Best,
> >> >> Xu Shuai
> >> >
> >> > --
> >> >
> >> > Best,
> >> > Benchao Li
> >
> >--
> >
> >Best,
> >Benchao Li