Hi, Shuai.
Thanks for this explanation. The scenario sounds reasonable to me. I agree
that we need to split the minibatch behavior into two kinds of options:
1. whether to enable minibatch to buffer records in batches;
2. whether to compact the changelog while buffering the batch, merging
records with the same upsert key.
For the latter, the new option
'table.exec.mini-batch.compact-changes-enabled' looks pretty good.
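
To make the second kind of option concrete, here is a minimal sketch of
folding a buffered batch by key. It is only an illustration under my own
assumptions: the class, the Change type, flush() and the folding rules are
hypothetical and are not the FLIP's operator or any existing Flink code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MiniBatchCompactionSketch {

    /** Changelog row kinds, written as +I, -U, +U, -D in this thread. */
    public enum Kind {
        INSERT("+I", true), UPDATE_BEFORE("-U", false),
        UPDATE_AFTER("+U", true), DELETE("-D", false);

        final String tag;
        final boolean adds; // true if the record adds a row, false if it retracts one

        Kind(String tag, boolean adds) { this.tag = tag; this.adds = adds; }
    }

    /** One buffered changelog record; key stands in for the upsert key (hypothetical type). */
    public static final class Change {
        final Kind kind;
        final String key;
        final String value;

        public Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() { return kind.tag + "(" + key + "," + value + ")"; }
    }

    /**
     * Flushes a buffered batch. With compactChanges=false the batch is emitted
     * unchanged (complete changelog); with true, only the net effect per key is emitted.
     */
    public static List<Change> flush(List<Change> batch, boolean compactChanges) {
        if (!compactChanges) {
            return new ArrayList<>(batch);
        }
        // Remember the first and the last record seen for each key, in arrival order.
        Map<String, Change[]> firstAndLast = new LinkedHashMap<>();
        for (Change c : batch) {
            Change[] slot = firstAndLast.computeIfAbsent(c.key, k -> new Change[] {c, c});
            slot[1] = c;
        }
        List<Change> out = new ArrayList<>();
        for (Change[] slot : firstAndLast.values()) {
            Change first = slot[0];
            Change last = slot[1];
            if (first.kind.adds && last.kind.adds) {
                // Row was absent before the batch: keep the first kind, the latest value.
                out.add(new Change(first.kind, last.key, last.value));
            } else if (!first.kind.adds && !last.kind.adds) {
                // Row existed before the batch and is gone after it: retract the old value.
                out.add(new Change(last.kind, first.key, first.value));
            } else if (!first.kind.adds) {
                // Old value retracted, new value added: intermediate updates are folded away.
                out.add(first);
                out.add(last);
            }
            // Otherwise the row was added and removed inside the batch: emit nothing.
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> batch = List.of(
                new Change(Kind.INSERT, "k1", "a"),
                new Change(Kind.DELETE, "k1", "a"),
                new Change(Kind.UPDATE_BEFORE, "k2", "x"),
                new Change(Kind.UPDATE_AFTER, "k2", "y"),
                new Change(Kind.UPDATE_BEFORE, "k2", "y"),
                new Change(Kind.UPDATE_AFTER, "k2", "z"));
        System.out.println(flush(batch, false)); // all six records, unchanged
        System.out.println(flush(batch, true));  // [-U(k2,x), +U(k2,z)]
    }
}
```

With compaction off, the downstream still sees every intermediate change,
which is what the audit scenario needs. With it on, only the net change per
key survives the batch.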




--

    Best!
    Xuyang





At 2024-01-12 18:13:12, "shuai xu" <xushuai...@gmail.com> wrote:
>Hi all. 
>
>The point I want to highlight is that minibatch join could potentially yield 
>an incomplete changelog, which existing jobs may not expect. For example, 
>a job that joins two CDC sources after de-duplicating them, and whose 
>output is used for audit analysis, cannot accept an incomplete changelog. 
>Minibatch processing itself, however, would not introduce any problem. 
>
>The internal behavior of minibatch processing is not well-defined now. I don't 
>think reusing the minibatch option for minibatch join is problematic, but 
>precise control is necessary to mitigate the risk of generating incomplete 
>changelog within minibatch. 
>
>Control over changelog behavior within minibatch should be a global 
>option. Therefore, I propose introducing a new option, 
>'table.exec.mini-batch.compact-changes-enabled', to precisely control changelog 
>compaction within minibatch. We would then deprecate the option 
>'table.exec.deduplicate.mini-batch.compact-changes-enabled'. The deduplicate 
>operator would fall back to the newly introduced option, and the 
>minibatch join would follow it as well. 
>
>
>> On Jan 12, 2024, at 16:30, Jane Chan <qingyue....@gmail.com> wrote:
>> 
>> Hi shuai,
>> 
>> Thanks for the update! Regarding the newly introduced configuration, I
>> share the same concern as Benchao and Xuyang.
>> 
>> First of all, in most cases, the fact that users choose to enable
>> mini-batch configuration indicates they are aware of the trade-off between
>> throughput and completeness of the changelog.
>> And if we finally adopt this configuration solely to avoid state
>> incompatibility, does it mean that we will need to introduce a new
>> configuration for every future operator's mini-batch optimization, similar
>> to what we did today?
>> 
>> Best,
>> Jane
>> 
>> On Fri, Jan 12, 2024 at 1:45 PM Xuyang <xyzhong...@163.com> wrote:
>> 
>>> Hi, Xu Shuai. Thanks for driving this FLIP.
>>> 
>>> 
>>> The CDC message amplification of cascading joins has always been a
>>> problem for users. Judging from the Nexmark results, this optimization
>>> is very meaningful. I just have the same doubt as Benchao: why can't we
>>> use minibatch join as the default behavior when the user turns on
>>> minibatch?
>>> 
>>> 
>>>> Although the semantics of the changelog emitted by the Join operator
>>>> are eventual consistency, the change might not be expected by
>>>> downstream consumers of the job that require the details of the
>>>> changelog.
>>> 
>>> 
>>> I think if users add the minibatch options to their job to enable
>>> minibatch, they should know that Flink will reduce the amount of data
>>> sent downstream by folding CDC messages as much as possible. In
>>> scenarios where all details of the CDC records need to be retained, such
>>> as jobs that simply synchronize data from one database to another,
>>> users have no reason to enable minibatch.
>>> 
>>> 
>>> The only scenario I can think of that requires adding this independent
>>> minibatch join option is to ensure that the state
>>> is compatible between multiple versions, but we have not promised users
>>> state compatibility during cross-version upgrades.
>>> 
>>> 
>>> Maybe we need to figure out why the
>>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled' option needed
>>> to be added to the deduplicate operator. I think the reason is the same
>>> as for adding a separate parameter to join to control CDC message folding.
>>> 
>>> 
>>> 
>>> 
>>> --
>>> 
>>>    Best!
>>>    Xuyang
>>> 
>>> 
>>> 
>>> 
>>> 
>>> At 2024-01-11 16:19:30, "Benchao Li" <libenc...@apache.org> wrote:
>>>>> the change might not be expected by downstream consumers of the job
>>>>> that require the details of the changelog
>>>> 
>>>> Could you elaborate on this a bit? I've never come across this kind of
>>>> requirement before, and I'm curious which scenario requires it.
>>>> 
>>>> shuai xu <xushuai...@gmail.com> wrote on Thu, Jan 11, 2024 at 13:08:
>>>>> 
>>>>> Thanks for your response, Benchao.
>>>>> 
>>>>> Here is my thought on the newly added option.
>>>>> Users' current jobs are running on a version without minibatch join.
>>>>> If the existing minibatch option were reused to enable minibatch join,
>>>>> then when users' jobs are migrated to the new version, the internal
>>>>> behavior of the join operation within those jobs would change. Although
>>>>> the semantics of the changelog emitted by the Join operator are
>>>>> eventual consistency, the change might not be expected by downstream
>>>>> consumers of the job that require the details of the changelog. This
>>>>> newly added option is also modeled on
>>>>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
>>>>> 
>>>>> As for the implementation, the new operator shares the state of the
>>>>> original operator; it merely has an additional minibatch buffer for
>>>>> storing records to enable some optimizations. The storage remains the
>>>>> same, and there are only minor modifications to the computational logic.
>>>>> 
>>>>> Best,
>>>>> Xu Shuai
>>>>> 
>>>>>> On Jan 10, 2024, at 22:56, Benchao Li <libenc...@apache.org> wrote:
>>>>>> 
>>>>>> Thanks shuai for driving this, mini-batch Join is a very useful
>>>>>> optimization, +1 for the general idea.
>>>>>> 
>>>>>> Regarding the configuration
>>>>>> "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>>>>>> necessary. The semantic of changelog emitted by the Join operator is
>>>>>> eventual consistency, so there is not much difference between the original
>>>>>> Join and mini-batch Join from this aspect. Besides, introducing more
>>>>>> options would make it more complex for users, harder to understand and
>>>>>> maintain, which we should be careful about.
>>>>>> 
>>>>>> One thing about the implementation, could you make the new operator
>>>>>> share the same state definition with the original one?
>>>>>> 
>>>>>> shuai xu <xushuai...@gmail.com> wrote on Wed, Jan 10, 2024 at 21:23:
>>>>>>> 
>>>>>>> Hi devs,
>>>>>>> 
>>>>>>> I’d like to start a discussion on FLIP-415: Introduce a new join
>>>>>>> operator to support minibatch[1].
>>>>>>> 
>>>>>>> Currently, cascading joins in Flink suffer from record amplification.
>>>>>>> Every record a join operator receives triggers the join process.
>>>>>>> However, if a +I record and a -D record match, they could be folded,
>>>>>>> saving two rounds of join processing. Besides, a -U/+U pair might
>>>>>>> produce 4 output records, two of which are redundant, in the case of
>>>>>>> an outer join.
>>>>>>> 
>>>>>>> To address this issue, this FLIP introduces a new
>>>>>>> MiniBatchStreamingJoinOperator that processes records in batches,
>>>>>>> which could reduce the number of redundant output messages and avoid
>>>>>>> unnecessary join processing.
>>>>>>> A new option is added to control the operator so that existing jobs
>>>>>>> are not affected.
>>>>>>> 
>>>>>>> Please find more details in the FLIP wiki document [1]. Looking
>>>>>>> forward to your feedback.
>>>>>>> 
>>>>>>> [1]
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>>>>>>> 
>>>>>>> Best,
>>>>>>> Xu Shuai
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> 
>>>>>> Best,
>>>>>> Benchao Li
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> 
>>>> Best,
>>>> Benchao Li
>>> 
>
>Best,
>Xu Shuai
>
