Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2023-01-30 Thread Steven Wu
Let me start an initial discussion thread at dev@flink. I'd like to gauge interest from the community (including Hudi and Delta Lake) before spending time writing up a big FLIP. On Fri, Jan 27, 2023 at 10:45 PM Jark Wu wrote: > Thank Steven for the explanation. > > It sounds good to me

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2023-01-27 Thread Jark Wu
Thanks, Steven, for the explanation. It sounds good to me to implement the shuffle operator in the Iceberg project first. We can contribute it to Flink DataStream in the future if other projects/connectors also need it. Best, Jark On Wed, 18 Jan 2023 at 02:11, Steven Wu wrote: > Jark, > > We wer

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2023-01-17 Thread Steven Wu
Jark, We were planning to discard the proposal due to some valid concerns raised in the thread. Also, this proposal itself didn't really save much code duplication (maybe 100 lines or so). I also thought that the shuffle operator for DataStream can be useful for other connectors too. The shuf

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2023-01-16 Thread Jark Wu
What's the status and conclusion of this discussion? I have seen the value of exposing OperatorCoordinator because of its powerful RPC calls; some projects, such as Hudi[1], are already using it. But I agree this is a large topic and requires another FLIP. I am also concerned about extracting a Pu

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-11-03 Thread Piotr Nowojski
Ohhh, I was confused. I thought that the proposal was to make `CoordinatorContextBase` part of the public API. However, I'm also against extracting `CoordinatorContextBase` as an `@Internal` class. 1. Connectors shouldn't reuse internal classes. Using `@Internal` CoordinatedOperatorFactory

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-11-03 Thread Maximilian Michels
+1 If we wanted to expose the OperatorCoordinator API, we should provide an adequate interface. The FLIP partially addresses this by trying to factor out RPC code which other coordinators might make use of, but there is additional design necessary to realize a public operator API. Just to be clear

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-11-03 Thread Piotr Nowojski
Hi, Sorry for the delay, but I've given this more thought. First, I share Maximilian's view that this FLIP is incomplete. As I understand it, you are trying to hack existing code to expose small bits of internal functionalities as part of the public API without solving many of

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-11-02 Thread gang ye
Hi Max and Qingsheng, Thanks for the feedback. The initial motivation for proposing this is to reduce duplicated code, since ShuffleCoordinator would need communication logic similar to SourceCoordinator's to talk with operators. I understand your concern that OperatorCoordinator is an internal clas

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-11-02 Thread Qingsheng Ren
Thanks Gang and Steven for the FLIP. I actually share the same concern as Piotr and Maximilian. OperatorCoordinator is intentionally marked as @Internal because of some existing issues, like consistency between a non-source operator and its coordinator on checkpoint. I'm wondering if it is useful to

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-11-01 Thread Maximilian Michels
Thanks Steven! My confusion stemmed from the lack of context in the FLIP. The first version did not lay out how the refactoring would be used down the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a non-public API and before reading the code, I wasn't even aware how exactly i

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-27 Thread Steven Wu
Max, Thanks a lot for the comments. We should clarify that the shuffle operator/coordinator is not really part of the Flink sink function/operator. The shuffle operator is a custom operator that can be inserted right before the Iceberg writer operator. The shuffle operator calculates the traffic statistic

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-27 Thread Maximilian Michels
Hi Gang, Looks much better! I've actually gone through the OperatorCoordinator code. It turns out, any operator already has an OperatorCoordinator assigned. Also, any operator can add custom coordinator code. So it looks like you won't have to implement any additional runtime logic to add a Shuffl

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-27 Thread gang ye
Hi Max, I understand your concern. Since shuffling support for the Flink Iceberg sink is not the main body of the proposal, I just added another appendix with more details about how to use CoordinatorContextBase and how to define ShufflingCoordinator. Let me know if that does not resolve your concern. Th

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-27 Thread Maximilian Michels
Hey Gang, What I'm looking for here is a complete picture of why the change is necessary and what the next steps are. Ultimately, refactoring any code serves a purpose. Here, we want to refactor the Coordinator code such that we can add a SinkCoordinator, similar to the SourceCoordinator. The FLIP

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-26 Thread gang ye
Hi Max, Thanks for reviewing. For FLIP-264, yes, we will only focus on abstracting RPC calls between the task and the job manager for communication and won't touch watermark or checkpoint logic. If the coordinator doesn't need RPC calls to talk with subtasks, then it can define a context without exten

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-26 Thread Maximilian Michels
Thanks for the proposal, Gang! This is indeed somewhat of a bigger change. The coordinator for sources, as part of FLIP-27, was specifically added to synchronize the global watermark and to assign splits dynamically. However, it practically allows arbitrary RPC calls between the task and the job ma

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-16 Thread Steven Wu
Sorry, I sent the incomplete reply by mistake. If there are any concrete concerns, we can discuss them. In FLINK-27405 [1], Avid pointed out some implications regarding checkpointing. In this small FLIP, we are not exposing/changing any checkpointing logic; we mainly need the coordinator context func

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-16 Thread Steven Wu
Hang, I appreciate your input. I agree that `CoordinatorContextBase` is a better name considering Flink code conventions. If there are any concrete concerns, we can discuss them. In the jira, On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan wrote: > Hi, > > IMP, I agree to extract a base class for SourceCoor

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-16 Thread Hang Ruan
Hi, IMO, I agree with extracting a base class for SourceCoordinatorContext. But I prefer the name `OperatorCoordinatorContextBase` or `CoordinatorContextBase`, following the format of `SourceReaderBase`. I also agree with what Piotr said. Maybe more problems will occur when connectors start to use it.

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-14 Thread Steven Wu
Piotr, The proposal is to extract the listed methods from the @Internal SourceCoordinatorContext into a @PublicEvolving BaseCoordinatorContext. The motivation is that other operators can leverage the communication mechanism between the operator coordinator and the operator subtasks. For example, in the linked goo
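To make the pattern under discussion concrete, here is a minimal, self-contained Java sketch of a shared base context that a shuffle coordinator context could extend. Everything in it is hypothetical: `CoordinatorEvent`, `SubtaskChannel`, `StatisticsEvent`, and `ShuffleCoordinatorContext` are illustrative names, not Flink APIs, and the methods shown are not the list from the FLIP wiki page. It only illustrates moving subtask bookkeeping and event sending into a base class, leaving watermark and checkpoint handling out, which matches the scope described for FLIP-264.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical marker for messages exchanged between a coordinator and its subtasks
// (loosely analogous to Flink's internal OperatorEvent, but not the Flink type).
interface CoordinatorEvent {}

// Hypothetical per-subtask channel the runtime would hand to the coordinator.
interface SubtaskChannel {
    void send(CoordinatorEvent event);
}

// Hypothetical base context: generic subtask bookkeeping and event sending only.
// No checkpoint or watermark logic is included.
abstract class CoordinatorContextBase {
    private final int parallelism;
    private final Map<Integer, SubtaskChannel> channels = new ConcurrentHashMap<>();

    protected CoordinatorContextBase(int parallelism) {
        this.parallelism = parallelism;
    }

    // Called when a subtask becomes ready to receive events.
    void subtaskReady(int subtask, SubtaskChannel channel) {
        if (subtask < 0 || subtask >= parallelism) {
            throw new IllegalArgumentException("Invalid subtask index " + subtask);
        }
        channels.put(subtask, channel);
    }

    // Called when a subtask fails; no events are sent to it until it is ready again.
    void subtaskFailed(int subtask) {
        channels.remove(subtask);
    }

    boolean isReady(int subtask) {
        return channels.containsKey(subtask);
    }

    void sendEventToSubtask(int subtask, CoordinatorEvent event) {
        SubtaskChannel channel = channels.get(subtask);
        if (channel == null) {
            throw new IllegalStateException("Subtask " + subtask + " is not ready");
        }
        channel.send(event);
    }

    // Sends the event to every subtask currently registered as ready.
    void broadcastEvent(CoordinatorEvent event) {
        channels.values().forEach(channel -> channel.send(event));
    }
}

// Hypothetical event carrying an aggregated statistic for a shuffle operator.
final class StatisticsEvent implements CoordinatorEvent {
    final long total;

    StatisticsEvent(long total) {
        this.total = total;
    }
}

// Hypothetical shuffle context: reuses the base plumbing, adds one domain helper.
class ShuffleCoordinatorContext extends CoordinatorContextBase {
    ShuffleCoordinatorContext(int parallelism) {
        super(parallelism);
    }

    void broadcastGlobalStatistics(long total) {
        broadcastEvent(new StatisticsEvent(total));
    }
}

class CoordinatorContextDemo {
    public static void main(String[] args) {
        ShuffleCoordinatorContext ctx = new ShuffleCoordinatorContext(2);
        List<CoordinatorEvent> inbox0 = new ArrayList<>();
        List<CoordinatorEvent> inbox1 = new ArrayList<>();
        ctx.subtaskReady(0, inbox0::add);
        ctx.subtaskReady(1, inbox1::add);

        ctx.broadcastGlobalStatistics(42L); // reaches both subtasks
        ctx.subtaskFailed(1);
        ctx.broadcastGlobalStatistics(7L);  // reaches only subtask 0

        System.out.println(inbox0.size() + " " + inbox1.size()); // prints "2 1"
    }
}
```

The design point is that the subclass adds no RPC plumbing of its own; a coordinator that does not need to talk to subtasks would simply not extend the base class.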

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-14 Thread Piotr Nowojski
Hi, Could you clarify what's the proposal that you have in mind? From the context I would understand that the newly extracted `BaseCoordinatorContext` would have to be marked as `@PublicEvolving` or `@Experimental`, since otherwise extracting it and keeping `@Internal` wouldn't change much? Such `

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-04 Thread Steven Wu
Jing, thanks a lot for your reply. The linked Google doc is not for this FLIP, which is fully documented on the wiki page. The linked Google doc is the design doc for introducing shuffling in the Flink Iceberg sink, which motivated this FLIP proposal so that the shuffle coordinator can leverage the introd

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

2022-10-04 Thread Jing Ge
Thanks for bringing this up. It looks good overall! One small thing: you might want to write all content on the wiki page instead of linking to a Google doc, because some people might not be able to access the Google doc. Best regards, Jing On Tue, Oct 4, 2022 at 3:57 AM gang ye wrote