Hi, shuai. Thanks for the explanation. This scenario sounds reasonable to me. I agree that we need to split the minibatch behavior into two kinds of options: 1. whether to enable minibatch and buffer the batch data; 2. whether to compact the changelog data while buffering the batch, merging records with the same upsert key. For the latter, the new option 'table.exec.mini-batch.compact-changes-enabled' looks pretty good.
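To make the second kind of option concrete, here is a minimal Python sketch of what compacting changes inside one minibatch could look like. It is only an illustration under stated assumptions, not Flink's implementation: the function name `compact_changes` and the `(kind, upsert_key, value)` record layout are hypothetical, and the folding rule simply keeps the net effect per upsert key.

```python
from collections import OrderedDict

ADD = {"+I", "+U"}      # row kinds that leave a row present
RETRACT = {"-U", "-D"}  # row kinds that remove a row


def compact_changes(batch):
    """Fold a minibatch of (kind, upsert_key, value) records per upsert key.

    Hypothetical sketch: only the first and last record of each key are
    inspected, so every intermediate change inside the batch is dropped.
    """
    first = OrderedDict()  # key -> first record seen in this batch
    last = {}              # key -> last record seen in this batch
    for rec in batch:
        key = rec[1]
        first.setdefault(key, rec)
        last[key] = rec

    out = []
    for key, (f_kind, _, f_val) in first.items():
        l_kind, _, l_val = last[key]
        existed_before = f_kind in RETRACT  # batch starts by retracting an old row
        exists_after = l_kind in ADD        # batch ends with a row present
        if existed_before and exists_after:
            out += [("-U", key, f_val), ("+U", key, l_val)]
        elif existed_before:
            out.append(("-D", key, f_val))
        elif exists_after:
            out.append(("+I", key, l_val))
        # else: inserted and deleted within the same batch, emit nothing
    return out


batch = [
    ("+I", 1, "a"), ("-D", 1, "a"),                  # cancels out entirely
    ("+I", 2, "b"), ("-U", 2, "b"), ("+U", 2, "c"),  # folds to a single +I
]
print(compact_changes(batch))
```

With compaction on, the five input records collapse to one, and the intermediate value "b" for key 2 never reaches downstream. That loss of intermediate changes is exactly the trade-off a separate compact-changes switch would let users opt into or out of.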
--
Best!
Xuyang

At 2024-01-12 18:13:12, "shuai xu" <xushuai...@gmail.com> wrote:
>Hi all.
>
>The point I want to highlight is that minibatch join could potentially yield
>an incomplete changelog, which existing jobs are not supposed to produce. For
>example, a scenario that joins two CDC sources after de-duplicating them, with
>the output used for audit analysis, could not accept an incomplete changelog.
>The minibatch processing itself, however, would not introduce any problem.
>
>The internal behavior of minibatch processing is not well-defined now. I don't
>think reusing the minibatch option for minibatch join is problematic, but
>precise control is necessary to mitigate the risk of generating an incomplete
>changelog within minibatch.
>
>Controlling the behavior on changelog within minibatch should be a global
>option. Therefore, I propose introducing a new option
>'table.exec.mini-batch.compact-changes-enabled' to precisely control changelog
>compaction within minibatch. Then we deprecate the option
>'table.exec.deduplicate.mini-batch.compact-changes-enabled'. The deduplicate
>operator would fall back to the newly introduced option, and the
>minibatch join would follow it as well.
>
>
>> On Jan 12, 2024, at 16:30, Jane Chan <qingyue....@gmail.com> wrote:
>>
>> Hi shuai,
>>
>> Thanks for the update! Regarding the newly introduced configuration, I hold
>> the same concern as Benchao and Xuyang.
>>
>> First of all, in most cases, the fact that users choose to enable
>> mini-batch configuration indicates they are aware of the trade-off between
>> throughput and completeness of the changelog.
>> And if we finally adopt this configuration solely to avoid state
>> incompatibility, does it mean that we will need to introduce a new
>> configuration for every future operator's mini-batch optimization, similar
>> to what we did today?
>>
>> Best,
>> Jane
>>
>> On Fri, Jan 12, 2024 at 1:45 PM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi, Xu Shuai. Thanks for driving this FLIP.
>>>
>>> The CDC message amplification of cascade join has always been a problem
>>> for users. Judging from the nexmark results, this optimization is very
>>> meaningful. I just have the same doubts as Benchao: why can't we use
>>> minibatch join as the default behavior when the user turns on minibatch?
>>>
>>>> Although the semantic of changelog emitted by the Join operator is
>>>> eventual consistency, the change might not be supposed for the
>>>> downstream of the job which requires details of changelog.
>>>
>>> I think if the user adds the minibatch options to his job to enable
>>> minibatch, he should know that Flink will reduce the amount of data sent
>>> to downstream by folding CDC messages as much as possible. In scenarios
>>> where all details of CDC records need to be retained, such as jobs that
>>> just synchronize data from one db to another db, users have no reason to
>>> enable minibatch.
>>>
>>> The only scenario I can think of that requires adding this independent
>>> minibatch join option is to ensure that the state is compatible between
>>> multiple versions, but we have not promised users state compatibility
>>> during cross-version upgrades.
>>>
>>> Maybe we need to figure out why the
>>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled' option needed
>>> to be added to the deduplicate operator. I think this is the same reason
>>> as adding a separate parameter to join to control CDC message folding.
>>>
>>> --
>>>
>>> Best!
>>> Xuyang
>>>
>>> At 2024-01-11 16:19:30, "Benchao Li" <libenc...@apache.org> wrote:
>>>>> the change might not be supposed for the downstream of the job which
>>>>> requires details of changelog
>>>>
>>>> Could you elaborate on this a bit? I've never met such kinds of
>>>> requirements before, I'm curious what is the scenario that requires
>>>> this.
>>>>
>>>> shuai xu <xushuai...@gmail.com> wrote on Thu, Jan 11, 2024 at 13:08:
>>>>>
>>>>> Thanks for your response, Benchao.
>>>>>
>>>>> Here is my thought on the newly added option.
>>>>> Users' current jobs are running on a version without minibatch join. If
>>>>> the existing option to enable minibatch join is utilized, then when
>>>>> users' jobs are migrated to the new version, the internal behavior of
>>>>> the join operation within the jobs will change. Although the semantic
>>>>> of changelog emitted by the Join operator is eventual consistency, the
>>>>> change might not be supposed for the downstream of the job which
>>>>> requires details of changelog. This newly added option also refers to
>>>>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>>>>>
>>>>> As for the implementation, the new operator shares the state of the
>>>>> original operator, and it merely has an additional minibatch for
>>>>> storing records to do some optimization. The storage remains
>>>>> consistent, and there is only a minor modification to the computational
>>>>> logic.
>>>>>
>>>>> Best,
>>>>> Xu Shuai
>>>>>
>>>>>> On Jan 10, 2024, at 22:56, Benchao Li <libenc...@apache.org> wrote:
>>>>>>
>>>>>> Thanks shuai for driving this, mini-batch Join is a very useful
>>>>>> optimization, +1 for the general idea.
>>>>>>
>>>>>> Regarding the configuration
>>>>>> "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>>>>>> necessary. The semantic of changelog emitted by the Join operator is
>>>>>> eventual consistency, so there is not much difference between the
>>>>>> original Join and mini-batch Join from this aspect. Besides, introducing
>>>>>> more options would make it more complex for users, harder to understand
>>>>>> and maintain, which we should be careful about.
>>>>>>
>>>>>> One thing about the implementation, could you make the new operator
>>>>>> share the same state definition with the original one?
>>>>>>
>>>>>> shuai xu <xushuai...@gmail.com> wrote on Wed, Jan 10, 2024 at 21:23:
>>>>>>>
>>>>>>> Hi devs,
>>>>>>>
>>>>>>> I'd like to start a discussion on FLIP-415: Introduce a new join
>>>>>>> operator to support minibatch[1].
>>>>>>>
>>>>>>> Currently, when performing cascading connections in Flink, there is
>>>>>>> a pain point of record amplification. Every record the join operator
>>>>>>> receives triggers a join process. However, if +I and -D records
>>>>>>> match, they could be folded, saving two join processes. Besides, a
>>>>>>> -U +U pair might output 4 records, two of which are redundant, when
>>>>>>> encountering an outer join.
>>>>>>>
>>>>>>> To address this issue, this FLIP introduces a new
>>>>>>> MiniBatchStreamingJoinOperator to achieve batch processing, which
>>>>>>> could reduce the number of redundant output messages and avoid
>>>>>>> unnecessary join processes.
>>>>>>> A new option is added to control the operator to avoid influencing
>>>>>>> existing jobs.
>>>>>>>
>>>>>>> Please find more details in the FLIP wiki document [1]. Looking
>>>>>>> forward to your feedback.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>>>>>>>
>>>>>>> Best,
>>>>>>> Xu Shuai
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Best,
>>>>>> Benchao Li
>>>>>
>>>>
>>>> --
>>>>
>>>> Best,
>>>> Benchao Li
>>>
>
>Best,
>Xu Shuai
>