Hi, Xu Shuai. Thanks for driving this FLIP.

CDC message amplification in cascaded joins has always been a problem for
users, and judging from the Nexmark results, this optimization is very
meaningful. I have the same question as Benchao: why can't we make minibatch
join the default behavior when the user turns on minibatch?
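
To make the question concrete, here is a rough Python sketch (not Flink code)
of the two policies being compared. The option names are the ones quoted in
this thread; the helper names are hypothetical, and I am assuming that under
the FLIP's proposal the join-specific option only takes effect when general
minibatch is also enabled.

```python
# Option names as quoted in this thread. The second one is the option
# proposed in the FLIP, so it is an assumption that it would end up
# with exactly this name.
MINI_BATCH_ENABLED = "table.exec.mini-batch.enabled"
JOIN_MINI_BATCH_ENABLED = "table.exec.stream.join.mini-batch-enabled"


def _flag(conf, key):
    """Read a boolean option from a plain dict; missing options are false."""
    return str(conf.get(key, "false")).lower() == "true"


def use_minibatch_join_separate_option(conf):
    """Policy with a dedicated option (assumed: both switches must be on)."""
    return _flag(conf, MINI_BATCH_ENABLED) and _flag(conf, JOIN_MINI_BATCH_ENABLED)


def use_minibatch_join_follow_minibatch(conf):
    """Policy suggested here: minibatch join simply follows minibatch."""
    return _flag(conf, MINI_BATCH_ENABLED)
```

With only 'table.exec.mini-batch.enabled' set, the first policy keeps the
original join while the second one switches to the minibatch join.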


> Although the semantic of changelog emitted by the Join operator is eventual
> consistency, the change might not be supposed for the downstream of the job
> which requires details of changelog.


I think that if users add the minibatch options to their job to enable
minibatch, they should know that Flink will reduce the amount of data sent
downstream by folding CDC messages as much as possible. In scenarios where all
details of the CDC records need to be retained, such as jobs that simply
synchronize data from one database to another, users have no reason to enable
minibatch.
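
As a rough illustration of what "folding" means here, the following is a
minimal Python sketch, not the operator's actual logic. It assumes a
mini-batch is just a list of (kind, row) pairs and only handles the simplest
case mentioned in the FLIP: a -D that matches an earlier +I of the same row in
the same batch. The function name fold_batch is hypothetical.

```python
def fold_batch(records):
    """Fold one mini-batch of (kind, row) changelog records.

    Simplified assumption: a "-D" cancels the most recent still-buffered
    "+I" of an identical row, so neither is emitted downstream. All other
    records pass through in arrival order.
    """
    out = []
    for kind, row in records:
        if kind == "-D":
            # Search backwards for the latest unfolded +I of the same row.
            for i in range(len(out) - 1, -1, -1):
                if out[i] == ("+I", row):
                    del out[i]  # the +I/-D pair annihilates
                    break
            else:
                # No matching +I in this batch: the delete must be emitted.
                out.append((kind, row))
        else:
            out.append((kind, row))
    return out


batch = [("+I", (1, "a")), ("+I", (2, "b")), ("-D", (1, "a"))]
folded = fold_batch(batch)  # only ("+I", (2, "b")) is left
```

The downstream never sees the intermediate +I/-D pair for row (1, "a"), which
is exactly the detail a pure synchronization job would want to keep.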


The only scenario I can think of that requires a separate minibatch join
option is keeping the state compatible across versions, but we have not
promised users state compatibility for cross-version upgrades.


Maybe we need to figure out why the
'table.exec.deduplicate.mini-batch.compact-changes-enabled' option needed to
be added to the deduplicate operator. I think the reason is the same as for
adding a separate option to join to control CDC message folding.




--

    Best!
    Xuyang





At 2024-01-11 16:19:30, "Benchao Li" <libenc...@apache.org> wrote:
>> the change might not be supposed for the downstream of the job which 
>> requires details of changelog
>
>Could you elaborate on this a bit? I've never met such kinds of
>requirements before, I'm curious what is the scenario that requires
>this.
>
>shuai xu <xushuai...@gmail.com> wrote on Thu, Jan 11, 2024 at 13:08:
>>
>> Thanks for your response, Benchao.
>>
>> Here is my thought on the newly added option.
>> Users' current jobs are running on a version without minibatch join. If the 
>> existing option to enable minibatch join is utilized, then when users' jobs 
>> are migrated to the new version, the internal behavior of the join operation 
>> within the jobs will change. Although the semantic of changelog emitted by 
>> the Join operator is eventual consistency, the change might not be supposed 
>> for the downstream of the job which requires details of changelog. This 
>> newly added option also refers to 
>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>>
>> As for the implementation, the new operator shares the state of the original 
>> operator and it merely has an additional minibatch for storing records to do 
>> some optimization. The storage remains consistent, and there is minor 
>> modification to the computational logic.
>>
>> Best,
>> Xu Shuai
>>
>> > On Jan 10, 2024, at 22:56, Benchao Li <libenc...@apache.org> wrote:
>> >
>> > Thanks shuai for driving this, mini-batch Join is a very useful
>> > optimization, +1 for the general idea.
>> >
>> > Regarding the configuration
>> > "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>> > necessary. The semantic of changelog emitted by the Join operator is
>> > eventual consistency, so there is no much difference between original
>> > Join and mini-batch Join from this aspect. Besides, introducing more
>> > options would make it more complex for users, harder to understand and
>> > maintain, which we should be careful about.
>> >
>> > One thing about the implementation, could you make the new operator
>> > share the same state definition with the original one?
>> >
>> > shuai xu <xushuai...@gmail.com> wrote on Wed, Jan 10, 2024 at 21:23:
>> >>
>> >> Hi devs,
>> >>
>> >> I’d like to start a discussion on FLIP-415: Introduce a new join operator 
>> >> to support minibatch[1].
>> >>
>> >> Currently, when performing cascading joins in Flink, there is a 
>> >> pain point of record amplification. Every record the join operator 
>> >> receives triggers the join process. However, if +I and -D records 
>> >> match, they can be folded, saving two rounds of join processing. 
>> >> Besides, -U +U records might output 4 records, two of which are 
>> >> redundant, when encountering an outer join.
>> >>
>> >> To address this issue, this FLIP introduces a new 
>> >> MiniBatchStreamingJoinOperator that processes records in batches, which 
>> >> could reduce the number of redundant output messages and avoid 
>> >> unnecessary join processing.
>> >> A new option is added to control the operator to avoid influencing 
>> >> existing jobs.
>> >>
>> >> Please find more details in the FLIP wiki document [1]. Looking
>> >> forward to your feedback.
>> >>
>> >> [1]
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>> >>
>> >> Best,
>> >> Xu Shuai
>> >
>> >
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>>
>
>
>-- 
>
>Best,
>Benchao Li
