Hi Shuai,

Thanks for the update! Regarding the newly introduced configuration, I share
the same concern as Benchao and Xuyang.

First of all, in most cases, the fact that users choose to enable the
mini-batch configuration indicates that they are aware of the trade-off
between throughput and completeness of the changelog.
And if we finally adopt this configuration solely to avoid state
incompatibility, does it mean that we will need to introduce a new
configuration for every future operator's mini-batch optimization, similar
to what we are doing here?

Best,
Jane

On Fri, Jan 12, 2024 at 1:45 PM Xuyang <xyzhong...@163.com> wrote:

> Hi, Xu Shuai. Thanks for driving this FLIP.
>
>
> The CDC message amplification of cascading joins has always been a problem
> for users. Judging from the
> Nexmark results, this optimization is very meaningful. I have the
> same doubt as Benchao: why can't we
> use minibatch join as the default behavior when the user turns on
> minibatch?
>
>
> > Although the semantics of the changelog emitted by the Join operator is
> eventual consistency, the change might
> not be expected by a downstream of the job that requires details of the
> changelog.
>
>
> I think if users add the minibatch options to their job to enable
> minibatch, they should know that Flink will reduce
> the amount of data sent downstream by folding CDC messages as much as
> possible. In scenarios where all
> details of CDC records need to be retained, such as a job that just
> synchronizes data from one database to another,
> users have no reason to enable minibatch.
>
>
> The only scenario I can think of that requires adding this independent
> minibatch join option is to ensure that the state
> is compatible between multiple versions, but we have not promised users
> state compatibility during cross-version upgrades.
>
>
> Maybe we need to figure out why the
> 'table.exec.deduplicate.mini-batch.compact-changes-enabled' option needed to
> be added to the deduplicate operator. I think it was for the same reason as
> adding a separate parameter to join to control CDC message folding.
>
>
>
>
> --
>
>     Best!
>     Xuyang
>
>
>
>
>
> On 2024-01-11 16:19:30, "Benchao Li" <libenc...@apache.org> wrote:
> >> the change might not be expected by a downstream of the job that
> requires details of the changelog
> >
> >Could you elaborate on this a bit? I've never come across this kind of
> >requirement before, and I'm curious what scenario requires
> >it.
> >
> >On Thu, Jan 11, 2024 at 13:08, shuai xu <xushuai...@gmail.com> wrote:
> >>
> >> Thanks for your response, Benchao.
> >>
> >> Here are my thoughts on the newly added option.
> >> Users' current jobs are running on a version without minibatch join. If
> the existing minibatch option were used to enable minibatch join, then when
> users' jobs are migrated to the new version, the internal behavior of the
> join operation within the jobs would change. Although the semantics of the
> changelog emitted by the Join operator is eventual consistency, the change
> might not be expected by a downstream of the job that requires details of
> the changelog. This newly added option is also modeled on
> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
> >>
> >> As for the implementation, the new operator shares the state of the
> original operator and merely has an additional minibatch for storing
> records to enable some optimizations. The storage remains consistent, and
> there are only minor modifications to the computational logic.
> >>
> >> Best,
> >> Xu Shuai
> >>
> >> > On Jan 10, 2024, at 22:56, Benchao Li <libenc...@apache.org> wrote:
> >> >
> >> > Thanks, Shuai, for driving this. Mini-batch join is a very useful
> >> > optimization, +1 for the general idea.
> >> >
> >> > Regarding the configuration
> >> > "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
> >> > necessary. The semantics of the changelog emitted by the Join operator
> >> > is eventual consistency, so there is not much difference between the
> >> > original Join and the mini-batch Join in this respect. Besides,
> >> > introducing more options would make things more complex for users and
> >> > harder to understand and maintain, which we should be careful about.
> >> >
> >> > One thing about the implementation: could you make the new operator
> >> > share the same state definition with the original one?
> >> >
> >> > On Wed, Jan 10, 2024 at 21:23, shuai xu <xushuai...@gmail.com> wrote:
> >> >>
> >> >> Hi devs,
> >> >>
> >> >> I’d like to start a discussion on FLIP-415: Introduce a new join
> operator to support minibatch[1].
> >> >>
> >> >> Currently, when performing cascading joins in Flink, record
> amplification is a pain point. Every record the join operator receives
> triggers the join process. However, if a +I record and a -D record match,
> they could be folded to save two rounds of join processing. Besides, a
> -U/+U pair might output 4 records, two of which are redundant, when
> encountering an outer join.
> >> >>
> >> >> To address this issue, this FLIP introduces a new
> MiniBatchStreamingJoinOperator to achieve batch processing, which could
> reduce the number of redundant output messages and avoid unnecessary join
> processing.
> >> >> A new option is added to control the operator so that existing jobs
> are not affected.
> >> >>
> >> >> Please find more details in the FLIP wiki document [1]. Looking
> >> >> forward to your feedback.
> >> >>
> >> >> [1]
> >> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
> >> >>
> >> >> Best,
> >> >> Xu Shuai
> >> >
> >> >
> >> >
> >> > --
> >> >
> >> > Best,
> >> > Benchao Li
> >>
> >
> >
> >--
> >
> >Best,
> >Benchao Li
>
