Hi Max and Qingsheng,

Thanks for the feedback. The initial motivation for this proposal was to reduce duplicated code, since the ShuffleCoordinator would need communication logic similar to the SourceCoordinator's to talk with its operators. I understand your concern that OperatorCoordinator is an internal class and that, apart from SourceCoordinator, nothing else uses it for now. How about we do what Qingsheng suggested? I can go ahead with the ShufflingCoordinator implementation without the extraction. That will give us an intuitive sense of how much code is copied and could be reused. If we still feel the need to extract it, we can revisit the discussion.
Thanks,
Gang

On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org> wrote:
> Thanks Gang and Steven for the FLIP. Actually, I share the same concern as Piotr and Maximilian.
>
> OperatorCoordinator is intentionally marked as @Internal because of some existing issues, such as consistency between a non-source operator and its coordinator on checkpoint. I'm wondering whether it is useful to expose a public context to developers while OperatorCoordinator remains an internal API. If we eventually close all the issues and decide to expose the operator coordinator API, that would be a better opportunity to include the base context as part of it.
>
> Best,
> Qingsheng

On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <m...@apache.org> wrote:
> Thanks Steven! My confusion stemmed from the lack of context in the FLIP. The first version did not lay out how the refactoring would be used down the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a non-public API, and before reading the code I wasn't even aware of exactly how it worked or whether it would be available to regular operators (it was originally intended for sources only).
>
> I might seem pedantic here, but I believe the purpose of a FLIP is to describe the *why* behind the changes, not only the changes themselves. A FLIP is not a formality but a tool to communicate and discuss changes. I think we still haven't laid out the exact reasons why we are factoring out the base. As far as I understand now, we need the base class to deal with concurrent updates in the custom coordinator from the runtime (sub)tasks. Effectively, we are enforcing an actor model for processing the incoming messages so that the OperatorCoordinator can cleanly update its state.
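Max's description of the base class as an "actor model" can be made concrete with a small, hypothetical sketch. It is plain Java and uses no Flink classes: `EventLoopCoordinatorSketch` and its methods are illustrative assumptions in the spirit of the `runInEventLoop` method Max mentions, not its actual implementation. Every event from a subtask is submitted to one single-threaded executor, so the coordinator state is only ever touched by that one thread and needs no locks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch (not Flink code): all coordinator state is owned by one
// event-loop thread, so events arriving concurrently from many subtasks are
// applied one at a time.
class EventLoopCoordinatorSketch implements AutoCloseable {

    // Single-threaded executor acting as the actor's mailbox.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "coordinator-event-loop");
        t.setDaemon(true); // do not keep the JVM alive if close() is never called
        return t;
    });

    // Only ever read or written from the event-loop thread.
    private final Map<Integer, Long> eventsPerSubtask = new HashMap<>();

    // Called from arbitrary (e.g. RPC) threads: enqueue the update, never mutate directly.
    Future<?> handleEventFromSubtask(int subtask) {
        return eventLoop.submit(() -> eventsPerSubtask.merge(subtask, 1L, Long::sum));
    }

    // Reads are enqueued too, so they observe every previously submitted update.
    long awaitCount(int subtask) {
        Future<Long> result = eventLoop.submit(() -> eventsPerSubtask.getOrDefault(subtask, 0L));
        try {
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    @Override
    public void close() {
        eventLoop.shutdown();
    }
}
```

The key design choice is that callers never touch the map directly. Even reads go through the same FIFO queue, so a read submitted after a batch of updates sees all of them, regardless of which threads sent the updates.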
>
> However, if there are no actual implementations in Flink itself that make use of the refactoring, I wonder whether it would make sense to copy this code into the downstream implementation, e.g. the ShuffleCoordinator. As soon as it is part of Flink, we could of course try to consolidate the code.
>
> Considering the *how*, the FLIP lists methods from SourceCoordinator (e.g. runInEventLoop) and from SourceCoordinatorContext, as well as methods that do not appear anywhere in the Flink code, e.g. subTaskReady / subTaskNotReady / sendEventToOperator. It appears that some of this was extracted from a downstream implementation. It would be great to adjust this so that it reflects the status quo in Flink.
>
> -Max

On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <stevenz...@gmail.com> wrote:
> Max,
>
> Thanks a lot for the comments. We should clarify that the shuffle operator/coordinator is not really part of the Flink sink function/operator. The shuffle operator is a custom operator that can be inserted right before the Iceberg writer operator. It calculates traffic statistics and performs a custom partition/shuffle (DataStream#partitionCustom) to cluster the data right before it reaches the Iceberg writer operator.
>
> We are not proposing to introduce a sink coordinator for the sink interface. The shuffle operator needs CoordinatorContextBase to facilitate the communication between the shuffle subtasks and the coordinator for traffic statistics aggregation. The communication part is already implemented by SourceCoordinatorContext.
>
> Here are some details about the communication needs:
> - Subtasks periodically calculate local statistics and send them to the coordinator for global aggregation.
> - The coordinator sends the globally aggregated statistics to the subtasks.
> - Subtasks use the globally aggregated statistics to guide the partition/shuffle decision.
>
> Regards,
> Steven

On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <m...@apache.org> wrote:
> Hi Gang,
>
> Looks much better! I've actually gone through the OperatorCoordinator code. It turns out any operator already has an OperatorCoordinator assigned, and any operator can add custom coordinator code. So it looks like you won't have to implement any additional runtime logic to add a ShuffleCoordinator. However, I'm wondering why you specifically need to refactor the SourceCoordinatorContext. You could simply add your own coordinator code. I'm not sure the sink requirements map to the source interface so closely that you can reuse the same logic.
>
> If you can refactor SourceCoordinatorContext in a way that fits your use case, I have no objection. By the way, another example of an existing OperatorCoordinator is CollectSinkOperatorCoordinator, which is quite trivial. It might be worth evaluating whether you need the full power of SourceCoordinatorContext, which is why I wanted to get more context.
>
> -Max

On Thu, Oct 27, 2022 at 4:15 PM gang ye <yegang...@gmail.com> wrote:
> Hi Max,
>
> I understand your concern. Since shuffling support for the Flink Iceberg sink is not the main body of the proposal, I just added another appendix with more details about how to use CoordinatorContextBase and how to define the ShufflingCoordinator.
>
> Let me know if that doesn't address your concern.
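The three-step statistics exchange Steven lists above (local statistics up to the coordinator, the global aggregate back down, the shuffle decision on the subtasks) can be sketched as follows. This is a hypothetical plain-Java illustration, not the design in the linked Google doc: the per-key count statistics, the one-report-per-subtask aggregation round, and the greedy "heaviest key to lightest channel" assignment are all assumptions chosen for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the statistics exchange; plain Java, no Flink API.
class ShuffleStatsSketch {

    // Coordinator side: collects one report per subtask, then merges and broadcasts.
    // Not thread-safe on its own; it assumes calls arrive via a single event loop.
    static final class Coordinator {
        private final int parallelism;
        private final Consumer<Map<String, Long>> broadcast;
        private final Map<Integer, Map<String, Long>> reports = new HashMap<>();

        Coordinator(int parallelism, Consumer<Map<String, Long>> broadcast) {
            this.parallelism = parallelism;
            this.broadcast = broadcast;
        }

        // Step 1 lands here; a later report from the same subtask replaces its earlier one.
        void onLocalStatistics(int subtask, Map<String, Long> localCounts) {
            reports.put(subtask, new HashMap<>(localCounts));
            if (reports.size() < parallelism) {
                return; // still waiting for some subtasks
            }
            // Step 2: merge all local counts into a global view and send it to every subtask.
            Map<String, Long> global = new HashMap<>();
            for (Map<String, Long> report : reports.values()) {
                report.forEach((key, count) -> global.merge(key, count, Long::sum));
            }
            reports.clear(); // start the next aggregation round
            broadcast.accept(Collections.unmodifiableMap(global));
        }
    }

    // Step 3 (subtask side): map each key to a downstream channel by greedily
    // placing the heaviest remaining key on the currently lightest channel.
    static Map<String, Integer> assignChannels(Map<String, Long> globalCounts, int channels) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(globalCounts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue()); // descending count
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey()); // stable ties
        });
        long[] load = new long[channels];
        Map<String, Integer> assignment = new HashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            int lightest = 0;
            for (int c = 1; c < channels; c++) {
                if (load[c] < load[lightest]) {
                    lightest = c;
                }
            }
            assignment.put(entry.getKey(), lightest);
            load[lightest] += entry.getValue();
        }
        return assignment;
    }
}
```

On the subtask side, a map like the one returned by `assignChannels` could back a Flink `Partitioner` passed to `DataStream#partitionCustom`. Keys missing from the map (for example, keys first seen after the last broadcast) would need a fallback such as hashing, which this sketch leaves out.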
>
> Thanks,
> Gang

On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <m...@apache.org> wrote:
> Hey Gang,
>
> What I'm looking for here is a complete picture of why the change is necessary and what the next steps are. Ultimately, refactoring any code serves a purpose. Here, we want to refactor the coordinator code so that we can add a SinkCoordinator, similar to the SourceCoordinator. The FLIP should address the next steps, i.e. how you plan to add the SinkCoordinator, its interfaces, and the runtime changes. It doesn't have to be in great detail, but without this information I don't think the FLIP is complete.
>
> This feature should be generic enough to be usable by sinks other than the Iceberg sink. Of course, Iceberg can still load its own implementation, which may be outlined in a separate FLIP.
>
> Unless there is a good reason, normal operators should not support the coordinator functionality. At least, I'm not convinced it would play well with Flink's execution model. But I see how it is required for sources and sinks.
>
> -Max

On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegang...@gmail.com> wrote:
> Hi Max,
>
> Thanks for reviewing.
>
> For FLIP-264, yes, we will focus only on abstracting the RPC calls between the task and the job manager for communication, and won't touch watermark or checkpoint logic. If a coordinator doesn't need RPC calls to talk with its subtasks, it can define its context without extending CoordinatorContextBase (or we can find another class name to limit the scope).
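To illustrate the kind of shared plumbing an extracted context base might hold, here is a hypothetical plain-Java sketch. The method names `subTaskReady`, `subTaskNotReady`, and `sendEventToOperator` are taken from the thread (Max notes they do not appear anywhere in the Flink code). `CoordinatorContextBaseSketch`, `ShuffleContextSketch`, `broadcastEvent`, and `publishGlobalStatistics` are invented for this example, and subtask gateways are modelled as plain callbacks rather than Flink's real gateway types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch, NOT a Flink API. A subtask "gateway" is modelled as a
// plain Consumer<Object>. Not thread-safe by itself; it assumes all calls come
// from a single coordinator event loop.
abstract class CoordinatorContextBaseSketch {

    private final Map<Integer, Consumer<Object>> readyGateways = new HashMap<>();

    // The subtask is running and can receive events.
    void subTaskReady(int subtask, Consumer<Object> gateway) {
        readyGateways.put(subtask, gateway);
    }

    // The subtask failed or is restarting; stop sending it events.
    void subTaskNotReady(int subtask) {
        readyGateways.remove(subtask);
    }

    // Returns false when the target subtask is not currently ready.
    boolean sendEventToOperator(int subtask, Object event) {
        Consumer<Object> gateway = readyGateways.get(subtask);
        if (gateway == null) {
            return false;
        }
        gateway.accept(event);
        return true;
    }

    // Sends the event to every subtask that is currently ready.
    void broadcastEvent(Object event) {
        readyGateways.values().forEach(gateway -> gateway.accept(event));
    }
}

// A coordinator-specific context reuses the shared plumbing and adds only its own events.
final class ShuffleContextSketch extends CoordinatorContextBaseSketch {
    void publishGlobalStatistics(Map<String, Long> globalCounts) {
        broadcastEvent(globalCounts);
    }
}
```

Subclasses such as a shuffle context then only add their domain-specific events, which is the code reuse the FLIP is after. A coordinator that needs no subtask communication would simply not extend the base, as Gang suggests.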
>
> Regarding the scope of the code changes, FLIP-264 will only do the context extraction. The shuffling coordinator and operator <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>, which will use the context, will come with a separate proposal, so we are trying to keep FLIP-264 simple to understand. I can add a bit more about how to use the coordinator context to FLIP-264 if you think that would be helpful.
>
> Thanks!
> Gang

On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <m...@apache.org> wrote:
> Thanks for the proposal, Gang! This is indeed a somewhat bigger change. The coordinator for sources, introduced as part of FLIP-27, was specifically added to synchronize the global watermark and to assign splits dynamically. However, in practice it allows arbitrary RPC calls between the task and the job manager. I understand the concern that such a powerful mechanism should not be available to all operators. Nevertheless, I see its practical use for sinks like Iceberg, so I'd suggest limiting this feature to sinks (and sources) only.
>
> I'm wondering whether extracting the SourceCoordinatorContext is enough to achieve what you want. Additional work will be necessary, e.g. creating a SinkCoordinator, similar to SourceCoordinator, which handles the RPC calls and the checkpointing. I think it would be good to outline this in the FLIP.
>
> -Max

On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz...@gmail.com> wrote:
> Sorry, I sent the incomplete reply by mistake.
>
> If there are any concrete concerns, we can discuss them. In FLINK-27405 [1], Avid pointed out some implications regarding checkpointing. In this small FLIP, we are not exposing or changing any checkpointing logic; we mainly need the coordinator context functionality to facilitate the communication between the coordinator and its subtasks.
>
> [1] https://issues.apache.org/jira/browse/FLINK-27405

On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz...@gmail.com> wrote:
> Hang, I appreciate your input. I agree that `CoordinatorContextBase` is a better name considering the Flink code conventions.
>
> If there are any concrete concerns, we can discuss. In the jira,

On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
> Hi,
>
> IMO, extracting a base class from SourceCoordinatorContext makes sense, but I prefer the name `OperatorCoordinatorContextBase` or `CoordinatorContextBase`, following the format of `SourceReaderBase`. I also agree with what Piotr said: more problems may show up when connectors start to use it.
>> > > >>>>> >> >> > > >>>>> >> Best, >> > > >>>>> >> Hang >> > > >>>>> >> >> > > >>>>> >> Steven Wu <stevenz...@gmail.com> 于2022年10月14日周五 22:31写道: >> > > >>>>> >> >> > > >>>>> >> > Piotr, >> > > >>>>> >> > >> > > >>>>> >> > The proposal is to extract the listed methods from >> @Iinternal >> > > >>>>> >> > SourceCoordinatorContext to a @PublicEvolving >> > > >>>>> BaseCoordinatorContext. >> > > >>>>> >> > >> > > >>>>> >> > The motivation is that other operators can leverage the >> > > >>>>> communication >> > > >>>>> >> > mechanism btw operator coordinator and operator subtasks. >> For >> > > >>>>> example, >> > > >>>>> >> in >> > > >>>>> >> > the linked google doc shuffle operator (in Flink Iceberg >> sink) >> > > can >> > > >>>>> >> leverage >> > > >>>>> >> > it for computing traffic distribution statistics. >> > > >>>>> >> > * subtasks calculate local statistics and periodically send >> > them >> > > >>>>> to the >> > > >>>>> >> > coordinator for global aggregation. >> > > >>>>> >> > * The coordinator can broadcast the globally aggregated >> > > >>>>> statistics to >> > > >>>>> >> > subtasks, which can be used to guide the shuffling decision >> > > >>>>> (selecting >> > > >>>>> >> > downstream channels). >> > > >>>>> >> > >> > > >>>>> >> > Thanks, >> > > >>>>> >> > Steven >> > > >>>>> >> > >> > > >>>>> >> > >> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski < >> > > >>>>> pnowoj...@apache.org> >> > > >>>>> >> > wrote: >> > > >>>>> >> > >> > > >>>>> >> > > Hi, >> > > >>>>> >> > > >> > > >>>>> >> > > Could you clarify what's the proposal that you have in >> mind? >> > > >>>>> From the >> > > >>>>> >> > > context I would understand that the newly extracted >> > > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as >> > > >>>>> `@PublicEvolving` >> > > >>>>> >> or >> > > >>>>> >> > > `@Experimental`, since otherwise extracting it and >> keeping >> > > >>>>> `@Internal` >> > > >>>>> >> > > wouldn't change much? 
> Such an `@Internal` base class could be removed at any point in the future. Having said that, it sounds to me like your proposal is a bit bigger than it looks at first glance, and you actually want to expose the operator coordinator concept in the public API?
>
> AFAIK there were some discussions about that, and it was a somewhat conscious decision NOT to do that. I don't know the reasons, however. Only now have I heard that there are, for example, some problems with checkpointing of hypothetical non-source operator coordinators. Maybe someone else could shed some light on this?
>
> Conceptually, I would actually be in favour of exposing operator coordinators if there is a good reason behind it, but it is a more difficult topic and might be a larger effort than it seems at first glance.
>
> Best,
> Piotrek

On Tue, Oct 4, 2022 at 7:41 PM Steven Wu <stevenz...@gmail.com> wrote:
> Jing, thanks a lot for your reply. The linked Google doc is not for this FLIP, which is fully documented on the wiki page.
> The linked Google doc is the design doc for introducing shuffling in the Flink Iceberg sink, which motivated this FLIP so that the shuffle coordinator can leverage the introduced BaseCoordinatorContext to avoid code duplication.

On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <j...@ververica.com> wrote:
> Thanks for bringing this up. It looks good overall! One small thing: you might want to write all the content on the wiki page instead of linking to a Google doc, because some people might not be able to access the Google doc.
>
> Best regards,
> Jing

On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegang...@gmail.com> wrote:
> Hi,
>
> We submitted the FLIP proposal <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext> on Confluence to extract BaseCoordinatorContext from SourceCoordinatorContext and reuse it for other coordinators, e.g.
> in the shuffling support of Flink Iceberg sink <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
>
> Could you help to take a look?
>
> Thanks,
> Gang