Hi Steven,

OK, thanks for the clarification. I'm not sure how much could be leveraged. Maybe just re-using the watermark alignment configuration? Please correct me if I'm wrong, but for this use case alone I don't see a good motivation for expanding our APIs. Clearly this feature can already be implemented (you did it in the Iceberg connector, after all).
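For readers following along, here is a minimal sketch of the kind of enumerator-side holdback discussed in this thread: the enumerator knows the min/max timestamp of each split and which splits are still unfinished, and it declines to hand out a split that is too far ahead. This is plain Python rather than Flink or Iceberg code; `Split`, `HoldbackEnumerator`, `max_drift` and `request_split` are hypothetical names chosen for illustration, not the connector's actual implementation.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Split:
    """Hypothetical file split carrying timestamp bounds from its metadata."""
    name: str
    min_ts: int
    max_ts: int


class HoldbackEnumerator:
    """Toy enumerator that holds back splits that run too far ahead."""

    def __init__(self, splits: List[Split], max_drift: int) -> None:
        # Hand out splits in timestamp order.
        self.pending: List[Split] = sorted(splits, key=lambda s: s.min_ts)
        self.in_flight: Dict[int, Split] = {}  # reader id -> split being read
        self.max_drift = max_drift

    def global_watermark(self) -> Optional[int]:
        # Lowest timestamp that any unfinished split may still produce.
        unfinished = self.pending + list(self.in_flight.values())
        return min((s.min_ts for s in unfinished), default=None)

    def request_split(self, reader_id: int) -> Optional[Split]:
        # Assumption: a reader only asks for work after finishing its split.
        self.in_flight.pop(reader_id, None)
        if not self.pending:
            return None
        candidate = self.pending[0]
        if candidate.min_ts - self.global_watermark() > self.max_drift:
            return None  # hold back: candidate is too far ahead of the rest
        self.pending.pop(0)
        self.in_flight[reader_id] = candidate
        return candidate
```

With splits covering timestamps 0-10, 5-15 and 100-110 and `max_drift=20`, the third split is withheld until both earlier splits have completed, which is the "drift of one split" behaviour rather than a time-based pause inside a split.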
> As another example, if there are some fast readers whose splits are always throttled, while the other slow readers
> are struggling to keep up with the rest of the splits, the split enumerator may decide to reassign the slow
> splits so all the readers have something to read. This would need the SplitEnumerator to be aware of the
> watermark progress on each reader. So it seems useful to expose the WatermarkAlignmentEvent information to the
> SplitEnumerator as well.

It seems like a valid potential use case. But do we have a good enough motivation to work on it right now?

Piotrek

On Thu, 5 May 2022 at 16:21, Steven Wu <stevenz...@gmail.com> wrote:

> Piotr,
>
> With FLIP-27, Iceberg source already implemented alignment by tracking
> watermark and holding back split assignment when necessary.
>
> The purpose of this discussion is to see if Iceberg source can leverage
> some of the watermark alignment work from Flink framework.
>
> Thanks,
> Steven
>
> On Thu, May 5, 2022 at 1:10 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> > Ok, I see. Thanks to both of you for the explanation.
> >
> > Do we need changes to Apache Flink for this feature? Can it be implemented
> > in the Sources without changes in the framework? I presume source can
> > access min/max watermark from the split, so as long as it also knows
> > exactly which splits have finished, it would know which splits to hold back.
> >
> > Best,
> > Piotrek
> >
> > On Wed, 4 May 2022 at 20:03, Steven Wu <stevenz...@gmail.com> wrote:
> >
> >> Piotr, thanks a lot for your feedback.
> >>
> >> > I can see this being an issue if the existence of too many blocked
> >> > splits is occupying too many resources.
> >>
> >> This is not desirable. Eagerly assigning many splits to a reader can
> >> defeat the benefits of pull based dynamic split assignments. Iceberg
> >> readers request one split at a time upon start or completion of a split.
> >> Dynamic split assignment is better for work sharing/stealing as Becket
> >> mentioned. Limiting number of active splits can be handled by the FLIP-27
> >> Iceberg source and is somewhat orthogonal to watermark alignment.
> >>
> >> > Can not Iceberg just emit all splits and let FLIP-182/FLIP-217 handle
> >> > the watermark alignment and block the splits that are too much into the
> >> > future?
> >>
> >> The enumerator just assigns the next split to the requesting reader
> >> instead of holding back the split assignment. Let the reader handle the
> >> pause (if the file split requires alignment wait). This strategy might
> >> work and leverage more from the framework.
> >>
> >> We probably need the following to make this work
> >> * extract watermark/timestamp only at the completion of a split (not at
> >> record level). Because records in a file probably aren't sorted by the
> >> timestamp field, the pause or watermark advancement is probably better done
> >> at file level.
> >> * source readers checkpoint the watermark. Otherwise, upon restart
> >> readers won't be able to determine the local watermark and pause for
> >> alignment. We don't want to emit records upon restart due to unknown
> >> watermark info.
> >>
> >> All,
> >>
> >> Any opinion on different timestamp for source alignment (vs Flink
> >> application watermark)? For Iceberg source, we might want to enforce
> >> alignment on kafka timestamp but Flink application watermark may use event
> >> time field from payload.
> >>
> >> Thanks,
> >> Steven
> >>
> >> On Wed, May 4, 2022 at 7:02 AM Becket Qin <becket....@gmail.com> wrote:
> >>
> >>> Hey Piotr,
> >>>
> >>> I think the mechanism FLIP-182 provided is a reasonable default one, which
> >>> ensures the watermarks are only drifted by an upper bound. However,
> >>> admittedly there are also other strategies for different purposes.
> >>>
> >>> In the Iceberg case, I am not sure if a static strictly allowed watermark
> >>> drift is desired. The source might just want to finish reading the assigned
> >>> splits as fast as possible. And it is OK to have a drift of "one split",
> >>> instead of a fixed time period.
> >>>
> >>> As another example, if there are some fast readers whose splits are always
> >>> throttled, while the other slow readers are struggling to keep up with the
> >>> rest of the splits, the split enumerator may decide to reassign the slow
> >>> splits so all the readers have something to read. This would need the
> >>> SplitEnumerator to be aware of the watermark progress on each reader. So it
> >>> seems useful to expose the WatermarkAlignmentEvent information to the
> >>> SplitEnumerator as well.
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>>
> >>> > Hi Steven,
> >>> >
> >>> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just emit
> >>> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
> >>> > block the splits that are too much into the future? I can see this being an
> >>> > issue if the existence of too many blocked splits is occupying too many
> >>> > resources.
> >>> >
> >>> > If that's the case, indeed SourceCoordinator/SplitEnumerator would have to
> >>> > decide on some basis how many and which splits to assign in what order. But
> >>> > in that case I'm not sure how much you could use from FLIP-182 and
> >>> > FLIP-217. They seem somehow orthogonal to me, operating on different
> >>> > levels. FLIP-182 and FLIP-217 are working with whatever splits have already
> >>> > been generated and assigned. You could leverage FLIP-182 and FLIP-217 and
> >>> > take care of only the problem to limit the number of parallel active
> >>> > splits.
> >>> > And here I'm not sure if it would be worth generalising a solution
> >>> > across different connectors.
> >>> >
> >>> > Regarding the global watermark, I made a related comment some time ago
> >>> > about it [1]. It sounds to me like you also need to solve this problem,
> >>> > otherwise Iceberg users will encounter late records in case of some race
> >>> > conditions between assigning new splits and completions of older ones.
> >>> >
> >>> > Best,
> >>> > Piotrek
> >>> >
> >>> > [1]
> >>> > https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
> >>> >
> >>> > On Mon, 2 May 2022 at 04:26, Steven Wu <stevenz...@gmail.com> wrote:
> >>> >
> >>> >> add dev@ group to the thread as Thomas suggested
> >>> >>
> >>> >> Arvid,
> >>> >>
> >>> >> The scenario 3 (Dynamic assignment + temporary no split) in the FLIP-180
> >>> >> (idleness) can happen to Iceberg source alignment, as readers can be
> >>> >> temporarily starved due to the holdback by the enumerator when assigning
> >>> >> new splits upon request.
> >>> >>
> >>> >> Totally agree that we should decouple this discussion from the FLIP-217,
> >>> >> which addresses the split level watermark alignment problem as a follow-up
> >>> >> of FLIP-182
> >>> >>
> >>> >> Becket,
> >>> >>
> >>> >> Yes, currently Iceberg source implemented the alignment leveraging the
> >>> >> dynamic split assignment from FLIP-27 design. Basically, the enumerator can
> >>> >> hold back split assignments to readers when necessary. Everything is
> >>> >> centralized in the enumerator: (1) watermark extraction and aggregation (2)
> >>> >> alignment decision and execution
> >>> >>
> >>> >> The motivation of this discussion is to see if Iceberg source can leverage
> >>> >> some of the watermark alignment solutions (like FLIP-182) from Flink
> >>> >> framework.
> >>> >> E.g., as mentioned in the doc, Iceberg source can potentially
> >>> >> leverage the FLIP-182 framework to do the watermark extraction and
> >>> >> aggregation. For the alignment decision and execution, we can keep them in
> >>> >> the centralized enumerator.
> >>> >>
> >>> >> Thanks,
> >>> >> Steven
> >>> >>
> >>> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <becket....@gmail.com> wrote:
> >>> >>
> >>> >> > Hi Steven,
> >>> >> >
> >>> >> > Thanks for pulling me into this thread. I think the timestamp
> >>> >> > alignment use case here is a good example of what FLIP-27 was designed for.
> >>> >> >
> >>> >> > Technically speaking, Iceberg source can already implement the timestamp
> >>> >> > alignment in the Flink new source even without FLIP-182. However, I
> >>> >> > understand the rationale here because timestamp alignment is also trying to
> >>> >> > orchestrate the consumption of splits. However, it looks like FLIP-182 was
> >>> >> > not designed in a way that it can be easily extended for other use cases.
> >>> >> > It is probably worth thinking of a more general mechanism to answer the
> >>> >> > following questions:
> >>> >> >
> >>> >> > 1. What information whose source of truth is the Flink framework should be
> >>> >> > exposed to the SplitEnumerator and SourceReader? And how?
> >>> >> > 2. What control actions in the Flink framework are worth exposing to the
> >>> >> > SplitEnumerators and SourceReaders? And how?
> >>> >> >
> >>> >> > In the context of timestamp alignment, the first question is more
> >>> >> > relevant. For example, instead of hardcoding the ReportWatermarkEvent
> >>> >> > handling logic in the SourceCoordinator, should we expose this to the
> >>> >> > SplitEnumerator?
> >>> >> > So basically there will be some information, such as
> >>> >> > subtask local watermark, whose source of truth is Flink runtime, but useful
> >>> >> > to the user provided pluggables.
> >>> >> >
> >>> >> > I think there are a few control flow patterns to make a complete design:
> >>> >> >
> >>> >> > a. Framework space information (e.g. watermark) --> User space pluggables
> >>> >> > (e.g. SplitEnumerator) --> User space actions (e.g. pause reading a split).
> >>> >> > b. Framework space information (e.g. task failure) --> User space
> >>> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g. exit
> >>> >> > the job).
> >>> >> > c. User space information (e.g. a custom workload metric) --> User space
> >>> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g. rebalance
> >>> >> > the workload across the source readers).
> >>> >> > d. User space information (e.g. a custom stopping event in the stream) -->
> >>> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space actions
> >>> >> > (e.g. stop the job).
> >>> >> >
> >>> >> > So basically for any user provided pluggables, the input information may
> >>> >> > either come from another user provided logic or from the framework, and
> >>> >> > after receiving that information, the pluggable may either want the
> >>> >> > framework or another pluggable to take an action. So this gives the above 4
> >>> >> > combinations.
> >>> >> >
> >>> >> > In our case, when the pluggables are SplitEnumerator and SourceReader, the
> >>> >> > control flows that only involve user space actions are fully supported.
> >>> >> > But it seems that when it comes to control flows involving framework space
> >>> >> > information, some of the information has not been exposed to the pluggable,
> >>> >> > and some framework actions might also be missing.
> >>> >> >
> >>> >> > Thanks,
> >>> >> >
> >>> >> > Jiangjie (Becket) Qin
> >>> >> >
> >>> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org> wrote:
> >>> >> >
> >>> >> >> Hi folks,
> >>> >> >>
> >>> >> >> quick input from my side. I think this is from the implementation
> >>> >> >> perspective what Piotr and I had in mind for a global min watermark that
> >>> >> >> helps in idleness cases. See also point 3 in
> >>> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> >>> >> >>
> >>> >> >> Basically, we would like to empower source enumerators to determine the
> >>> >> >> global min watermark for all source readers factoring in even future
> >>> >> >> splits. Not all sources can supply that information (think of a general
> >>> >> >> file source) but most should be able to. Basically, Flink should know for a
> >>> >> >> given source at a given point in time what the min watermark across all
> >>> >> >> source subtasks is.
> >>> >> >>
> >>> >> >> Here is some background:
> >>> >> >> In the context of idleness, we can deterministically advance the
> >>> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in sources
> >>> >> >> to switch to idleness and thus allow watermarks to increase in cases where
> >>> >> >> fewer splits than source tasks are available. However, for sources with
> >>> >> >> dynamic split discovery that actually yields incorrect results. Think of a
> >>> >> >> Kinesis consumer where a shard is split.
> >>> >> >> Then a previously idle source
> >>> >> >> subtask may receive a new split with time t0 as the lowest timestamp. Since
> >>> >> >> the source subtask did not participate in the global watermark generation
> >>> >> >> (because it was idle), the previously emitted watermark may be past t0 and
> >>> >> >> thus result in late records potentially being discarded. A rerun of the
> >>> >> >> same pipeline on historic data would not render the source subtask idle and
> >>> >> >> not result in late records. The solution was to not render source subtasks
> >>> >> >> automatically idle by the framework if there are no splits. That leads to
> >>> >> >> confusion for Kafka users with static topic subscription where #splits <
> >>> >> >> #parallelism stalls pipelines because the watermark is not advancing. Here,
> >>> >> >> your sketched solution can be transferred to KafkaSource to let Flink know
> >>> >> >> that min global watermark on a static assignment is determined by the
> >>> >> >> slowest partition. Hence, all idle readers emit that min global watermark
> >>> >> >> and the user sees progress.
> >>> >> >> This whole idea is related to FLIP-182 watermark alignment but I'd go
> >>> >> >> with another FLIP as the goal is quite different even though the
> >>> >> >> implementation overlaps.
> >>> >> >>
> >>> >> >> Now Iceberg seems to use the same information to actually pause the
> >>> >> >> consumption of files and create some kind of ordering guarantees as far as
> >>> >> >> I understood. This probably can be applied to any source with dynamic split
> >>> >> >> discovery. However, I wouldn't mix up the concepts and hence I appreciate
> >>> >> >> you not chiming into the FLIP-182 and ff. threads.
> >>> >> >> The goal of FLIP-182 is
> >>> >> >> to pause readers while consuming a split, while your approach pauses
> >>> >> >> readers before processing another split. So it feels more closely related
> >>> >> >> to the global min watermark - so it could either be part of that FLIP or a
> >>> >> >> FLIP of its own. Afaik API changes should actually happen only on the
> >>> >> >> enumerator side both for your ideas and for global min watermark.
> >>> >> >>
> >>> >> >> Best,
> >>> >> >>
> >>> >> >> Arvid
> >>> >> >>
> >>> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <t...@apache.org> wrote:
> >>> >> >>
> >>> >> >>> Hi Steven,
> >>> >> >>>
> >>> >> >>> Would it be better to bring this as a separate thread related to Iceberg
> >>> >> >>> source to the dev@ list? I think this could benefit from broader input?
> >>> >> >>>
> >>> >> >>> Thanks
> >>> >> >>>
> >>> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz...@gmail.com> wrote:
> >>> >> >>>
> >>> >> >>>> + Becket and Sebastian
> >>> >> >>>>
> >>> >> >>>> It is also related to the split level watermark alignment discussion
> >>> >> >>>> thread. Because it is already very long, I don't want to further complicate
> >>> >> >>>> the ongoing discussion there. But I can move the discussion to that
> >>> >> >>>> existing thread if that is preferred.
> >>> >> >>>>
> >>> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <stevenz...@gmail.com> wrote:
> >>> >> >>>>
> >>> >> >>>>> Hi all,
> >>> >> >>>>>
> >>> >> >>>>> We are thinking about how to align with the Flink community and
> >>> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg source. I put some
> >>> >> >>>>> context in this google doc. Would love to hear your thoughts on this.
> >>> >> >>>>>
> >>> >> >>>>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
> >>> >> >>>>>
> >>> >> >>>>> Thanks,
> >>> >> >>>>> Steven
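As a companion to Arvid's message above about a global min watermark for idle readers, the idea can be sketched roughly as follows. This is an illustration in plain Python under stated assumptions, not a Flink API: `global_min_watermark` and `watermark_to_emit` are hypothetical helpers, an idle reader is modelled as having a local watermark of `None`, and the enumerator is assumed to know the lowest timestamp of each split it has not yet assigned.

```python
from typing import Dict, Iterable, Optional


def global_min_watermark(
    reader_watermarks: Dict[int, Optional[int]],
    pending_split_min_ts: Iterable[int],
) -> Optional[int]:
    """Lowest timestamp that could still be emitted by any reader.

    Covers active readers' local watermarks and the minimum timestamps of
    splits that are known but not yet assigned (the "future splits").
    """
    candidates = [w for w in reader_watermarks.values() if w is not None]
    candidates.extend(pending_split_min_ts)
    return min(candidates) if candidates else None


def watermark_to_emit(
    reader_id: int,
    reader_watermarks: Dict[int, Optional[int]],
    pending_split_min_ts: Iterable[int],
) -> Optional[int]:
    """Active readers emit their own watermark; idle ones emit the global min."""
    local = reader_watermarks.get(reader_id)
    if local is not None:
        return local
    return global_min_watermark(reader_watermarks, pending_split_min_ts)
```

In the shard-split scenario from the thread, including the pending split's lowest timestamp in the minimum keeps an idle reader from advancing past t0 before it receives that split, so the records in it do not arrive late.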