OK, I see. Thanks to both of you for the explanation. Do we need changes to Apache Flink for this feature? Can it be implemented in the sources without changes in the framework? I presume a source can get the min/max watermark from each split, so as long as it also knows exactly which splits have finished, it would know which splits to hold back.
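A minimal sketch of the source-only approach Piotrek describes. It assumes that the full set of splits is known, that each split carries a known minimum event timestamp (for Iceberg this could come from file-level column statistics; only the minimum matters for this particular rule), and that the enumerator tracks which splits have finished. `SplitRange`, `splits_to_hold_back` and `max_drift_ms` are hypothetical names for illustration, not Flink or Iceberg APIs.

```python
from dataclasses import dataclass
from typing import Iterable, Set


@dataclass(frozen=True)
class SplitRange:
    # Hypothetical split metadata: smallest event timestamp (ms) of the
    # records in the split, assumed to be known before the split is read.
    split_id: str
    min_ts: int


def splits_to_hold_back(all_splits: Iterable[SplitRange],
                        finished: Set[str],
                        max_drift_ms: int) -> Set[str]:
    """Return the ids of unfinished splits that start too far ahead.

    The alignment point is the smallest min_ts among unfinished splits:
    no record older than that can still appear, so it bounds how far the
    watermark may advance. A split is held back while its min_ts is more
    than max_drift_ms past that point.
    """
    unfinished = [s for s in all_splits if s.split_id not in finished]
    if not unfinished:
        return set()
    low = min(s.min_ts for s in unfinished)
    return {s.split_id for s in unfinished if s.min_ts > low + max_drift_ms}


splits = [SplitRange("a", 0), SplitRange("b", 50), SplitRange("c", 500)]
print(sorted(splits_to_hold_back(splits, set(), 100)))       # ['c']
print(sorted(splits_to_hold_back(splits, {"a", "b"}, 100)))  # []
```

This only uses information the source already has, which is the point of the question. It does not handle the race between assigning new splits and completing older ones that Piotr raises later in the thread.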
Best,
Piotrek

On Wed, May 4, 2022 at 20:03 Steven Wu <stevenz...@gmail.com> wrote:
> Piotr, thanks a lot for your feedback.
>
> > I can see this being an issue if the existence of too many blocked splits is occupying too many resources.
>
> This is not desirable. Eagerly assigning many splits to a reader can defeat the benefits of pull-based dynamic split assignment. Iceberg readers request one split at a time, upon start or upon completion of a split. Dynamic split assignment is better for work sharing/stealing, as Becket mentioned. Limiting the number of active splits can be handled by the FLIP-27 Iceberg source and is somewhat orthogonal to watermark alignment.
>
> > Can't Iceberg just emit all splits and let FLIP-182/FLIP-217 handle the watermark alignment and block the splits that are too far into the future?
>
> The enumerator would just assign the next split to the requesting reader instead of holding back the split assignment, and let the reader handle the pause (if the file split requires an alignment wait). This strategy might work and leverage more of the framework.
>
> We probably need the following to make this work:
> * Extract the watermark/timestamp only at the completion of a split (not at the record level). Because records in a file are probably not sorted by the timestamp field, the pause or watermark advancement is probably better done at the file level.
> * Source readers checkpoint the watermark. Otherwise, upon restart, readers won't be able to determine the local watermark and pause for alignment. We don't want to emit records upon restart due to unknown watermark info.
>
> All,
>
> Any opinion on using a different timestamp for source alignment (vs. the Flink application watermark)? For the Iceberg source, we might want to enforce alignment on the Kafka timestamp, but the Flink application watermark may use an event-time field from the payload.
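Steven's two requirements can be sketched as reader-side state under a few assumptions: the reader receives one timestamp per split when that split completes (how the value is derived, e.g. from file statistics, is left open), it pauses with a FLIP-182-style rule "local watermark > slowest reader's watermark + max drift", and it stores its local watermark in checkpointed state. `SplitBoundaryWatermark` and its methods are hypothetical, not Flink's `SourceReader` API.

```python
from typing import Optional


class SplitBoundaryWatermark:
    """Hypothetical reader-side watermark that only advances at split completion."""

    def __init__(self, restored: Optional[int] = None) -> None:
        # None means "unknown": a fresh reader, or a restarted reader whose
        # watermark was not part of the checkpoint.
        self._local: Optional[int] = restored

    def on_split_finished(self, split_ts: int) -> None:
        # Records inside a file are not assumed to be sorted, so the watermark
        # moves only when a whole split is done, and never moves backwards.
        if self._local is None or split_ts > self._local:
            self._local = split_ts

    def snapshot(self) -> Optional[int]:
        # Value to store in the reader's checkpointed state.
        return self._local

    def should_pause(self, group_min: Optional[int], max_drift_ms: int) -> bool:
        # Pause when this reader is too far ahead of the slowest reader.
        # With an unknown local watermark there is nothing to compare, so the
        # reader keeps emitting: this is the restart problem described above.
        if self._local is None or group_min is None:
            return False
        return self._local > group_min + max_drift_ms


reader = SplitBoundaryWatermark()
reader.on_split_finished(1_000)
state = reader.snapshot()

restored = SplitBoundaryWatermark(restored=state)
print(restored.should_pause(group_min=0, max_drift_ms=500))                  # True
print(SplitBoundaryWatermark().should_pause(group_min=0, max_drift_ms=500))  # False
```

Without the restored value, the second reader cannot tell that it is ahead, which is why the watermark has to be checkpointed rather than rediscovered after a restart.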
>
> Thanks,
> Steven
>
> On Wed, May 4, 2022 at 7:02 AM Becket Qin <becket....@gmail.com> wrote:
>> Hey Piotr,
>>
>> I think the mechanism FLIP-182 provides is a reasonable default, which ensures that watermarks drift apart by at most an upper bound. However, admittedly there are also other strategies for different purposes.
>>
>> In the Iceberg case, I am not sure if a static, strictly allowed watermark drift is desired. The source might just want to finish reading the assigned splits as fast as possible. And it is OK to have a drift of "one split" instead of a fixed time period.
>>
>> As another example, if there are some fast readers whose splits are always throttled, while the other slow readers are struggling to keep up with the rest of the splits, the split enumerator may decide to reassign the slow splits so that all the readers have something to read. This would need the SplitEnumerator to be aware of the watermark progress on each reader. So it seems useful to expose the WatermarkAlignmentEvent information to the SplitEnumerator as well.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>> Hi Steven,
>>>
>>> Isn't this redundant with FLIP-182 and FLIP-217? Can't Iceberg just emit all splits and let FLIP-182/FLIP-217 handle the watermark alignment and block the splits that are too far into the future? I can see this being an issue if the existence of too many blocked splits is occupying too many resources.
>>>
>>> If that's the case, the SourceCoordinator/SplitEnumerator would indeed have to decide on some basis how many and which splits to assign, and in what order. But in that case I'm not sure how much you could use from FLIP-182 and FLIP-217. They seem somewhat orthogonal to me, operating on different levels.
>>> FLIP-182 and FLIP-217 work with whatever splits have already been generated and assigned. You could leverage FLIP-182 and FLIP-217 and take care of only the problem of limiting the number of parallel active splits. And here I'm not sure if it would be worth generalising a solution across different connectors.
>>>
>>> Regarding the global watermark, I made a related comment some time ago [1]. It sounds to me like you also need to solve this problem; otherwise Iceberg users will encounter late records in case of race conditions between assigning new splits and completing older ones.
>>>
>>> Best,
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>>>
>>> On Mon, May 2, 2022 at 04:26 Steven Wu <stevenz...@gmail.com> wrote:
>>>> Adding the dev@ group to the thread, as Thomas suggested.
>>>>
>>>> Arvid,
>>>>
>>>> Scenario 3 (dynamic assignment + temporarily no split) in FLIP-180 (idleness) can happen with Iceberg source alignment, as readers can be temporarily starved when the enumerator holds back new splits upon request.
>>>>
>>>> Totally agree that we should decouple this discussion from FLIP-217, which addresses the split-level watermark alignment problem as a follow-up to FLIP-182.
>>>>
>>>> Becket,
>>>>
>>>> Yes, the Iceberg source currently implements alignment by leveraging the dynamic split assignment from the FLIP-27 design. Basically, the enumerator can hold back split assignments to readers when necessary.
>>>> Everything is centralized in the enumerator: (1) watermark extraction and aggregation, and (2) the alignment decision and execution.
>>>>
>>>> The motivation of this discussion is to see whether the Iceberg source can leverage some of the watermark alignment solutions (like FLIP-182) from the Flink framework. E.g., as mentioned in the doc, the Iceberg source could potentially leverage the FLIP-182 framework for watermark extraction and aggregation, while keeping the alignment decision and execution in the centralized enumerator.
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <becket....@gmail.com> wrote:
>>>>> Hi Steven,
>>>>>
>>>>> Thanks for pulling me into this thread. I think the timestamp alignment use case here is a good example of what FLIP-27 was designed for.
>>>>>
>>>>> Technically speaking, the Iceberg source can already implement timestamp alignment in the new Flink source even without FLIP-182. However, I understand the rationale here, because timestamp alignment is also trying to orchestrate the consumption of splits. But it looks like FLIP-182 was not designed in a way that can easily be extended to other use cases. It is probably worth thinking about a more general mechanism that answers the following questions:
>>>>>
>>>>> 1. What information whose source of truth is the Flink framework should be exposed to the SplitEnumerator and SourceReader? And how?
>>>>> 2. What control actions in the Flink framework are worth exposing to the SplitEnumerators and SourceReaders? And how?
>>>>>
>>>>> In the context of timestamp alignment, the first question is more relevant.
>>>>> For example, instead of hardcoding the ReportWatermarkEvent handling logic in the SourceCoordinator, should we expose it to the SplitEnumerator? So basically there is some information, such as the subtask-local watermark, whose source of truth is the Flink runtime but which is useful to the user-provided pluggables.
>>>>>
>>>>> I think there are a few control flow patterns needed for a complete design:
>>>>>
>>>>> a. Framework-space information (e.g. watermark) --> user-space pluggables (e.g. SplitEnumerator) --> user-space actions (e.g. pause reading a split).
>>>>> b. Framework-space information (e.g. task failure) --> user-space pluggables (e.g. SplitEnumerator) --> framework-space actions (e.g. exit the job).
>>>>> c. User-space information (e.g. a custom workload metric) --> user-space pluggables (e.g. SplitEnumerator) --> user-space actions (e.g. rebalance the workload across the source readers).
>>>>> d. User-space information (e.g. a custom stopping event in the stream) --> user-space pluggables (e.g. SplitEnumerator) --> framework-space actions (e.g. stop the job).
>>>>>
>>>>> So basically, for any user-provided pluggable, the input information may come either from other user-provided logic or from the framework, and after receiving that information, the pluggable may want either the framework or another pluggable to take an action. This gives the four combinations above.
>>>>>
>>>>> In our case, when the pluggables are the SplitEnumerator and SourceReader, the control flows that only involve user-space actions are fully supported.
>>>>> But it seems that when it comes to control flows involving framework-space information, some of that information has not been exposed to the pluggables, and some framework actions might also be missing.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>> Hi folks,
>>>>>>
>>>>>> Quick input from my side. I think that, from the implementation perspective, this is what Piotr and I had in mind for a global min watermark that helps in idleness cases. See also point 3 in https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>>>>>>
>>>>>> Basically, we would like to empower source enumerators to determine the global min watermark for all source readers, factoring in even future splits. Not all sources can supply that information (think of a general file source), but most should be able to. Basically, Flink should know, for a given source at a given point in time, what the min watermark across all source subtasks is.
>>>>>>
>>>>>> Here is some background:
>>>>>> In the context of idleness, we can deterministically advance the watermark. In the pre-FLIP-27 era, we had heuristic approaches in sources to switch to idleness and thus allow watermarks to increase in cases where fewer splits than source tasks are available. However, for sources with dynamic split discovery, that actually yields incorrect results. Think of a Kinesis consumer where a shard is split. Then a previously idle source subtask may receive a new split with time t0 as the lowest timestamp.
>>>>>> Since the source subtask did not participate in the global watermark generation (because it was idle), the previously emitted watermark may be past t0, and thus late records may be discarded. A rerun of the same pipeline on historic data would not render the source subtask idle and would not produce late records. The solution was for the framework to not mark source subtasks idle automatically when they have no splits. That leads to confusion for Kafka users with static topic subscriptions, where #splits < #parallelism stalls pipelines because the watermark does not advance. Here, your sketched solution can be transferred to KafkaSource to let Flink know that the min global watermark on a static assignment is determined by the slowest partition. Hence, all idle readers emit that min global watermark and the user sees progress.
>>>>>> This whole idea is related to FLIP-182 watermark alignment, but I'd go with another FLIP, as the goal is quite different even though the implementation overlaps.
>>>>>>
>>>>>> Now, as far as I understand, Iceberg seems to use the same information to actually pause the consumption of files and create some kind of ordering guarantee. This can probably be applied to any source with dynamic split discovery. However, I wouldn't mix up the concepts, and hence I appreciate you not chiming into the FLIP-182 and follow-up threads. The goal of FLIP-182 is to pause readers while consuming a split, while your approach pauses readers before processing another split. So it feels more closely related to the global min watermark; it could either be part of that FLIP or a FLIP of its own.
>>>>>> AFAIK, API changes should actually happen only on the enumerator side, both for your ideas and for the global min watermark.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>> Hi Steven,
>>>>>>>
>>>>>>> Would it be better to bring this to the dev@ list as a separate thread about the Iceberg source? I think this could benefit from broader input.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>>>>> + Becket and Sebastian
>>>>>>>>
>>>>>>>> This is also related to the split-level watermark alignment discussion thread. Because that thread is already very long, I don't want to further complicate the ongoing discussion there. But I can move the discussion to that existing thread if that is preferred.
>>>>>>>>
>>>>>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <stevenz...@gmail.com> wrote:
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> We are thinking about how to align with the Flink community and leverage the FLIP-182 watermark alignment in the Iceberg source. I put some context in this Google doc. Would love to hear your thoughts on this.
>>>>>>>>>
>>>>>>>>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Steven
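Arvid's global min watermark from earlier in the thread can be sketched as follows. This assumes the enumerator knows the current watermark of every assigned split and the earliest timestamp of every split it has not assigned yet (true for a static Kafka assignment, not for a general file source, as Arvid notes). It follows Flink's convention that a watermark `t` means no further records with timestamp `<= t`, hence the `- 1` for unassigned splits. `global_min_watermark` and `reader_watermark` are hypothetical names, not Flink APIs.

```python
from typing import Dict, Iterable, Optional


def global_min_watermark(split_watermarks: Dict[str, int],
                         unassigned_min_ts: Iterable[int]) -> Optional[int]:
    """Lower bound on the watermark of the whole source.

    split_watermarks: current watermark per assigned split (e.g. per partition).
    unassigned_min_ts: earliest record timestamp of each split not yet assigned;
    such a split may still emit a record at exactly that timestamp, so it caps
    the watermark at min_ts - 1.
    """
    candidates = list(split_watermarks.values())
    candidates += [ts - 1 for ts in unassigned_min_ts]
    return min(candidates) if candidates else None


def reader_watermark(local: Optional[int], has_splits: bool,
                     global_min: Optional[int]) -> Optional[int]:
    # A reader with splits reports its own watermark. An idle reader reports
    # the global minimum instead of stalling the combined watermark.
    return local if has_splits else global_min


# Static assignment: 2 partitions, more readers than partitions.
g = global_min_watermark({"p0": 100, "p1": 250}, [])
print(g)                                        # 100
print(reader_watermark(None, False, g))         # 100
print(global_min_watermark({"p0": 100}, [50]))  # 49
```

Because the idle reader only repeats the slowest partition's watermark, it never pushes the combined watermark past a record that any known split can still produce; splits discovered later and not factored in would break that guarantee, which is the Kinesis case Arvid describes.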