Thanks Gang and Steven for the FLIP. Actually, I share the same concern
as Piotr and Maximilian.

OperatorCoordinator is intentionally marked as @Internal because of some
existing issues, such as consistency between a non-source operator and its
coordinator during checkpointing. I'm wondering whether it is useful to
expose a public context to developers while keeping OperatorCoordinator
itself an internal API. If we eventually resolve all these issues and decide
to expose the operator coordinator API, that would be a better opportunity
to include the base context as part of it.

Best,
Qingsheng

On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <m...@apache.org> wrote:

> Thanks Steven! My confusion stemmed from the lack of context in the FLIP.
> The first version did not lay out how the refactoring would be used down
> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a
> non-public API and before reading the code, I wasn't even aware how exactly
> it worked and whether it would be available to regular operators (it was
> originally intended for sources only).
>
> I might seem pedantic here, but I believe the purpose of a FLIP should be
> to describe the *why* behind the changes, not only the changes themselves.
> A FLIP is not a formality but a tool to communicate and discuss changes. I
> think we still haven't laid out the exact reasons why we are factoring out
> the base. As far as I understand now, we need the base class to deal with
> concurrent updates from the runtime (sub)tasks in the custom Coordinator.
> Effectively, we are enforcing an actor model for the processing of the
> incoming messages such that the OperatorCoordinator can cleanly update its
> state. However, if there are no actual implementations that make use of the
> refactoring in Flink itself, I wonder if it would make sense to copy this
> code to the downstream implementation, e.g. the ShuffleCoordinator. Once
> such an implementation is part of Flink, we could of course try to
> consolidate this code.
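To make the actor-model point concrete, here is a minimal plain-Java sketch, assuming a hypothetical `EventLoopCoordinator` class that is not part of Flink: events from subtasks may arrive on many threads, but every state update is submitted to a single-threaded executor, so the coordinator state is only ever touched by one thread. It is written in the spirit of SourceCoordinator's runInEventLoop, not copied from it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical coordinator (not a Flink API). All state changes run on one
// thread, like messages processed one at a time from an actor's mailbox.
class EventLoopCoordinator implements AutoCloseable {
    // Only read or written from the single event-loop thread.
    private final Map<Integer, Long> countsPerSubtask = new HashMap<>();
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Safe to call from many threads: the update is queued and executed
    // later, in submission order, on the event-loop thread.
    Future<?> handleEventFromSubtask(int subtask, long count) {
        return eventLoop.submit(() -> {
            countsPerSubtask.merge(subtask, count, Long::sum);
        });
    }

    // Reads are also queued, so they see every update submitted before them.
    long total() {
        Future<Long> result = eventLoop.submit(() ->
                countsPerSubtask.values().stream().mapToLong(Long::longValue).sum());
        try {
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    @Override
    public void close() {
        // Lets already queued events finish, then stops the loop thread.
        eventLoop.shutdown();
    }
}
```

The trade-off is that handlers need no locks because only one thread touches the map, at the cost of processing all events sequentially.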
>
> Regarding the *how*, the FLIP lists methods from both SourceCoordinator
> (e.g. runInEventLoop) and SourceCoordinatorContext, as well as methods
> which do not appear anywhere in Flink code, e.g. subTaskReady /
> subTaskNotReady / sendEventToOperator. It appears that some of this has
> been extracted from a downstream implementation. It would be great to
> adjust the FLIP so that it reflects the status quo in Flink.
>
> -Max
>
> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <stevenz...@gmail.com> wrote:
>
> > Max,
> >
> > Thanks a lot for the comments. We should clarify that the shuffle
> > operator/coordinator is not really part of the Flink sink
> > function/operator. The shuffle operator is a custom operator that can be
> > inserted right before the Iceberg writer operator. It calculates traffic
> > statistics and performs a custom partitioning/shuffle
> > (DataStream#partitionCustom) to cluster the data right before it reaches
> > the Iceberg writer operator.
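As a rough illustration of how globally aggregated statistics could drive `DataStream#partitionCustom`, the plain-Java sketch below assigns the heaviest keys greedily to the least-loaded channel and falls back to hashing for unknown keys. The class name and the greedy heuristic are assumptions for illustration, not the algorithm from the Iceberg design doc; the `partition(key, numPartitions)` method only mirrors the shape of Flink's `Partitioner<K>` interface so that the example does not depend on Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical statistics-guided partitioner (not the FLIP or design-doc code).
class StatsGuidedPartitioner {
    private final Map<String, Integer> assignment = new HashMap<>();
    private final int numChannels;

    // globalCounts: key -> record count, as aggregated by a coordinator.
    StatsGuidedPartitioner(Map<String, Long> globalCounts, int numChannels) {
        this.numChannels = numChannels;
        long[] load = new long[numChannels];
        List<Map.Entry<String, Long>> entries = new ArrayList<>(globalCounts.entrySet());
        // Heaviest keys first, each one going to the currently least-loaded
        // channel (greedy "longest processing time first" heuristic).
        entries.sort((x, y) -> Long.compare(y.getValue(), x.getValue()));
        for (Map.Entry<String, Long> entry : entries) {
            int target = 0;
            for (int c = 1; c < numChannels; c++) {
                if (load[c] < load[target]) {
                    target = c;
                }
            }
            assignment.put(entry.getKey(), target);
            load[target] += entry.getValue();
        }
    }

    // Same shape as Flink's Partitioner#partition(K key, int numPartitions).
    int partition(String key, int numPartitions) {
        Integer planned = assignment.get(key);
        if (planned != null && numPartitions == numChannels) {
            return planned;
        }
        // Keys without statistics, or a changed parallelism, fall back to hashing.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

In a Flink job, the same logic would presumably sit behind a `Partitioner<K>` passed to `partitionCustom` together with a key selector; getting fresh statistics to each subtask is exactly the coordinator communication discussed in this thread.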
> >
> > We are not proposing to introduce a sink coordinator for the sink
> > interface. The shuffle operator needs CoordinatorContextBase to
> > facilitate communication between the shuffle subtasks and the coordinator
> > for traffic statistics aggregation. The communication part is already
> > implemented by SourceCoordinatorContext.
> >
> > Here are some details about the communication needs:
> > - subtasks periodically calculate local statistics and send them to the
> > coordinator for global aggregation
> > - the coordinator sends the globally aggregated statistics to the
> > subtasks
> > - subtasks use the globally aggregated statistics to guide the
> > partition/shuffle decision
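The three steps above can be sketched on the coordinator side as follows. This is a minimal plain-Java sketch under stated assumptions: `LocalStatsEvent`, `StatsAggregator`, the round number and the `sendToSubtask` callback are hypothetical stand-ins (the callback represents whatever gateway a coordinator context would provide), and the aggregator is meant to be called from a single coordinator thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical event sent by a shuffle subtask with its local key counts.
final class LocalStatsEvent {
    final int subtask;
    final long round;
    final Map<String, Long> counts;

    LocalStatsEvent(int subtask, long round, Map<String, Long> counts) {
        this.subtask = subtask;
        this.round = round;
        this.counts = counts;
    }
}

// Hypothetical coordinator-side aggregator. Not thread-safe: it assumes all
// events are handled on one coordinator thread.
final class StatsAggregator {
    private final int parallelism;
    private final BiConsumer<Integer, Map<String, Long>> sendToSubtask;
    private final Set<Integer> reported = new HashSet<>();
    private final Map<String, Long> globalCounts = new HashMap<>();
    private long currentRound = -1;

    StatsAggregator(int parallelism, BiConsumer<Integer, Map<String, Long>> sendToSubtask) {
        this.parallelism = parallelism;
        this.sendToSubtask = sendToSubtask;
    }

    void onEvent(LocalStatsEvent event) {
        if (event.round < currentRound) {
            return; // stale report from an older round
        }
        if (event.round > currentRound) {
            // A newer round starts: discard any partially aggregated state.
            currentRound = event.round;
            reported.clear();
            globalCounts.clear();
        }
        if (!reported.add(event.subtask)) {
            return; // duplicate report for this round
        }
        event.counts.forEach((key, count) -> globalCounts.merge(key, count, Long::sum));
        if (reported.size() == parallelism) {
            // Every subtask has reported: broadcast an immutable snapshot.
            Map<String, Long> snapshot =
                    Collections.unmodifiableMap(new HashMap<>(globalCounts));
            for (int i = 0; i < parallelism; i++) {
                sendToSubtask.accept(i, snapshot);
            }
        }
    }
}
```

Waiting for all subtasks before broadcasting keeps every subtask on the same global view; a real implementation might instead broadcast on a timer and tolerate missing reports.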
> >
> > Regards,
> > Steven
> >
> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <m...@apache.org> wrote:
> >
> > > Hi Gang,
> > >
> > > Looks much better! I've actually gone through the OperatorCoordinator
> > > code. It turns out that any operator already has an OperatorCoordinator
> > > assigned. Also, any operator can add custom coordinator code. So it
> > > looks like you won't have to implement any additional runtime logic to
> > > add a ShuffleCoordinator. However, I'm wondering why you specifically
> > > need to refactor the SourceCoordinatorContext. You could simply add your
> > > own coordinator code. I'm not sure the sink requirements map to the
> > > source interface so closely that you can reuse the same logic.
> > >
> > > If you can refactor SourceCoordinatorContext in a way that makes it fit
> > > your use case, I have no objection here. By the way, another example of
> > > an existing OperatorCoordinator is CollectSinkOperatorCoordinator, which
> > > is quite trivial, but it might be worth evaluating whether you need the
> > > full power of SourceCoordinatorContext, which is why I wanted to get
> > > more context.
> > >
> > > -Max
> > >
> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <yegang...@gmail.com> wrote:
> > >
> > > > Hi Max,
> > > > I understand your concern. Since shuffling support for the Flink
> > > > Iceberg sink is not the main body of the proposal, I have just added an
> > > > appendix with more details about how to use CoordinatorContextBase and
> > > > how to define ShufflingCoordinator.
> > > >
> > > > Let me know if that doesn't address your concern.
> > > >
> > > > Thanks
> > > > Gang
> > > >
> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <m...@apache.org> wrote:
> > > >
> > > >> Hey Gang,
> > > >>
> > > >> What I'm looking for here is a complete picture of why the change is
> > > >> necessary and what the next steps are. Ultimately, refactoring any
> > > >> code serves a purpose. Here, we want to refactor the Coordinator code
> > > >> such that we can add a SinkCoordinator, similar to the
> > > >> SourceCoordinator. The FLIP should address the next steps, i.e. how
> > > >> you plan to add the SinkCoordinator, its interfaces, and the runtime
> > > >> changes. It doesn't have to be in great detail, but without this
> > > >> information, I don't think the FLIP is complete.
> > > >>
> > > >> This feature should be generic enough to be usable by sinks other
> > > >> than the Iceberg sink. Of course, Iceberg can still load its own
> > > >> implementation, which may be outlined in a separate FLIP.
> > > >>
> > > >> Unless there is a good reason, normal operators should not support
> > > >> the coordinator functionality. At least, I'm not convinced it would
> > > >> play well with Flink's execution model. But I see how it is required
> > > >> for sources and sinks.
> > > >>
> > > >> -Max
> > > >>
> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegang...@gmail.com> wrote:
> > > >>
> > > >>> Hi Max,
> > > >>>
> > > >>> Thanks for reviewing.
> > > >>>
> > > >>> For FLIP-264, yes, we will only focus on abstracting the RPC calls
> > > >>> between the task and the job manager for communication, and won't
> > > >>> touch watermarks or checkpointing.
> > > >>> If a coordinator doesn't need RPC calls to talk to its subtasks, it
> > > >>> can define its context without extending CoordinatorContextBase (or
> > > >>> we can find another class name to limit the scope).
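A minimal plain-Java sketch of the split described above, assuming hypothetical names throughout (`CoordinatorContextSketch`, `EventGateway`, `subtaskReady`, `subtaskNotReady` and `sendEventToSubtask` are illustrations, not the FLIP-264 API or Flink's own gateway types): the base class only tracks which subtasks are reachable and routes events to them, and a concrete context that needs subtask RPC extends it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the RPC channel to one subtask.
interface EventGateway<E> {
    void send(E event);
}

// Hypothetical base context: tracks reachable subtasks and routes events.
abstract class CoordinatorContextSketch<E> {
    // Readiness callbacks and event sends may come from different threads.
    private final Map<Integer, EventGateway<E>> readyGateways = new ConcurrentHashMap<>();
    private final int parallelism;

    protected CoordinatorContextSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    void subtaskReady(int subtask, EventGateway<E> gateway) {
        readyGateways.put(subtask, gateway);
    }

    void subtaskNotReady(int subtask) {
        readyGateways.remove(subtask);
    }

    // Returns false when the subtask currently has no gateway.
    boolean sendEventToSubtask(int subtask, E event) {
        EventGateway<E> gateway = readyGateways.get(subtask);
        if (gateway == null) {
            return false;
        }
        gateway.send(event);
        return true;
    }

    // Sends to every ready subtask and returns how many received the event.
    int broadcast(E event) {
        int delivered = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (sendEventToSubtask(subtask, event)) {
                delivered++;
            }
        }
        return delivered;
    }
}

// Hypothetical concrete context; a real one would add coordinator-specific state.
final class ShuffleContextSketch extends CoordinatorContextSketch<String> {
    ShuffleContextSketch(int parallelism) {
        super(parallelism);
    }
}
```

Returning `false` for unreachable subtasks instead of throwing leaves retry or buffering decisions to the concrete coordinator; that choice is also an assumption of this sketch.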
> > > >>>
> > > >>> Regarding the scope of code changes, in FLIP-264 we will only do the
> > > >>> context extraction. The shuffling coordinator and operator
> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
> > > >>> that will use the context will come in a separate proposal, so we
> > > >>> try to keep FLIP-264 simple to understand. I can add a bit more about
> > > >>> how to use the coordinator context in FLIP-264 if you think that
> > > >>> will be helpful.
> > > >>>
> > > >>> Thanks!
> > > >>> Gang
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <m...@apache.org> wrote:
> > > >>>
> > > >>>> Thanks for the proposal, Gang! This is indeed a somewhat bigger
> > > >>>> change. The coordinator for sources, as part of FLIP-27, was
> > > >>>> specifically added to synchronize the global watermark and to assign
> > > >>>> splits dynamically. However, it practically allows arbitrary RPC
> > > >>>> calls between the task and the job manager. I understand that there
> > > >>>> is concern that such a powerful mechanism should not be available to
> > > >>>> all operators. Nevertheless, I see the practical use in the case of
> > > >>>> sinks like Iceberg. So I'd suggest limiting this feature to sinks
> > > >>>> (and sources) only.
> > > >>>>
> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
> > > >>>> enough to achieve what you want. There will be additional work
> > > >>>> necessary, e.g. creating a SinkCoordinator, similar to
> > > >>>> SourceCoordinator, which handles the RPC calls and the checkpointing.
> > > >>>> I think it would be good to outline this in the FLIP.
> > > >>>>
> > > >>>> -Max
> > > >>>>
> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Sorry, I sent the incomplete reply by mistake.
> > > >>>>>
> > > >>>>> If there are any concrete concerns, we can discuss them. In
> > > >>>>> FLINK-27405 [1], Avid pointed out some implications regarding
> > > >>>>> checkpointing. In this small FLIP, we are not exposing or changing
> > > >>>>> any checkpointing logic; we mainly need the coordinator context
> > > >>>>> functionality to facilitate communication between the coordinator
> > > >>>>> and subtasks.
> > > >>>>>
> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
> > > >>>>>
> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> > Hang, I appreciate your input. I agree that `CoordinatorContextBase`
> > > >>>>> > is a better name, given Flink's code conventions.
> > > >>>>> >
> > > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
> > > >>>>> >
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
> > > >>>>> >
> > > >>>>> >> Hi,
> > > >>>>> >>
> > > >>>>> >> IMO, I agree with extracting a base class for
> > > >>>>> >> SourceCoordinatorContext. But I prefer the name
> > > >>>>> >> `OperatorCoordinatorContextBase` or `CoordinatorContextBase`,
> > > >>>>> >> following the naming pattern of `SourceReaderBase`.
> > > >>>>> >> I also agree with what Piotr said. More problems may occur when
> > > >>>>> >> connectors start to use it.
> > > >>>>> >>
> > > >>>>> >> Best,
> > > >>>>> >> Hang
> > > >>>>> >>
> > > >>>>> >> On Fri, Oct 14, 2022 at 10:31 PM Steven Wu <stevenz...@gmail.com> wrote:
> > > >>>>> >>
> > > >>>>> >> > Piotr,
> > > >>>>> >> >
> > > >>>>> >> > The proposal is to extract the listed methods from the @Internal
> > > >>>>> >> > SourceCoordinatorContext into a @PublicEvolving
> > > >>>>> >> > BaseCoordinatorContext.
> > > >>>>> >> >
> > > >>>>> >> > The motivation is that other operators can leverage the
> > > >>>>> >> > communication mechanism between the operator coordinator and the
> > > >>>>> >> > operator subtasks. For example, the shuffle operator (in the
> > > >>>>> >> > Flink Iceberg sink) described in the linked Google doc can
> > > >>>>> >> > leverage it for computing traffic distribution statistics.
> > > >>>>> >> > * Subtasks calculate local statistics and periodically send them
> > > >>>>> >> > to the coordinator for global aggregation.
> > > >>>>> >> > * The coordinator can broadcast the globally aggregated
> > > >>>>> >> > statistics to subtasks, which can use them to guide the
> > > >>>>> >> > shuffling decision (selecting downstream channels).
> > > >>>>> >> >
> > > >>>>> >> > Thanks,
> > > >>>>> >> > Steven
> > > >>>>> >> >
> > > >>>>> >> >
> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > >>>>> >> >
> > > >>>>> >> > > Hi,
> > > >>>>> >> > >
> > > >>>>> >> > > Could you clarify what proposal you have in mind? From the
> > > >>>>> >> > > context, I would understand that the newly extracted
> > > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as
> > > >>>>> >> > > `@PublicEvolving` or `@Experimental`, since otherwise
> > > >>>>> >> > > extracting it and keeping it `@Internal` wouldn't change much?
> > > >>>>> >> > > Such an `@Internal` base class could be removed at any point
> > > >>>>> >> > > in the future. Having said that, it sounds to me like your
> > > >>>>> >> > > proposal is a bit bigger than it looks at first glance and
> > > >>>>> >> > > you actually want to expose the operator coordinator concept
> > > >>>>> >> > > in the public API?
> > > >>>>> >> > >
> > > >>>>> >> > > AFAIK there were some discussions about that, and it was a
> > > >>>>> >> > > bit of a conscious decision NOT to do that. I don't know the
> > > >>>>> >> > > reasons, however. Only just now have I heard that there are,
> > > >>>>> >> > > for example, some problems with checkpointing of hypothetical
> > > >>>>> >> > > non-source operator coordinators. Maybe someone else could
> > > >>>>> >> > > shed some light on this?
> > > >>>>> >> > >
> > > >>>>> >> > > Conceptually, I would actually be in favour of exposing
> > > >>>>> >> > > operator coordinators if there is a good reason behind that,
> > > >>>>> >> > > but it is a more difficult topic and might be a larger effort
> > > >>>>> >> > > than it seems at first glance.
> > > >>>>> >> > >
> > > >>>>> >> > > Best,
> > > >>>>> >> > > Piotrek
> > > >>>>> >> > >
> > > >>>>> >> > > On Tue, Oct 4, 2022 at 7:41 PM Steven Wu <stevenz...@gmail.com> wrote:
> > > >>>>> >> > >
> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked Google doc is
> > > >>>>> >> > > > not for this FLIP, which is fully documented on the wiki page.
> > > >>>>> >> > > > The linked Google doc is the design doc for introducing
> > > >>>>> >> > > > shuffling in the Flink Iceberg sink, which motivated this FLIP
> > > >>>>> >> > > > proposal so that the shuffle coordinator can leverage the
> > > >>>>> >> > > > introduced BaseCoordinatorContext to avoid code duplication.
> > > >>>>> >> > > >
> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <j...@ververica.com> wrote:
> > > >>>>> >> > > >
> > > >>>>> >> > > > > Thanks for bringing this up. It looks good overall! One
> > > >>>>> >> > > > > small thing: you might want to write all the content on the
> > > >>>>> >> > > > > wiki page instead of linking to a Google doc. The reason is
> > > >>>>> >> > > > > that some people might not be able to access the Google doc.
> > > >>>>> >> > > > >
> > > >>>>> >> > > > > Best regards,
> > > >>>>> >> > > > > Jing
> > > >>>>> >> > > > >
> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegang...@gmail.com> wrote:
> > > >>>>> >> > > > >
> > > >>>>> >> > > > >> Hi,
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >> We submitted the FLIP proposal
> > > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
> > > >>>>> >> > > > >> SourceCoordinatorContext so that it can be reused by other
> > > >>>>> >> > > > >> coordinators, e.g. for the shuffling support in the Flink
> > > >>>>> >> > > > >> Iceberg sink
> > > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >> Could you help to take a look?
> > > >>>>> >> > > > >> Thanks
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >> Gang
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >
> > > >>>>> >> > > >
> > > >>>>> >> > >
> > > >>>>> >> >
> > > >>>>> >>
> > > >>>>> >
> > > >>>>>
> > > >>>>
> > >
> >
>
