Suppose a user currently has a job that deduplicates two CDC sources and then 
joins them, and the output is used for audit analysis. The user has turned off 
'table.exec.deduplicate.mini-batch.compact-changes-enabled' to make sure that 
no update details are lost. If we don't introduce the new option, then after 
the user upgrades, mini-batch join would be enabled by default, some update 
details might be lost, and the audit results would be distorted.
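A minimal Python sketch of why folding within a mini-batch can hide update details from an audit consumer. It assumes a simplified model in which each changelog record is a (row kind, row) pair and a retraction (-D or -U) cancels the most recent buffered +I or +U of the same row within one mini-batch. `fold_mini_batch` and this exact folding rule are hypothetical illustrations, not the actual Flink or FLIP-415 implementation.

```python
from typing import List, Tuple

# Hypothetical record model: (row kind, row values), e.g. ("+I", (1, "a")).
Record = Tuple[str, tuple]

ACCUMULATE = {"+I", "+U"}
RETRACT = {"-D", "-U"}


def fold_mini_batch(batch: List[Record]) -> List[Record]:
    """Fold one buffered mini-batch (hypothetical rule, for illustration only).

    A retraction (-D/-U) cancels the most recent buffered accumulation
    (+I/+U) of the same row; neither record is emitted downstream.
    """
    folded: List[Record] = []
    for kind, row in batch:
        if kind in RETRACT:
            # Search backwards for a matching accumulation of the same row.
            match = next(
                (i for i in range(len(folded) - 1, -1, -1)
                 if folded[i][0] in ACCUMULATE and folded[i][1] == row),
                None,
            )
            if match is not None:
                del folded[match]  # Both records disappear from the output.
                continue
        folded.append((kind, row))
    return folded


batch = [
    ("+I", (1, "a")),  # row inserted ...
    ("-D", (1, "a")),  # ... and deleted within the same mini-batch
    ("-U", (2, "b")),  # an update of row 2 with no buffered counterpart
    ("+U", (2, "c")),
]
print(fold_mini_batch(batch))
# [('-U', (2, 'b')), ('+U', (2, 'c'))]
```

Because the +I/-D pair for row (1, "a") cancels inside the batch, a downstream audit table never sees that the row was inserted and then deleted, even though the eventually consistent end state is the same.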

> On Jan 11, 2024, at 16:19, Benchao Li <libenc...@apache.org> wrote:
> 
>> the change might not be expected by downstream consumers of the job 
>> that require the full details of the changelog
> 
> Could you elaborate on this a bit? I've never come across this kind of
> requirement before, and I'm curious what scenario requires it.
> 
> On Thu, Jan 11, 2024 at 13:08, shuai xu <xushuai...@gmail.com> wrote:
>> 
>> Thanks for your response, Benchao.
>> 
>> Here is my thought on the newly added option.
>> Users' existing jobs run on a version without mini-batch join. If the 
>> existing mini-batch option were reused to enable mini-batch join, the 
>> internal behavior of the join operator in those jobs would change once 
>> they are migrated to the new version. Although the changelog emitted by 
>> the Join operator is only eventually consistent, the change might not be 
>> expected by downstream consumers of the job that require the full details 
>> of the changelog. This newly added option follows the precedent of 
>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>> 
>> As for the implementation, the new operator shares the state of the 
>> original operator and merely adds a mini-batch buffer that stores records 
>> to enable some optimizations. The state storage remains consistent, and 
>> the computational logic is only slightly modified.
>> 
>> Best,
>> Xu Shuai
>> 
>>> On Jan 10, 2024, at 22:56, Benchao Li <libenc...@apache.org> wrote:
>>> 
>>> Thanks, Shuai, for driving this. Mini-batch join is a very useful
>>> optimization; +1 for the general idea.
>>> 
>>> Regarding the configuration
>>> "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>>> necessary. The changelog emitted by the Join operator is only
>>> eventually consistent, so from this perspective there is not much
>>> difference between the original join and mini-batch join. Besides,
>>> introducing more options makes things more complex for users and
>>> harder to understand and maintain, which we should be careful about.
>>> 
>>> One thing about the implementation: could you make the new operator
>>> share the same state definition as the original one?
>>> 
>>> On Wed, Jan 10, 2024 at 21:23, shuai xu <xushuai...@gmail.com> wrote:
>>>> 
>>>> Hi devs,
>>>> 
>>>> I'd like to start a discussion on FLIP-415: Introduce a new join operator 
>>>> to support minibatch [1].
>>>> 
>>>> Currently, cascading joins in Flink suffer from record amplification. 
>>>> Every record the join operator receives triggers a join process. However, 
>>>> if a +I record and a -D record match, they could be folded, saving two 
>>>> join processes. Besides, in an outer join a -U/+U pair may produce four 
>>>> output records, two of which are redundant.
>>>> 
>>>> To address this issue, this FLIP introduces a new 
>>>> MiniBatchStreamingJoinOperator that processes records in mini-batches, 
>>>> which reduces the number of redundant output messages and avoids 
>>>> unnecessary join processes.
>>>> A new option is added to control whether this operator is used, so that 
>>>> existing jobs are not affected.
>>>> 
>>>> Please find more details in the FLIP wiki document [1]. Looking
>>>> forward to your feedback.
>>>> 
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>>>> 
>>>> Best,
>>>> Xu Shuai
>>> 
>>> 
>>> 
>>> --
>>> 
>>> Best,
>>> Benchao Li
>> 
> 
> 
> -- 
> 
> Best,
> Benchao Li

Best, 
Xu Shuai
