What's the status and conclusion of this discussion?

I see the value of exposing OperatorCoordinator because of its powerful RPC
calls; some projects, such as Hudi [1], are already using it. But I agree
this is a large topic that requires a separate FLIP.

I am also concerned that a Public base class extracted without any
implementations or clear usage would be easy to break in the future.
However, I think the shuffling operator can be a generic component used by
other connectors and DataStream jobs.

Have you considered contributing the ShuffleOperator to the Flink main
repository as part of the DataStream API (e.g., DataStream#dynamicShuffle)?
Within a single repository, it would be easy to extract the common part of
SourceCoordinatorContext and ShuffleCoordinatorContext as an internal
implementation.
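
To make the shared part a bit more concrete, here is a rough, standalone
sketch of the communication pattern described further down the thread:
subtasks report local statistics, and the coordinator merges them on a single
thread into a global view that could be sent back to the subtasks. It does not
use any Flink classes; ShuffleStatsSketch, StatsCoordinator and report() are
hypothetical names used only for illustration, not the proposed
ShuffleCoordinator or any existing API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Standalone illustration only; none of these types are Flink classes. */
public class ShuffleStatsSketch {

    /** Hypothetical coordinator that merges per-subtask key counts on one thread. */
    static final class StatsCoordinator implements AutoCloseable {
        // A single-threaded executor plays the role of the coordinator event loop,
        // so state updates never run concurrently.
        private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        // Latest local statistics reported by each subtask index.
        private final Map<Integer, Map<String, Long>> latestBySubtask = new HashMap<>();

        /** Records a subtask's local counts and returns the merged global counts. */
        Map<String, Long> report(int subtask, Map<String, Long> localCounts) throws Exception {
            Map<String, Long> copy = new HashMap<>(localCounts);
            return eventLoop.submit(() -> {
                // A newer report from the same subtask replaces its older one.
                latestBySubtask.put(subtask, copy);
                Map<String, Long> global = new TreeMap<>();
                for (Map<String, Long> counts : latestBySubtask.values()) {
                    counts.forEach((key, count) -> global.merge(key, count, Long::sum));
                }
                return global;
            }).get();
        }

        @Override
        public void close() {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        try (StatsCoordinator coordinator = new StatsCoordinator()) {
            coordinator.report(0, Map.of("a", 3L, "b", 1L));
            Map<String, Long> global = coordinator.report(1, Map.of("a", 2L, "c", 5L));
            // In a real job this global view would be broadcast back to the subtasks.
            System.out.println(global);
        }
    }
}
```

In a real coordinator the reports would arrive as operator events over RPC
rather than as direct method calls, and checkpointing of this state is exactly
the open question raised below.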


Best,
Jark

[1]:
https://github.com/apache/hudi/blob/a80bb4f717ad8a89770176a1238c4b08874044e8/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java

On Thu, 3 Nov 2022 at 22:36, Piotr Nowojski <pnowoj...@apache.org> wrote:

> Ohhh, I was confused. I thought that the proposal is to make
> `CoordinatorContextBase` part of the public API.
>
> However, I'm also against extracting `CoordinatorContextBase` as an
> `@Internal` class as well.
>
> 1. Connectors shouldn't reuse internal classes. Using `@Internal`
> CoordinatedOperatorFactory would be already quite bad, but at least this is
> a relatively stable internal API. Using `@Internal`
> `CoordinatorContextBase`, and refactoring out this base class just for the
> sake of re-using it in a connector is IMO even worse.
> 2. Doubly so if they are in a separate repository (as the iceberg connector
> will be/is, right?). There would be no way to prevent breaking changes
> between repositories.
>
> If that's only intended as the stop-gap solution until we properly expose
> coordinators, the lesser evil would be IMO to copy/paste/modify
> SourceCoordinatorContext to the flink-connector-iceberg repository.
>
> Best,
> Piotrek
>
> On Thu, 3 Nov 2022 at 12:51, Maximilian Michels <m...@apache.org> wrote:
>
> > +1 If we wanted to expose the OperatorCoordinator API, we should provide
> > an adequate interface. The FLIP partially addresses this by trying to
> > factor out RPC code which other coordinators might make use of, but there
> > is additional design necessary to realize a public operator API.
> >
> > Just to be clear, I'm not opposed to any of the changes in the FLIP. I
> > think they make sense in the context of an Iceberg ShuffleCoordinator in
> > Flink. If we were to add such a new coordinator, feel free to make the
> > proposed code refactoring as part of a pull request. A FLIP isn't
> strictly
> > necessary here because this is a purely internal change which does not
> > alter public APIs, nor does it alter the internal architecture, apart
> from
> > reusing a bit of existing code. I'm sorry if we consumed some of your
> time
> > revising the document but I think we had a healthy discussion here. And
> > we're definitely looking forward to seeing some of these code changes!
> >
> > -Max
> >
> > On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> >> Hi,
> >>
> >> Sorry for the delay, but I've given this more thought. First, I share
> >> Maximilian's view that this FLIP is incomplete. As I understand it, you
> >> are trying to hack existing code to expose small bits of internal
> >> functionality as part of the public API without solving many of the
> >> underlying issues.
> >>
> >> For example, what's the point of exposing `CoordinatorContextBase` as a
> >> public API if users can not use it? After all, the `OperatorCoordinator`
> >> and `CoordinatedOperatorFactory` would remain internal. At the same
> time,
> >> this FLIP would officially force us to support and maintain this
> >> CoordinatorContextBase, while I have strong feelings that we don't want
> to
> >> do that in the long term. I think we would need to take a big step back
> and
> >> first discuss how we would like to expose the coordinators and agree
> how to
> >> deal with the issues.
> >>
> >> First big issue that I see is that I would feel very worried exposing
> >> coordinator API without at least designing/planning how to deal with
> >> checkpointing their state. Without that, I'm afraid we might end up in a
> >> situation where we need to break the API in order to properly support
> >> stateful coordinators. And at the moment I don't see a good and easy
> >> solution to this problem.
> >>
> >> The second issue is the shape of the exposed public API. Exposing
> >> `OperatorCoordinator` or `CoordinatorContextBase` looks to me like a bad
> >> design that would expose way too many things to users, making future
> >> development more complicated for us and making implementation of those
> >> interfaces by the user unnecessarily difficult. I see this as similar to
> >> the low-level `StreamOperator` API vs. the higher-level
> >> `org.apache.flink.api.common.functions.Function` API (instead of exposing
> >> `StreamOperator`, `AbstractStreamOperatorV2`, etc., we should beef up the
> >> `ProcessFunction` to expose all of the remaining functionality in a
> >> user-friendly way). In the context of the coordinators, I would say that
> >> we should expose as the public API not the `OperatorCoordinator` but, for
> >> example, some kind of `EventProcessFunction` with a simple interface
> >> like:
> >> ```
> >> interface EventProcessFunction {
> >>   void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
> >> }
> >> ```
> >> + maybe some features like processing time timers/mailbox style async
> >> actions.
> >> (or maybe that could have been just a regular `ProcessFunction` but with
> >> `OperatorEvent` with `int subtask` as input/output).
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Wed, 2 Nov 2022 at 19:40, gang ye <yegang...@gmail.com> wrote:
> >>
> >>> Hi Max and Qingsheng,
> >>>
> >>> Thanks for the feedback. The initial motivation for this proposal is to
> >>> reduce duplicated code, since ShuffleCoordinator would need communication
> >>> logic similar to SourceCoordinator's to talk with operators. I understand
> >>> your concern that OperatorCoordinator is an internal class and that, for
> >>> now, nothing other than SourceCoordinator uses it.
> >>> How about we do what Qingsheng said? I can go ahead with the
> >>> ShufflingCoordinator implementation without the extraction. Then we will
> >>> have an intuitive sense of how much code is copied and could be reused.
> >>> If we feel that there is still a need to extract, we can revisit the
> >>> discussion.
> >>>
> >>> Thanks
> >>> Gang
> >>>
> >>>
> >>>
> >>> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org>
> wrote:
> >>>
> >>>> Thanks Gang and Steven for the FLIP. Actually I share the same concern
> >>>> with Piotr and Maximilian.
> >>>>
> >>>> OperatorCoordinator is marked as @Internal intentionally considering
> >>>> some existing issues, like consistency between non-source operator and
> >>>> coordinator on checkpoint. I'm wondering if it is useful to expose a
> public
> >>>> context to developers but have the OperatorCoordinator as an internal
> API.
> >>>> If we finally close all issues and decide to expose the operator
> >>>> coordinator API, it would be a better chance to include the base
> context as
> >>>> a part of it.
> >>>>
> >>>> Best,
> >>>> Qingsheng
> >>>>
> >>>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <m...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Thanks Steven! My confusion stemmed from the lack of context in the
> >>>>> FLIP.
> >>>>> The first version did not lay out how the refactoring would be used
> >>>>> down
> >>>>> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API
> >>>>> is a
> >>>>> non-public API and before reading the code, I wasn't even aware how
> >>>>> exactly
> >>>>> it worked and whether it would be available to regular operators (it
> >>>>> was
> >>>>> originally intended for sources only).
> >>>>>
> >>>>> I might seem pedantic here but I believe the purpose of a FLIP should
> >>>>> be to
> >>>>> describe the *why* behind the changes, not only the changes themselves. A
> >>>>> FLIP
> >>>>> is not a formality but is a tool to communicate and discuss changes.
> I
> >>>>> think we still haven't laid out the exact reasons why we are
> factoring
> >>>>> out
> >>>>> the base. As far as I understand now, we need the base class to deal
> >>>>> with
> >>>>> concurrent updates in the custom Coordinator from the runtime
> >>>>> (sub)tasks.
> >>>>> Effectively, we are enforcing an actor model for the processing of
> the
> >>>>> incoming messages such that the OperatorCoordinator can cleanly
> update
> >>>>> its
> >>>>> state. However, if there are no actual implementations that make use
> >>>>> of the
> >>>>> refactoring in Flink itself, I wonder if it would make sense to copy
> >>>>> this
> >>>>> code to the downstream implementation, e.g. the ShuffleCoordinator.
> As
> >>>>> soon
> >>>>> as it is part of Flink, we could of course try to consolidate this
> >>>>> code.
> >>>>>
> >>>>> Considering the *how* of this, there appear to be both methods from
> >>>>> SourceCoordinator (e.g. runInEventLoop) as well as
> >>>>> SourceCoordinatorContext
> >>>>> listed in the FLIP, as well as methods which do not appear anywhere
> in
> >>>>> Flink code, e.g. subTaskReady / subTaskNotReady /
> sendEventToOperator.
> >>>>> It
> >>>>> appears that some of this has been extracted from a downstream
> >>>>> implementation. It would be great to adjust this, such that it
> >>>>> reflects the
> >>>>> status quo in Flink.
> >>>>>
> >>>>> -Max
> >>>>>
> >>>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <stevenz...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>> > Max,
> >>>>> >
> >>>>> > Thanks a lot for the comments. We should clarify that the shuffle
> >>>>> > operator/coordinator is not really part of the Flink sink
> >>>>> > function/operator. shuffle operator is a custom operator that can
> be
> >>>>> > inserted right before the Iceberg writer operator. Shuffle operator
> >>>>> > calculates the traffic statistics and performs a custom
> >>>>> partition/shuffle
> >>>>> > (DataStream#partitionCustom) to cluster the data right before they
> >>>>> get to
> >>>>> > the Iceberg writer operator.
> >>>>> >
> >>>>> > We are not proposing to introduce a sink coordinator for the sink
> >>>>> > interface. Shuffle operator needs the CoordinatorContextBase to
> >>>>> > facilitate the communication btw shuffle subtasks and coordinator
> for
> >>>>> > traffic statistics aggregation. The communication part is already
> >>>>> > implemented by SourceCoordinatorContext.
> >>>>> >
> >>>>> > Here are some details about the communication needs.
> >>>>> > - subtasks periodically calculate local statistics and send to the
> >>>>> > coordinator for global aggregation
> >>>>> > - the coordinator sends the globally aggregated statistics to the
> >>>>> subtasks
> >>>>> > - subtasks use the globally aggregated statistics to guide the
> >>>>> > partition/shuffle decision
> >>>>> >
> >>>>> > Regards,
> >>>>> > Steven
> >>>>> >
> >>>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <m...@apache.org
> >
> >>>>> wrote:
> >>>>> >
> >>>>> > > Hi Gang,
> >>>>> > >
> >>>>> > > Looks much better! I've actually gone through the
> >>>>> OperatorCoordinator
> >>>>> > code.
> >>>>> > > It turns out, any operator already has an OperatorCoordinator
> >>>>> assigned.
> >>>>> > > Also, any operator can add custom coordinator code. So it looks
> >>>>> like you
> >>>>> > > won't have to implement any additional runtime logic to add a
> >>>>> > > ShuffleCoordinator. However, I'm wondering, why do you
> >>>>> specifically need
> >>>>> > to
> >>>>> > > refactor the SourceCoordinatorContext? You could simply add your
> >>>>> own
> >>>>> > > coordinator code. I'm not sure the sink requirements map to the
> >>>>> source
> >>>>> > > interface so closely that you can reuse the same logic.
> >>>>> > >
> >>>>> > > If you can refactor SourceCoordinatorContext in a way that makes
> >>>>> it fit
> >>>>> > > your use case, I have nothing to object here. By the way, another
> >>>>> example
> >>>>> > > of an existing OperatorCoordinator is
> >>>>> CollectSinkOperatorCoordinator
> >>>>> > which
> >>>>> > > is quite trivial but it might be worth evaluating whether you
> need
> >>>>> the
> >>>>> > full
> >>>>> > > power of SourceCoordinatorContext which is why I wanted to get
> more
> >>>>> > > context.
> >>>>> > >
> >>>>> > > -Max
> >>>>> > >
> >>>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <yegang...@gmail.com>
> >>>>> wrote:
> >>>>> > >
> >>>>> > > > Hi Max,
> >>>>> > > > I understand your concern. Since shuffling support for the Flink
> >>>>> > > > Iceberg sink is not the main body of the proposal, I have just
> >>>>> > > > added an appendix with more details about how to use
> >>>>> > > > CoordinatorContextBase and how to define ShufflingCoordinator.
> >>>>> > > >
> >>>>> > > > Let me know if that does not address your concern.
> >>>>> > > >
> >>>>> > > > Thanks
> >>>>> > > > Gang
> >>>>> > > >
> >>>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <
> >>>>> m...@apache.org>
> >>>>> > > wrote:
> >>>>> > > >
> >>>>> > > >> Hey Gang,
> >>>>> > > >>
> >>>>> > > >> What I'm looking for here is a complete picture of why the
> >>>>> change is
> >>>>> > > >> necessary and what the next steps are. Ultimately, refactoring
> >>>>> any
> >>>>> > code
> >>>>> > > >> serves a purpose. Here, we want to refactor the Coordinator
> >>>>> code such
> >>>>> > > that
> >>>>> > > >> we can add a SinkCoordinator, similar to the
> SourceCoordinator.
> >>>>> The
> >>>>> > FLIP
> >>>>> > > >> should address the next steps, i.e. how you plan to add the
> >>>>> > > >> SinkCoordinator, its interfaces, runtime changes. It doesn't
> >>>>> have to
> >>>>> > be
> >>>>> > > in
> >>>>> > > >> great detail but without this information, I don't think the
> >>>>> FLIP is
> >>>>> > > >> complete.
> >>>>> > > >>
> >>>>> > > >> This feature should be generic enough to be usable by other
> >>>>> sinks than
> >>>>> > > >> the Iceberg sink. Of course Iceberg can still load its own
> >>>>> > > implementation
> >>>>> > > >> which may be outlined in a separate FLIP.
> >>>>> > > >>
> >>>>> > > >> Unless there is a good reason, normal operators should not
> >>>>> support the
> >>>>> > > >> coordinator functionality. At least I'm not convinced it would
> >>>>> play
> >>>>> > well
> >>>>> > > >> with Flink's execution model. But I see how it is required for
> >>>>> sources
> >>>>> > > and
> >>>>> > > >> sinks.
> >>>>> > > >>
> >>>>> > > >> -Max
> >>>>> > > >>
> >>>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegang...@gmail.com>
> >>>>> wrote:
> >>>>> > > >>
> >>>>> > > >>> Hi Max,
> >>>>> > > >>>
> >>>>> > > >>> Thanks for reviewing.
> >>>>> > > >>>
> >>>>> > > >>> For this Flip 264, yes, we will only focus on abstracting RPC
> >>>>> calls
> >>>>> > > >>> between the task and the job manager for communications and
> >>>>> won't
> >>>>> > touch
> >>>>> > > >>> watermark checkpoint.
> >>>>> > > >>> If the coordinator doesn't need RPC calls to talk with
> >>>>> subtasks, then
> >>>>> > > it
> >>>>> > > >>> can define context without extending from the
> >>>>> > CoordinatorContextBase(or
> >>>>> > > >>> find another class name to limit the scope).
> >>>>> > > >>>
> >>>>> > > >>> Regarding the code-changing scope, for this Flip 264, we will
> >>>>> only do
> >>>>> > > >>> context extraction. The shuffling coordinator and operator
> >>>>> > > >>> <
> >>>>> > >
> >>>>> >
> >>>>>
> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo
> >>>>> > > >
> >>>>> > > >>> which will use the context will come with a separate
> proposal,
> >>>>> thus
> >>>>> > we
> >>>>> > > try
> >>>>> > > >>> to keep it simple in Flip 264 to understand. I can add a
> >>>>> little bit
> >>>>> > > more
> >>>>> > > >>> about how to use the coordinator context in Flip 264 if you
> >>>>> think
> >>>>> > that
> >>>>> > > will
> >>>>> > > >>> be helpful.
> >>>>> > > >>>
> >>>>> > > >>> Thanks!
> >>>>> > > >>> Gang
> >>>>> > > >>>
> >>>>> > > >>>
> >>>>> > > >>>
> >>>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <
> >>>>> m...@apache.org>
> >>>>> > > >>> wrote:
> >>>>> > > >>>
> >>>>> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a
> >>>>> bigger
> >>>>> > > >>>> change. The coordinator for sources, as part of FLIP-27, was
> >>>>> > > specifically
> >>>>> > > >>>> added to synchronize the global watermark and to assign
> splits
> >>>>> > > dynamically.
> >>>>> > > >>>> However, it practically allows arbitrary RPC calls between
> >>>>> the task
> >>>>> > > and the
> >>>>> > > >>>> job manager. I understand that there is concern that such a
> >>>>> powerful
> >>>>> > > >>>> mechanism should not be available to all operators.
> >>>>> Nevertheless, I
> >>>>> > > see the
> >>>>> > > >>>> practical use in case of sinks like Iceberg. So I'd suggest
> >>>>> limiting
> >>>>> > > this
> >>>>> > > >>>> feature to sinks (and sources) only.
> >>>>> > > >>>>
> >>>>> > > >>>> I'm wondering whether extracting the
> SourceCoordinatorContext
> >>>>> is
> >>>>> > > >>>> enough to achieve what you want. There will be additional
> work
> >>>>> > > necessary,
> >>>>> > > >>>> e.g. create a SinkCoordinator similarly to SourceCoordinator
> >>>>> which
> >>>>> > > handles
> >>>>> > > >>>> the RPC calls and the checkpointing. I think it would be
> good
> >>>>> to
> >>>>> > > outline
> >>>>> > > >>>> this in the FLIP.
> >>>>> > > >>>>
> >>>>> > > >>>> -Max
> >>>>> > > >>>>
> >>>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <
> >>>>> stevenz...@gmail.com>
> >>>>> > > wrote:
> >>>>> > > >>>>
> >>>>> > > >>>>> sorry. sent the incomplete reply by mistake.
> >>>>> > > >>>>>
> >>>>> > > >>>>> If there are any concrete concerns, we can discuss. In the
> >>>>> > > FLINK-27405
> >>>>> > > >>>>> [1],
> >>>>> > > >>>>> Avid pointed out some implications regarding checkpointing.
> >>>>> In this
> >>>>> > > >>>>> small
> >>>>> > > >>>>> FLIP, we are not exposing/changing any checkpointing logic,
> >>>>> we
> >>>>> > mainly
> >>>>> > > >>>>> need
> >>>>> > > >>>>> the coordinator context functionality to facilitate the
> >>>>> > communication
> >>>>> > > >>>>> between coordinator and subtasks.
> >>>>> > > >>>>>
> >>>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
> >>>>> > > >>>>>
> >>>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <
> >>>>> stevenz...@gmail.com>
> >>>>> > > >>>>> wrote:
> >>>>> > > >>>>>
> >>>>> > > >>>>> > Hang, appreciate your input. Agree that
> >>>>> `CoordinatorContextBase`
> >>>>> > > is a
> >>>>> > > >>>>> > better name considering Flink code convention.
> >>>>> > > >>>>> >
> >>>>> > > >>>>> > If there are any concrete concerns, we can discuss. In
> the
> >>>>> jira,
> >>>>> > > >>>>> >
> >>>>> > > >>>>> >
> >>>>> > > >>>>> >
> >>>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <
> >>>>> > ruanhang1...@gmail.com
> >>>>> > > >
> >>>>> > > >>>>> wrote:
> >>>>> > > >>>>> >
> >>>>> > > >>>>> >> Hi,
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> IMO, I agree to extract a base class for
> >>>>> > SourceCoordinatorContext.
> >>>>> > > >>>>> >> But I prefer to use the name
> >>>>> `OperatorCoordinatorContextBase` or
> >>>>> > > >>>>> >> `CoordinatorContextBase` as the format like
> >>>>> `SourceReaderBase`.
> >>>>> > > >>>>> >> I also agree to what Piotr said. Maybe more problems
> will
> >>>>> occur
> >>>>> > > when
> >>>>> > > >>>>> >> connectors start to use it.
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> Best,
> >>>>> > > >>>>> >> Hang
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> On Fri, 14 Oct 2022 at 22:31, Steven Wu <stevenz...@gmail.com> wrote:
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> > Piotr,
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > The proposal is to extract the listed methods from
> >>>>> @Internal
> >>>>> > > >>>>> >> > SourceCoordinatorContext to a @PublicEvolving
> >>>>> > > >>>>> BaseCoordinatorContext.
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > The motivation is that other operators can leverage
> the
> >>>>> > > >>>>> communication
> >>>>> > > >>>>> >> > mechanism btw operator coordinator and operator
> >>>>> subtasks. For
> >>>>> > > >>>>> example,
> >>>>> > > >>>>> >> in
> >>>>> > > >>>>> >> > the linked google doc shuffle operator (in Flink
> >>>>> Iceberg sink)
> >>>>> > > can
> >>>>> > > >>>>> >> leverage
> >>>>> > > >>>>> >> > it for computing traffic distribution statistics.
> >>>>> > > >>>>> >> > * subtasks calculate local statistics and periodically
> >>>>> send
> >>>>> > them
> >>>>> > > >>>>> to the
> >>>>> > > >>>>> >> > coordinator for global aggregation.
> >>>>> > > >>>>> >> > * The coordinator can broadcast the globally
> aggregated
> >>>>> > > >>>>> statistics to
> >>>>> > > >>>>> >> > subtasks, which can be used to guide the shuffling
> >>>>> decision
> >>>>> > > >>>>> (selecting
> >>>>> > > >>>>> >> > downstream channels).
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > Thanks,
> >>>>> > > >>>>> >> > Steven
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <
> >>>>> > > >>>>> pnowoj...@apache.org>
> >>>>> > > >>>>> >> > wrote:
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > > Hi,
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > Could you clarify what's the proposal that you have
> >>>>> in mind?
> >>>>> > > >>>>> From the
> >>>>> > > >>>>> >> > > context I would understand that the newly extracted
> >>>>> > > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as
> >>>>> > > >>>>> `@PublicEvolving`
> >>>>> > > >>>>> >> or
> >>>>> > > >>>>> >> > > `@Experimental`, since otherwise extracting it and
> >>>>> keeping
> >>>>> > > >>>>> `@Internal`
> >>>>> > > >>>>> >> > > wouldn't change much? Such `@Internal` base class
> >>>>> could have
> >>>>> > > >>>>> been
> >>>>> > > >>>>> >> removed
> >>>>> > > >>>>> >> > > at any point of time in the future. Having said
> that,
> >>>>> it
> >>>>> > > sounds
> >>>>> > > >>>>> to me
> >>>>> > > >>>>> >> > like
> >>>>> > > >>>>> >> > > your proposal is a bit bigger than it looks at the
> >>>>> first
> >>>>> > > glance
> >>>>> > > >>>>> and
> >>>>> > > >>>>> >> you
> >>>>> > > >>>>> >> > > actually want to expose the operator coordinator
> >>>>> concept to
> >>>>> > > the
> >>>>> > > >>>>> public
> >>>>> > > >>>>> >> > API?
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > AFAIK there were some discussions about that, and it
> >>>>> was a
> >>>>> > bit
> >>>>> > > >>>>> of a
> >>>>> > > >>>>> >> > > conscious decision to NOT do that. I don't know
> those
> >>>>> > reasons
> >>>>> > > >>>>> however.
> >>>>> > > >>>>> >> > Only
> >>>>> > > >>>>> >> > > now, I've just heard that there are for example some
> >>>>> > problems
> >>>>> > > >>>>> with
> >>>>> > > >>>>> >> > > checkpointing of hypothetical non source operator
> >>>>> > > coordinators.
> >>>>> > > >>>>> Maybe
> >>>>> > > >>>>> >> > > someone else could shed some light on this?
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > Conceptually I would be actually in favour of
> exposing
> >>>>> > > operator
> >>>>> > > >>>>> >> > > coordinators if there is a good reason behind that,
> >>>>> but it
> >>>>> > is
> >>>>> > > a
> >>>>> > > >>>>> more
> >>>>> > > >>>>> >> > > difficult topic and might be a larger effort than it
> >>>>> seems
> >>>>> > at
> >>>>> > > >>>>> the
> >>>>> > > >>>>> >> first
> >>>>> > > >>>>> >> > > glance.
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > Best,
> >>>>> > > >>>>> >> > > Piotrek
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > On Tue, 4 Oct 2022 at 19:41, Steven Wu <stevenz...@gmail.com> wrote:
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked
> >>>>> google doc
> >>>>> > is
> >>>>> > > >>>>> not for
> >>>>> > > >>>>> >> > this
> >>>>> > > >>>>> >> > > > FLIP, which is fully documented in the wiki page.
> >>>>> The
> >>>>> > linked
> >>>>> > > >>>>> google
> >>>>> > > >>>>> >> doc
> >>>>> > > >>>>> >> > > is
> >>>>> > > >>>>> >> > > > the design doc to introduce shuffling in Flink
> >>>>> Iceberg
> >>>>> > sink,
> >>>>> > > >>>>> which
> >>>>> > > >>>>> >> > > > motivated this FLIP proposal so that the shuffle
> >>>>> > coordinator
> >>>>> > > >>>>> can
> >>>>> > > >>>>> >> > leverage
> >>>>> > > >>>>> >> > > > the introduced BaseCoordinatorContext to avoid
> code
> >>>>> > > >>>>> duplication.
> >>>>> > > >>>>> >> > > >
> >>>>> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <
> >>>>> > j...@ververica.com>
> >>>>> > > >>>>> wrote:
> >>>>> > > >>>>> >> > > >
> >>>>> > > >>>>> >> > > > > Thanks for bringing this up. It looks overall
> >>>>> good! One
> >>>>> > > >>>>> small
> >>>>> > > >>>>> >> thing,
> >>>>> > > >>>>> >> > > you
> >>>>> > > >>>>> >> > > > > might want to write all content on the wiki page
> >>>>> instead
> >>>>> > > of
> >>>>> > > >>>>> >> linking
> >>>>> > > >>>>> >> > to
> >>>>> > > >>>>> >> > > a
> >>>>> > > >>>>> >> > > > > google doc. The reason is that some people might
> >>>>> not be
> >>>>> > > >>>>> able to
> >>>>> > > >>>>> >> > access
> >>>>> > > >>>>> >> > > > the
> >>>>> > > >>>>> >> > > > > google doc.
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > > > Best regards,
> >>>>> > > >>>>> >> > > > > Jing
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <
> >>>>> > > yegang...@gmail.com
> >>>>> > > >>>>> >
> >>>>> > > >>>>> >> wrote:
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > > >> Hi,
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >> We submit the Flip proposal
> >>>>> > > >>>>> >> > > > >> <
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > >
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >>
> >>>>> > > >>>>>
> >>>>> > >
> >>>>> >
> >>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext
> >>>>> > > >>>>> >> > > > >> >
> >>>>> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext
> >>>>> from
> >>>>> > > >>>>> >> > > > >> SourceCoordinatorContext to reuse it for other
> >>>>> > > >>>>> coordinators E.g.
> >>>>> > > >>>>> >> in
> >>>>> > > >>>>> >> > > the
> >>>>> > > >>>>> >> > > > >> shuffling support of Flink Iceberg sink
> >>>>> > > >>>>> >> > > > >> <
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > >
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >>
> >>>>> > > >>>>>
> >>>>> > >
> >>>>> >
> >>>>>
> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo
> >>>>> > > >>>>> >> > > > >> >
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >> Could you help to take a look?
> >>>>> > > >>>>> >> > > > >> Thanks
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >> Gang
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > >
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >
> >>>>> > > >>>>>
> >>>>> > > >>>>
> >>>>> > >
> >>>>> >
> >>>>>
> >>>>
>
