What's the status and conclusion of this discussion? I see the value of exposing OperatorCoordinator because of its powerful RPC calls, and some projects, such as Hudi [1], are already using it. But I agree this is a large topic and requires another FLIP.
I am also concerned that a public base class extracted without implementations or clear usage is easy to break in the future. However, I think the shuffling operator can be a generic component used by other connectors and DataStream jobs. Have you considered contributing the ShuffleOperator to the Flink main repository as a part of the DataStream API (e.g., DataStream#dynamicShuffle)? It's easy to extract the common part between SourceCoordinatorContext and ShuffleCoordinatorContext in a single repository as an internal implementation.

Best,
Jark

[1]: https://github.com/apache/hudi/blob/a80bb4f717ad8a89770176a1238c4b08874044e8/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java

On Thu, 3 Nov 2022 at 22:36, Piotr Nowojski <pnowoj...@apache.org> wrote:

> Ohhh, I was confused. I thought that the proposal is to make `CoordinatorContextBase` part of the public API.
>
> However, I'm also against extracting `CoordinatorContextBase` as an `@Internal` class.
>
> 1. Connectors shouldn't reuse internal classes. Using the `@Internal` CoordinatedOperatorFactory would already be quite bad, but at least this is a relatively stable internal API. Using an `@Internal` `CoordinatorContextBase`, and refactoring out this base class just for the sake of re-using it in a connector, is IMO even worse.
> 2. Doubly so if they are in a separate repository (as the Iceberg connector will be/is, right?). There would be no way to prevent breaking changes between repositories.
>
> If that's only intended as the stop-gap solution until we properly expose coordinators, the lesser evil would be IMO to copy/paste/modify SourceCoordinatorContext to the flink-connector-iceberg repository.
>
> Best,
> Piotrek
>
> On Thu, 3 Nov 2022 at 12:51, Maximilian Michels <m...@apache.org> wrote:
>
> > +1 If we wanted to expose the OperatorCoordinator API, we should provide an adequate interface.
> > The FLIP partially addresses this by trying to factor out RPC code which other coordinators might make use of, but there is additional design necessary to realize a public operator API.
> >
> > Just to be clear, I'm not opposed to any of the changes in the FLIP. I think they make sense in the context of an Iceberg ShuffleCoordinator in Flink. If we were to add such a new coordinator, feel free to make the proposed code refactoring as part of a pull request. A FLIP isn't strictly necessary here because this is a purely internal change which does not alter public APIs, nor does it alter the internal architecture, apart from reusing a bit of existing code. I'm sorry if we consumed some of your time revising the document but I think we had a healthy discussion here. And we're definitely looking forward to seeing some of these code changes!
> >
> > -Max
> >
> > On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >
> >> Hi,
> >>
> >> Sorry for the delay, but I've given more thought to this. First, I share Maximilian's view that this FLIP is incomplete. As I understand it, you are trying to hack existing code to expose small bits of internal functionality as part of the public API without solving many of the underlying issues.
> >>
> >> For example, what's the point of exposing `CoordinatorContextBase` as a public API if users cannot use it? After all, the `OperatorCoordinator` and `CoordinatedOperatorFactory` would remain internal. At the same time, this FLIP would officially force us to support and maintain this CoordinatorContextBase, while I have strong feelings that we don't want to do that in the long term. I think we would need to take a big step back and first discuss how we would like to expose the coordinators and agree how to deal with the issues.
> >>
> >> The first big issue I see is that I would be very worried about exposing the coordinator API without at least designing/planning how to deal with checkpointing coordinator state. Without that, I'm afraid we might end up in a situation where we need to break the API in order to properly support stateful coordinators. And at the moment I don't see a good and easy solution to this problem.
> >>
> >> The second issue is the shape of the exposed public API. Exposing `OperatorCoordinator` or `CoordinatorContextBase` looks to me like a bad design that would expose way too many things to the users, making future development more complicated for us and making implementation of those interfaces by the user unnecessarily difficult. I see this as a similar issue to the low-level `StreamOperator` API vs the higher-level `org.apache.flink.api.common.functions.Function` API (instead of exposing `StreamOperator`, `AbstractStreamOperatorV2` etc., we should beef up the `ProcessFunction` to expose all of the remaining functionality in a user-friendly way). In the context of the coordinators, I would say that we should expose as the public API not the `OperatorCoordinator`, but for example some kind of an `EventProcessFunction` that would have a simple interface like:
> >> ```
> >> interface EventProcessFunction {
> >>     void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
> >> }
> >> ```
> >> + maybe some features like processing time timers/mailbox style async actions.
> >> (or maybe that could have been just a regular `ProcessFunction` but with `OperatorEvent` with `int subtask` as input/output).
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Wed, 2 Nov 2022 at 19:40, gang ye <yegang...@gmail.com> wrote:
> >>
> >>> Hi Max and Qingsheng,
> >>>
> >>> Thanks for the feedback. The initial motivation for proposing this is to reduce duplicated code, since ShuffleCoordinator would need communication logic similar to SourceCoordinator's to talk with operators. I understand your concern that OperatorCoordinator is an internal class and that, for now, nothing other than SourceCoordinator uses it.
> >>> How about we do what Qingsheng suggested? I can go ahead with the ShufflingCoordinator implementation without the extraction. Then we will have an intuitive sense of how much code is copied and could be reused. If we feel that there is still a need to extract, we can revisit the discussion.
> >>>
> >>> Thanks
> >>> Gang
> >>>
> >>> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org> wrote:
> >>>
> >>>> Thanks Gang and Steven for the FLIP. Actually I share the same concern as Piotr and Maximilian.
> >>>>
> >>>> OperatorCoordinator is marked as @Internal intentionally, considering some existing issues, like consistency between a non-source operator and its coordinator on checkpoint. I'm wondering if it is useful to expose a public context to developers but keep the OperatorCoordinator as an internal API. If we finally close all issues and decide to expose the operator coordinator API, that would be a better opportunity to include the base context as a part of it.
> >>>>
> >>>> Best,
> >>>> Qingsheng
> >>>>
> >>>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>
> >>>>> Thanks Steven! My confusion stemmed from the lack of context in the FLIP. The first version did not lay out how the refactoring would be used down the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a non-public API and before reading the code, I wasn't even aware how exactly it worked and whether it would be available to regular operators (it was originally intended for sources only).
> >>>>>
> >>>>> I might seem pedantic here but I believe the purpose of a FLIP should be to describe the *why* behind the changes, not only the changes themselves. A FLIP is not a formality but a tool to communicate and discuss changes. I think we still haven't laid out the exact reasons why we are factoring out the base. As far as I understand now, we need the base class to deal with concurrent updates in the custom Coordinator from the runtime (sub)tasks. Effectively, we are enforcing an actor model for the processing of the incoming messages such that the OperatorCoordinator can cleanly update its state. However, if there are no actual implementations that make use of the refactoring in Flink itself, I wonder if it would make sense to copy this code to the downstream implementation, e.g. the ShuffleCoordinator. As soon as it is part of Flink, we could of course try to consolidate this code.
> >>>>>
> >>>>> Considering the *how* of this, there appear to be both methods from SourceCoordinator (e.g. runInEventLoop) as well as SourceCoordinatorContext listed in the FLIP, as well as methods which do not appear anywhere in Flink code, e.g. subTaskReady / subTaskNotReady / sendEventToOperator. It appears that some of this has been extracted from a downstream implementation. It would be great to adjust this, such that it reflects the status quo in Flink.
> >>>>>
> >>>>> -Max
> >>>>>
> >>>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <stevenz...@gmail.com> wrote:
> >>>>>
> >>>>> > Max,
> >>>>> >
> >>>>> > Thanks a lot for the comments. We should clarify that the shuffle operator/coordinator is not really part of the Flink sink function/operator. The shuffle operator is a custom operator that can be inserted right before the Iceberg writer operator. It calculates the traffic statistics and performs a custom partition/shuffle (DataStream#partitionCustom) to cluster the data right before it gets to the Iceberg writer operator.
> >>>>> >
> >>>>> > We are not proposing to introduce a sink coordinator for the sink interface. The shuffle operator needs the CoordinatorContextBase to facilitate the communication between shuffle subtasks and the coordinator for traffic statistics aggregation. The communication part is already implemented by SourceCoordinatorContext.
> >>>>> >
> >>>>> > Here are some details about the communication needs.
> >>>>> > - subtasks periodically calculate local statistics and send them to the coordinator for global aggregation
> >>>>> > - the coordinator sends the globally aggregated statistics to the subtasks
> >>>>> > - subtasks use the globally aggregated statistics to guide the partition/shuffle decision
> >>>>> >
> >>>>> > Regards,
> >>>>> > Steven
> >>>>> >
> >>>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>> >
> >>>>> > > Hi Gang,
> >>>>> > >
> >>>>> > > Looks much better! I've actually gone through the OperatorCoordinator code. It turns out, any operator already has an OperatorCoordinator assigned. Also, any operator can add custom coordinator code. So it looks like you won't have to implement any additional runtime logic to add a ShuffleCoordinator. However, I'm wondering, why do you specifically need to refactor the SourceCoordinatorContext? You could simply add your own coordinator code. I'm not sure the sink requirements map to the source interface so closely that you can reuse the same logic.
> >>>>> > >
> >>>>> > > If you can refactor SourceCoordinatorContext in a way that makes it fit your use case, I have no objection. By the way, another example of an existing OperatorCoordinator is CollectSinkOperatorCoordinator, which is quite trivial, but it might be worth evaluating whether you need the full power of SourceCoordinatorContext, which is why I wanted to get more context.
> >>>>> > >
> >>>>> > > -Max
> >>>>> > >
> >>>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <yegang...@gmail.com> wrote:
> >>>>> > >
> >>>>> > > > Hi Max,
> >>>>> > > > I understand your concern. Since shuffling support for the Flink Iceberg sink is not the main body of the proposal, I have just added an appendix with more details about how to use CoordinatorContextBase and how to define ShufflingCoordinator.
> >>>>> > > >
> >>>>> > > > Let me know if that does not address your concern.
> >>>>> > > >
> >>>>> > > > Thanks
> >>>>> > > > Gang
> >>>>> > > >
> >>>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>> > > >
> >>>>> > > >> Hey Gang,
> >>>>> > > >>
> >>>>> > > >> What I'm looking for here is a complete picture of why the change is necessary and what the next steps are. Ultimately, refactoring any code serves a purpose. Here, we want to refactor the Coordinator code such that we can add a SinkCoordinator, similar to the SourceCoordinator. The FLIP should address the next steps, i.e. how you plan to add the SinkCoordinator, its interfaces, and runtime changes. It doesn't have to be in great detail but without this information, I don't think the FLIP is complete.
> >>>>> > > >>
> >>>>> > > >> This feature should be generic enough to be usable by sinks other than the Iceberg sink. Of course Iceberg can still load its own implementation which may be outlined in a separate FLIP.
> >>>>> > > >>
> >>>>> > > >> Unless there is a good reason, normal operators should not support the coordinator functionality. At least I'm not convinced it would play well with Flink's execution model. But I see how it is required for sources and sinks.
> >>>>> > > >>
> >>>>> > > >> -Max
> >>>>> > > >>
> >>>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegang...@gmail.com> wrote:
> >>>>> > > >>
> >>>>> > > >>> Hi Max,
> >>>>> > > >>>
> >>>>> > > >>> Thanks for reviewing.
> >>>>> > > >>>
> >>>>> > > >>> For this FLIP-264, yes, we will only focus on abstracting RPC calls between the task and the job manager for communications and won't touch watermark checkpoint.
> >>>>> > > >>> If the coordinator doesn't need RPC calls to talk with subtasks, then it can define its context without extending the CoordinatorContextBase (or we can find another class name to limit the scope).
> >>>>> > > >>>
> >>>>> > > >>> Regarding the scope of the code changes, for this FLIP-264 we will only do the context extraction. The shuffling coordinator and operator <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo> which will use the context will come with a separate proposal, so we try to keep FLIP-264 simple to understand. I can add a little bit more about how to use the coordinator context in FLIP-264 if you think that will be helpful.
> >>>>> > > >>>
> >>>>> > > >>> Thanks!
> >>>>> > > >>> Gang
> >>>>> > > >>>
> >>>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <m...@apache.org> wrote:
> >>>>> > > >>>
> >>>>> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger change. The coordinator for sources, as part of FLIP-27, was specifically added to synchronize the global watermark and to assign splits dynamically. However, it practically allows arbitrary RPC calls between the task and the job manager. I understand that there is concern that such a powerful mechanism should not be available to all operators. Nevertheless, I see the practical use in the case of sinks like Iceberg. So I'd suggest limiting this feature to sinks (and sources) only.
> >>>>> > > >>>>
> >>>>> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is enough to achieve what you want. There will be additional work necessary, e.g. creating a SinkCoordinator, similar to SourceCoordinator, which handles the RPC calls and the checkpointing. I think it would be good to outline this in the FLIP.
> >>>>> > > >>>>
> >>>>> > > >>>> -Max
> >>>>> > > >>>>
> >>>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz...@gmail.com> wrote:
> >>>>> > > >>>>
> >>>>> > > >>>>> Sorry, I sent the incomplete reply by mistake.
> >>>>> > > >>>>>
> >>>>> > > >>>>> If there are any concrete concerns, we can discuss. In FLINK-27405 [1], Avid pointed out some implications regarding checkpointing. In this small FLIP, we are not exposing/changing any checkpointing logic; we mainly need the coordinator context functionality to facilitate the communication between coordinator and subtasks.
> >>>>> > > >>>>>
> >>>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
> >>>>> > > >>>>>
> >>>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz...@gmail.com> wrote:
> >>>>> > > >>>>>
> >>>>> > > >>>>> > Hang, appreciate your input. Agree that `CoordinatorContextBase` is a better name considering Flink code convention.
> >>>>> > > >>>>> >
> >>>>> > > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
> >>>>> > > >>>>> >
> >>>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
> >>>>> > > >>>>> >
> >>>>> > > >>>>> >> Hi,
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> IMO, it makes sense to extract a base class for SourceCoordinatorContext.
> >>>>> > > >>>>> >> But I prefer the name `OperatorCoordinatorContextBase` or `CoordinatorContextBase`, following the naming format of `SourceReaderBase`.
> >>>>> > > >>>>> >> I also agree with what Piotr said. Maybe more problems will occur when connectors start to use it.
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> Best,
> >>>>> > > >>>>> >> Hang
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> On Fri, Oct 14, 2022 at 22:31, Steven Wu <stevenz...@gmail.com> wrote:
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> > Piotr,
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > The proposal is to extract the listed methods from the @Internal SourceCoordinatorContext to a @PublicEvolving BaseCoordinatorContext.
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > The motivation is that other operators can leverage the communication mechanism between the operator coordinator and operator subtasks. For example, in the linked google doc, the shuffle operator (in the Flink Iceberg sink) can leverage it for computing traffic distribution statistics.
> >>>>> > > >>>>> >> > * Subtasks calculate local statistics and periodically send them to the coordinator for global aggregation.
> >>>>> > > >>>>> >> > * The coordinator can broadcast the globally aggregated statistics to subtasks, which can be used to guide the shuffling decision (selecting downstream channels).
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > Thanks,
> >>>>> > > >>>>> >> > Steven
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > > Hi,
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > Could you clarify what proposal you have in mind? From the context I would understand that the newly extracted `BaseCoordinatorContext` would have to be marked as `@PublicEvolving` or `@Experimental`, since otherwise extracting it and keeping it `@Internal` wouldn't change much? Such an `@Internal` base class could be removed at any point in the future. Having said that, it sounds to me like your proposal is a bit bigger than it looks at first glance and you actually want to expose the operator coordinator concept to the public API?
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > AFAIK there were some discussions about that, and it was a bit of a conscious decision to NOT do that. I don't know those reasons, however. Only now have I heard that there are, for example, some problems with checkpointing of hypothetical non-source operator coordinators. Maybe someone else could shed some light on this?
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > Conceptually I would actually be in favour of exposing operator coordinators if there is a good reason behind that, but it is a more difficult topic and might be a larger effort than it seems at first glance.
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > Best,
> >>>>> > > >>>>> >> > > Piotrek
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > On Tue, 4 Oct 2022 at 19:41, Steven Wu <stevenz...@gmail.com> wrote:
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked google doc is not for this FLIP, which is fully documented in the wiki page. The linked google doc is the design doc to introduce shuffling in the Flink Iceberg sink, which motivated this FLIP proposal so that the shuffle coordinator can leverage the introduced BaseCoordinatorContext to avoid code duplication.
> >>>>> > > >>>>> >> > > >
> >>>>> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <j...@ververica.com> wrote:
> >>>>> > > >>>>> >> > > >
> >>>>> > > >>>>> >> > > > > Thanks for bringing this up. It looks good overall! One small thing: you might want to write all content on the wiki page instead of linking to a google doc. The reason is that some people might not be able to access the google doc.
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > > > Best regards,
> >>>>> > > >>>>> >> > > > > Jing
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegang...@gmail.com> wrote:
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > > >> Hi,
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >> We have submitted the FLIP proposal <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext> on Confluence to extract BaseCoordinatorContext from SourceCoordinatorContext so that it can be reused by other coordinators, e.g. in the shuffling support of the Flink Iceberg sink <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >> Could you help to take a look?
> >>>>> > > >>>>> >> > > > >> Thanks
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >> Gang
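
For readers following the thread: the communication pattern Steven describes (subtasks report local statistics, the coordinator aggregates them, subtasks use the global view to pick downstream channels) can be sketched without any Flink dependency. This is only an illustration under stated assumptions. `ShuffleStatisticsSketch`, `StatisticsAggregator` and `assignChannels` are hypothetical names that appear neither in Flink nor in the FLIP, the greedy "heaviest key to least-loaded channel" policy is an assumption rather than the design from the linked doc, and the transport between subtasks and coordinator (operator events in Flink) is replaced by plain method calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Stand-alone sketch; these types are hypothetical and not part of Flink. */
public class ShuffleStatisticsSketch {

    /** Coordinator side: keeps the latest report of every subtask and sums them. */
    public static final class StatisticsAggregator {
        // Latest local key counts, keyed by subtask index.
        private final Map<Integer, Map<String, Long>> latestBySubtask = new HashMap<>();

        /** A new report from a subtask replaces that subtask's previous report. */
        public void onLocalStatistics(int subtask, Map<String, Long> localCounts) {
            latestBySubtask.put(subtask, new HashMap<>(localCounts));
        }

        /** Global view: per-key sum over the latest report of each subtask. */
        public Map<String, Long> globalStatistics() {
            Map<String, Long> global = new TreeMap<>();
            for (Map<String, Long> counts : latestBySubtask.values()) {
                counts.forEach((key, count) -> global.merge(key, count, Long::sum));
            }
            return global;
        }
    }

    /**
     * Subtask side (assumed policy): place the heaviest keys first, each on the
     * currently least-loaded channel. Ties are broken by key for determinism.
     */
    public static Map<String, Integer> assignChannels(Map<String, Long> global, int numChannels) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(global.entrySet());
        entries.sort(
                Map.Entry.<String, Long>comparingByValue()
                        .reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        long[] load = new long[numChannels];
        Map<String, Integer> assignment = new TreeMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            int target = 0;
            for (int channel = 1; channel < numChannels; channel++) {
                if (load[channel] < load[target]) {
                    target = channel;
                }
            }
            load[target] += entry.getValue();
            assignment.put(entry.getKey(), target);
        }
        return assignment;
    }

    public static void main(String[] args) {
        StatisticsAggregator aggregator = new StatisticsAggregator();
        aggregator.onLocalStatistics(0, Map.of("a", 5L, "b", 1L));
        aggregator.onLocalStatistics(1, Map.of("a", 3L, "c", 2L));
        Map<String, Long> global = aggregator.globalStatistics();
        System.out.println(global + " -> " + assignChannels(global, 2));
    }
}
```

In a real job the two `onLocalStatistics` calls would arrive as events from different subtasks, which is where the single-threaded event handling Max mentions (the actor-style model) matters: the aggregator above is not thread-safe and relies on reports being processed one at a time.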