Suppose we have a job that de-duplicates two CDC sources and then joins them, and its output is used for audit analysis. The user has turned off "table.exec.deduplicate.mini-batch.compact-changes-enabled" so that no update details are lost. If we don't introduce the new option, then after the user upgrades, mini-batch join would be enabled by default and some update details may be lost, distorting the audit results.
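
To make the concern concrete, here is a toy sketch of how folding records inside one mini-batch hides intermediate changes from a downstream consumer. It is not Flink's actual operator logic: the function name `fold_mini_batch`, the record layout, and the cancellation rule are all assumptions chosen for illustration.

```python
from typing import List, Tuple

# Hypothetical record layout: (change kind, key, value).
Record = Tuple[str, str, int]

ACCUMULATE = {"+I", "+U"}
RETRACT = {"-D", "-U"}


def fold_mini_batch(batch: List[Record]) -> List[Record]:
    """Toy model: cancel each retraction against an earlier buffered
    accumulation of the same row, so neither record is emitted."""
    buffered: List[Record] = []
    for kind, key, value in batch:
        if kind in RETRACT:
            # Search from the newest buffered record backwards for a matching row.
            for i in range(len(buffered) - 1, -1, -1):
                b_kind, b_key, b_value = buffered[i]
                if b_kind in ACCUMULATE and (b_key, b_value) == (key, value):
                    del buffered[i]  # the pair cancels out
                    break
            else:
                # Nothing in this batch to cancel against; keep the retraction.
                buffered.append((kind, key, value))
        else:
            buffered.append((kind, key, value))
    return buffered


# Four changes arrive in one mini-batch.
batch = [
    ("+I", "order-1", 10),
    ("-D", "order-1", 10),
    ("+I", "order-1", 20),
    ("+I", "order-2", 5),
]
folded = fold_mini_batch(batch)
print(folded)
```

In this sketch the audit consumer receives only two of the four changes: the insert and delete of value 10 for "order-1" never leave the operator. The final state is the same either way, which matches the eventual-consistency argument, but the intermediate history is gone.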

> On Jan 11, 2024, at 16:19, Benchao Li <libenc...@apache.org> wrote:
>
>> the change might not be supposed for the downstream of the job which
>> requires details of changelog
>
> Could you elaborate on this a bit? I've never met such kinds of
> requirements before, I'm curious what is the scenario that requires
> this.
>
> shuai xu <xushuai...@gmail.com> wrote on Thu, Jan 11, 2024 at 13:08:
>>
>> Thanks for your response, Benchao.
>>
>> Here is my thought on the newly added option.
>> Users' current jobs are running on a version without minibatch join. If the
>> existing option to enable minibatch join is utilized, then when users' jobs
>> are migrated to the new version, the internal behavior of the join operation
>> within the jobs will change. Although the semantic of changelog emitted by
>> the Join operator is eventual consistency, the change might not be supposed
>> for the downstream of the job which requires details of changelog. This
>> newly added option also refers to
>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>>
>> As for the implementation, the new operator shares the state of the original
>> operator and it merely has an additional minibatch for storing records to do
>> some optimization. The storage remains consistent, and there is minor
>> modification to the computational logic.
>>
>> Best,
>> Xu Shuai
>>
>>> On Jan 10, 2024, at 22:56, Benchao Li <libenc...@apache.org> wrote:
>>>
>>> Thanks shuai for driving this, mini-batch Join is a very useful
>>> optimization, +1 for the general idea.
>>>
>>> Regarding the configuration
>>> "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>>> necessary. The semantic of changelog emitted by the Join operator is
>>> eventual consistency, so there is no much difference between original
>>> Join and mini-batch Join from this aspect. Besides, introducing more
>>> options would make it more complex for users, harder to understand and
>>> maintain, which we should be careful about.
>>>
>>> One thing about the implementation, could you make the new operator
>>> share the same state definition with the original one?
>>>
>>> shuai xu <xushuai...@gmail.com> wrote on Wed, Jan 10, 2024 at 21:23:
>>>>
>>>> Hi devs,
>>>>
>>>> I’d like to start a discussion on FLIP-415: Introduce a new join operator
>>>> to support minibatch[1].
>>>>
>>>> Currently, when performing cascading connections in Flink, there is a pain
>>>> point of record amplification. Every record join operator receives would
>>>> trigger join process. However, if records of +I and -D matches, they
>>>> could be folded to reduce two times of join process. Besides, records of
>>>> -U +U might output 4 records in which two records are redundant when
>>>> encountering outer join.
>>>>
>>>> To address this issue, this FLIP introduces a new
>>>> MiniBatchStreamingJoinOperator to achieve batch processing which could
>>>> reduce number of outputting redundant messages and avoid unnecessary join
>>>> processes.
>>>> A new option is added to control the operator to avoid influencing
>>>> existing jobs.
>>>>
>>>> Please find more details in the FLIP wiki document [1]. Looking
>>>> forward to your feedback.
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>>>>
>>>> Best,
>>>> Xu Shuai
>>>
>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>
>
>
> --
>
> Best,
> Benchao Li

Best,
Xu Shuai