Thanks Gang and Steven for the FLIP. I share the same concern as Piotr and
Maximilian.

OperatorCoordinator is intentionally marked as @Internal because of some
existing issues, such as consistency between a non-source operator and its
coordinator on checkpoint. I'm wondering whether it is useful to expose a
public context to developers while keeping OperatorCoordinator itself an
internal API. If we eventually close all the issues and decide to expose the
operator coordinator API, that would be a better opportunity to include the
base context as part of it.

Best,
Qingsheng

On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <m...@apache.org> wrote:

> Thanks Steven! My confusion stemmed from the lack of context in the FLIP.
> The first version did not lay out how the refactoring would be used down
> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a
> non-public API and before reading the code, I wasn't even aware how exactly
> it worked and whether it would be available to regular operators (it was
> originally intended for sources only).
>
> I might seem pedantic here but I believe the purpose of a FLIP should be to
> describe the *why* behind the changes, not only the changes themselves. A
> FLIP is not a formality but a tool to communicate and discuss changes. I
> think we still haven't laid out the exact reasons why we are factoring out
> the base. As far as I understand now, we need the base class to deal with
> concurrent updates in the custom Coordinator from the runtime (sub)tasks.
> Effectively, we are enforcing an actor model for the processing of the
> incoming messages such that the OperatorCoordinator can cleanly update its
> state. However, if there are no actual implementations that make use of the
> refactoring in Flink itself, I wonder if it would make sense to copy this
> code to the downstream implementation, e.g. the ShuffleCoordinator. As soon
> as it is part of Flink, we could of course try to consolidate this code.
>
> Considering the *how* of this, there appear to be both methods from
> SourceCoordinator (e.g.
> runInEventLoop) as well as SourceCoordinatorContext
> listed in the FLIP, as well as methods which do not appear anywhere in
> Flink code, e.g. subTaskReady / subTaskNotReady / sendEventToOperator. It
> appears that some of this has been extracted from a downstream
> implementation. It would be great to adjust this, such that it reflects the
> status quo in Flink.
>
> -Max
>
> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <stevenz...@gmail.com> wrote:
>
> > Max,
> >
> > Thanks a lot for the comments. We should clarify that the shuffle
> > operator/coordinator is not really part of the Flink sink
> > function/operator. shuffle operator is a custom operator that can be
> > inserted right before the Iceberg writer operator. Shuffle operator
> > calculates the traffic statistics and performs a custom partition/shuffle
> > (DataStream#partitionCustom) to cluster the data right before they get to
> > the Iceberg writer operator.
> >
> > We are not proposing to introduce a sink coordinator for the sink
> > interface. Shuffle operator needs the CoordinatorContextBase to
> > facilitate the communication btw shuffle subtasks and coordinator for
> > traffic statistics aggregation. The communication part is already
> > implemented by SourceCoordinatorContext.
> >
> > Here are some details about the communication needs.
> > - subtasks periodically calculate local statistics and send to the
> >   coordinator for global aggregation
> > - the coordinator sends the globally aggregated statistics to the
> >   subtasks
> > - subtasks use the globally aggregated statistics to guide the
> >   partition/shuffle decision
> >
> > Regards,
> > Steven
> >
> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <m...@apache.org>
> > wrote:
> >
> > > Hi Gang,
> > >
> > > Looks much better! I've actually gone through the OperatorCoordinator
> > > code. It turns out, any operator already has an OperatorCoordinator
> > > assigned. Also, any operator can add custom coordinator code.
> > > So it looks like you won't have to implement any additional runtime
> > > logic to add a ShuffleCoordinator. However, I'm wondering, why do you
> > > specifically need to refactor the SourceCoordinatorContext? You could
> > > simply add your own coordinator code. I'm not sure the sink
> > > requirements map to the source interface so closely that you can reuse
> > > the same logic.
> > >
> > > If you can refactor SourceCoordinatorContext in a way that makes it fit
> > > your use case, I have nothing to object here. By the way, another
> > > example of an existing OperatorCoordinator is
> > > CollectSinkOperatorCoordinator which is quite trivial but it might be
> > > worth evaluating whether you need the full power of
> > > SourceCoordinatorContext which is why I wanted to get more context.
> > >
> > > -Max
> > >
> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <yegang...@gmail.com> wrote:
> > >
> > > > Hi Max,
> > > > I understand your concern. Since shuffling support for the Flink
> > > > Iceberg sink is not the main body of the proposal, I just added an
> > > > appendix with more details about how to use CoordinatorContextBase
> > > > and how to define ShufflingCoordinator.
> > > >
> > > > Let me know if that does not address your concern.
> > > >
> > > > Thanks
> > > > Gang
> > > >
> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <m...@apache.org>
> > > > wrote:
> > > >
> > > >> Hey Gang,
> > > >>
> > > >> What I'm looking for here is a complete picture of why the change is
> > > >> necessary and what the next steps are. Ultimately, refactoring any
> > > >> code serves a purpose. Here, we want to refactor the Coordinator
> > > >> code such that we can add a SinkCoordinator, similar to the
> > > >> SourceCoordinator. The FLIP should address the next steps, i.e. how
> > > >> you plan to add the SinkCoordinator, its interfaces, runtime changes.
> > > >> It doesn't have to be in great detail but without this information,
> > > >> I don't think the FLIP is complete.
> > > >>
> > > >> This feature should be generic enough to be usable by other sinks
> > > >> than the Iceberg sink. Of course Iceberg can still load its own
> > > >> implementation which may be outlined in a separate FLIP.
> > > >>
> > > >> Unless there is a good reason, normal operators should not support
> > > >> the coordinator functionality. At least I'm not convinced it would
> > > >> play well with Flink's execution model. But I see how it is required
> > > >> for sources and sinks.
> > > >>
> > > >> -Max
> > > >>
> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegang...@gmail.com> wrote:
> > > >>
> > > >>> Hi Max,
> > > >>>
> > > >>> Thanks for reviewing.
> > > >>>
> > > >>> For FLIP-264, yes, we will only focus on abstracting the RPC calls
> > > >>> between the task and the job manager for communications and won't
> > > >>> touch watermark checkpoint.
> > > >>> If the coordinator doesn't need RPC calls to talk with subtasks,
> > > >>> then it can define its context without extending
> > > >>> CoordinatorContextBase (or we can find another class name to limit
> > > >>> the scope).
> > > >>>
> > > >>> Regarding the scope of the code change, for FLIP-264 we will only
> > > >>> do the context extraction. The shuffling coordinator and operator
> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
> > > >>> which will use the context will come with a separate proposal, so
> > > >>> we try to keep FLIP-264 simple to understand. I can add a little
> > > >>> bit more about how to use the coordinator context in FLIP-264 if
> > > >>> you think that will be helpful.
> > > >>>
> > > >>> Thanks!
> > > >>> Gang
> > > >>>
> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <m...@apache.org>
> > > >>> wrote:
> > > >>>
> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger
> > > >>>> change. The coordinator for sources, as part of FLIP-27, was
> > > >>>> specifically added to synchronize the global watermark and to
> > > >>>> assign splits dynamically. However, it practically allows
> > > >>>> arbitrary RPC calls between the task and the job manager. I
> > > >>>> understand that there is concern that such a powerful mechanism
> > > >>>> should not be available to all operators. Nevertheless, I see the
> > > >>>> practical use in case of sinks like Iceberg. So I'd suggest
> > > >>>> limiting this feature to sinks (and sources) only.
> > > >>>>
> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
> > > >>>> enough to achieve what you want. There will be additional work
> > > >>>> necessary, e.g. create a SinkCoordinator similarly to
> > > >>>> SourceCoordinator which handles the RPC calls and the
> > > >>>> checkpointing. I think it would be good to outline this in the
> > > >>>> FLIP.
> > > >>>>
> > > >>>> -Max
> > > >>>>
> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz...@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> sorry. sent the incomplete reply by mistake.
> > > >>>>>
> > > >>>>> If there are any concrete concerns, we can discuss. In the
> > > >>>>> FLINK-27405 [1], Avid pointed out some implications regarding
> > > >>>>> checkpointing. In this small FLIP, we are not exposing/changing
> > > >>>>> any checkpointing logic, we mainly need the coordinator context
> > > >>>>> functionality to facilitate the communication between
> > > >>>>> coordinator and subtasks.
> > > >>>>>
> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
> > > >>>>>
> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz...@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>> > Hang, appreciate your input. Agree that
> > > >>>>> > `CoordinatorContextBase` is a better name considering Flink
> > > >>>>> > code convention.
> > > >>>>> >
> > > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
> > > >>>>> >
> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan
> > > >>>>> > <ruanhang1...@gmail.com> wrote:
> > > >>>>> >
> > > >>>>> >> Hi,
> > > >>>>> >>
> > > >>>>> >> IMO, I agree to extract a base class for
> > > >>>>> >> SourceCoordinatorContext.
> > > >>>>> >> But I prefer to use the name `OperatorCoordinatorContextBase`
> > > >>>>> >> or `CoordinatorContextBase`, following the format of
> > > >>>>> >> `SourceReaderBase`.
> > > >>>>> >> I also agree with what Piotr said. Maybe more problems will
> > > >>>>> >> occur when connectors start to use it.
> > > >>>>> >>
> > > >>>>> >> Best,
> > > >>>>> >> Hang
> > > >>>>> >>
> > > >>>>> >> On Fri, Oct 14, 2022 at 22:31, Steven Wu <stevenz...@gmail.com>
> > > >>>>> >> wrote:
> > > >>>>> >>
> > > >>>>> >> > Piotr,
> > > >>>>> >> >
> > > >>>>> >> > The proposal is to extract the listed methods from the
> > > >>>>> >> > @Internal SourceCoordinatorContext to a @PublicEvolving
> > > >>>>> >> > BaseCoordinatorContext.
> > > >>>>> >> >
> > > >>>>> >> > The motivation is that other operators can leverage the
> > > >>>>> >> > communication mechanism btw operator coordinator and
> > > >>>>> >> > operator subtasks. For example, in the linked google doc
> > > >>>>> >> > shuffle operator (in Flink Iceberg sink) can leverage it for
> > > >>>>> >> > computing traffic distribution statistics.
> > > >>>>> >> > * subtasks calculate local statistics and periodically send
> > > >>>>> >> >   them to the coordinator for global aggregation.
> > > >>>>> >> > * The coordinator can broadcast the globally aggregated
> > > >>>>> >> >   statistics to subtasks, which can be used to guide the
> > > >>>>> >> >   shuffling decision (selecting downstream channels).
> > > >>>>> >> >
> > > >>>>> >> > Thanks,
> > > >>>>> >> > Steven
> > > >>>>> >> >
> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski
> > > >>>>> >> > <pnowoj...@apache.org> wrote:
> > > >>>>> >> >
> > > >>>>> >> > > Hi,
> > > >>>>> >> > >
> > > >>>>> >> > > Could you clarify what's the proposal that you have in
> > > >>>>> >> > > mind? From the context I would understand that the newly
> > > >>>>> >> > > extracted `BaseCoordinatorContext` would have to be marked
> > > >>>>> >> > > as `@PublicEvolving` or `@Experimental`, since otherwise
> > > >>>>> >> > > extracting it and keeping `@Internal` wouldn't change
> > > >>>>> >> > > much? Such `@Internal` base class could have been removed
> > > >>>>> >> > > at any point of time in the future. Having said that, it
> > > >>>>> >> > > sounds to me like your proposal is a bit bigger than it
> > > >>>>> >> > > looks at the first glance and you actually want to expose
> > > >>>>> >> > > the operator coordinator concept to the public API?
> > > >>>>> >> > >
> > > >>>>> >> > > AFAIK there were some discussions about that, and it was a
> > > >>>>> >> > > bit of a conscious decision to NOT do that. I don't know
> > > >>>>> >> > > those reasons however. Only now, I've just heard that
> > > >>>>> >> > > there are for example some problems with checkpointing of
> > > >>>>> >> > > hypothetical non source operator coordinators. Maybe
> > > >>>>> >> > > someone else could shed some light on this?
> > > >>>>> >> > >
> > > >>>>> >> > > Conceptually I would be actually in favour of exposing
> > > >>>>> >> > > operator coordinators if there is a good reason behind
> > > >>>>> >> > > that, but it is a more difficult topic and might be a
> > > >>>>> >> > > larger effort than it seems at the first glance.
> > > >>>>> >> > >
> > > >>>>> >> > > Best,
> > > >>>>> >> > > Piotrek
> > > >>>>> >> > >
> > > >>>>> >> > > On Tue, Oct 4, 2022 at 19:41, Steven Wu
> > > >>>>> >> > > <stevenz...@gmail.com> wrote:
> > > >>>>> >> > >
> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked google doc
> > > >>>>> >> > > > is not for this FLIP, which is fully documented in the
> > > >>>>> >> > > > wiki page. The linked google doc is the design doc to
> > > >>>>> >> > > > introduce shuffling in Flink Iceberg sink, which
> > > >>>>> >> > > > motivated this FLIP proposal so that the shuffle
> > > >>>>> >> > > > coordinator can leverage the introduced
> > > >>>>> >> > > > BaseCoordinatorContext to avoid code duplication.
> > > >>>>> >> > > >
> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge
> > > >>>>> >> > > > <j...@ververica.com> wrote:
> > > >>>>> >> > > >
> > > >>>>> >> > > > > Thanks for bringing this up. It looks overall good!
> > > >>>>> >> > > > > One small thing, you might want to write all content
> > > >>>>> >> > > > > on the wiki page instead of linking to a google doc.
> > > >>>>> >> > > > > The reason is that some people might not be able to
> > > >>>>> >> > > > > access the google doc.
> > > >>>>> >> > > > >
> > > >>>>> >> > > > > Best regards,
> > > >>>>> >> > > > > Jing
> > > >>>>> >> > > > >
> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye
> > > >>>>> >> > > > > <yegang...@gmail.com> wrote:
> > > >>>>> >> > > > >
> > > >>>>> >> > > > >> Hi,
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >> We submitted the FLIP proposal
> > > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
> > > >>>>> >> > > > >> SourceCoordinatorContext so that it can be reused by
> > > >>>>> >> > > > >> other coordinators, e.g. in the shuffling support of
> > > >>>>> >> > > > >> the Flink Iceberg sink
> > > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >> Could you help to take a look?
> > > >>>>> >> > > > >> Thanks
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >> Gang
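
For readers skimming the thread, here is a rough sketch of the pattern under discussion: subtasks report local statistics, the coordinator aggregates them, and the global result is broadcast back, with every state change serialized on a single event-loop thread (the "actor model" point raised by Max). It uses only the JDK and is not Flink's OperatorCoordinator or SourceCoordinatorContext API. The class name StatsCoordinatorSketch, the method handleLocalStats, and the sendToSubtask callback are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/** Hypothetical sketch only; none of these names come from Flink. */
class StatsCoordinatorSketch implements AutoCloseable {
    // One thread acts as the event loop: all state changes run here, in order.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final int parallelism;
    // Stand-in for "send an event to subtask i" (assumed callback, not a Flink API).
    private final BiConsumer<Integer, Map<String, Long>> sendToSubtask;
    // Only touched from the event-loop thread, so it needs no locking.
    private final Map<Integer, Map<String, Long>> localStats = new HashMap<>();

    StatsCoordinatorSketch(int parallelism,
                           BiConsumer<Integer, Map<String, Long>> sendToSubtask) {
        this.parallelism = parallelism;
        this.sendToSubtask = sendToSubtask;
    }

    /** May be called from any thread; the actual work is queued on the event loop. */
    CompletableFuture<Void> handleLocalStats(int subtask, Map<String, Long> stats) {
        Map<String, Long> copy = new HashMap<>(stats); // defensive copy of caller data
        return CompletableFuture.runAsync(() -> {
            localStats.put(subtask, copy);
            if (localStats.size() < parallelism) {
                return; // wait until every subtask has reported
            }
            // Global aggregation: sum the per-key counts of all subtasks.
            Map<String, Long> global = new HashMap<>();
            for (Map<String, Long> m : localStats.values()) {
                m.forEach((key, count) -> global.merge(key, count, Long::sum));
            }
            localStats.clear();
            // Broadcast the aggregated statistics to every subtask.
            for (int i = 0; i < parallelism; i++) {
                sendToSubtask.accept(i, new HashMap<>(global));
            }
        }, eventLoop);
    }

    @Override
    public void close() {
        eventLoop.shutdown(); // already queued work still completes
    }

    public static void main(String[] args) throws Exception {
        Map<Integer, Map<String, Long>> received = new ConcurrentHashMap<>();
        Map<String, Long> fromSubtask0 = new HashMap<>();
        fromSubtask0.put("a", 3L);
        fromSubtask0.put("b", 1L);
        Map<String, Long> fromSubtask1 = new HashMap<>();
        fromSubtask1.put("a", 2L);
        try (StatsCoordinatorSketch coordinator =
                 new StatsCoordinatorSketch(2, received::put)) {
            coordinator.handleLocalStats(0, fromSubtask0).get();
            coordinator.handleLocalStats(1, fromSubtask1).get();
        }
        System.out.println("subtask 0 received: " + received.get(0));
    }
}
```

In this sketch nothing is broadcast until both subtasks have reported. A real coordinator would additionally have to deal with checkpointing and subtask failover, which is exactly the open concern raised above and is deliberately left out here.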