Option 1 sounds reasonable, but I would be tempted to wait for a second motivating use case before generalizing the framework. However, I wouldn’t oppose this extension if others feel it’s useful and a good thing to do.
Piotrek

> On 6 May 2022, at 03:50, Becket Qin <[email protected]> wrote:
>
> I think the key point here is essentially what information Flink should
> expose to the user pluggables. Apparently the split / local task watermark
> is something many user pluggables would be interested in. Right now it is
> calculated by the Flink framework but not exposed to the user space, i.e.
> SourceReader / SplitEnumerator. So it looks like we can at least offer this
> information in some way, so users can leverage it to do things.
>
> That said, I am not sure if this would help in the Iceberg alignment case,
> because at this point FLIP-182 reports source reader watermarks
> periodically, which may not align with the RequestSplitEvent. So if we
> really want to leverage the FLIP-182 mechanism here, I see a few ways, to
> name just two of them:
> 1. We can expose the source reader watermark in the SourceReaderContext, so
>    the source readers can put the local watermark in a custom operator
>    event. This will effectively bypass the existing RequestSplitEvent. Or we
>    can also extend the RequestSplitEvent with an additional info field of
>    type byte[], so users can piggy-back additional information there, be it
>    a watermark or other stuff.
> 2. Simply piggy-back the local watermark in the RequestSplitEvent and pass
>    that info to the SplitEnumerator as well.
>
> If we are going to do this, personally I'd prefer the first way, as it
> provides a mechanism that allows future extension, making it easier to
> expose other framework information to the user space in the future.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>> On Fri, May 6, 2022 at 6:15 AM Thomas Weise <[email protected]> wrote:
>>
>>> On Wed, May 4, 2022 at 11:03 AM Steven Wu <[email protected]> wrote:
>>> Any opinion on a different timestamp for source alignment (vs. the Flink
>>> application watermark)?
>>> For the Iceberg source, we might want to enforce alignment on the Kafka
>>> timestamp, but the Flink application watermark may use an event time
>>> field from the payload.
>>
>> I imagine that more generally the question is alignment based on the
>> Iceberg partition/file metadata vs. individual rows? I think that
>> should work as long as there is a guarantee for out-of-orderness
>> within the split?
>>
>> Thomas
>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, May 4, 2022 at 7:02 AM Becket Qin <[email protected]> wrote:
>>>>
>>>> Hey Piotr,
>>>>
>>>> I think the mechanism FLIP-182 provides is a reasonable default, which
>>>> ensures the watermarks drift by at most an upper bound. However,
>>>> admittedly there are also other strategies for different purposes.
>>>>
>>>> In the Iceberg case, I am not sure if a static, strictly allowed
>>>> watermark drift is desired. The source might just want to finish reading
>>>> the assigned splits as fast as possible. And it is OK to have a drift of
>>>> "one split" instead of a fixed time period.
>>>>
>>>> As another example, if there are some fast readers whose splits are
>>>> always throttled, while the other slow readers are struggling to keep up
>>>> with the rest of the splits, the split enumerator may decide to reassign
>>>> the slow splits so all the readers have something to read. This would
>>>> need the SplitEnumerator to be aware of the watermark progress on each
>>>> reader. So it seems useful to expose the WatermarkAlignmentEvent
>>>> information to the SplitEnumerator as well.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <[email protected]> wrote:
>>>>
>>>>> Hi Steven,
>>>>>
>>>>> Isn't this redundant with FLIP-182 and FLIP-217? Can't Iceberg just emit
>>>>> all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
>>>>> block the splits that are too far into the future?
>>>>> I can see this being an issue if the existence of too many blocked
>>>>> splits occupies too many resources.
>>>>>
>>>>> If that's the case, the SourceCoordinator/SplitEnumerator would indeed
>>>>> have to decide on some basis how many and which splits to assign, and in
>>>>> what order. But in that case I'm not sure how much you could use from
>>>>> FLIP-182 and FLIP-217. They seem somewhat orthogonal to me, operating on
>>>>> different levels. FLIP-182 and FLIP-217 work with whatever splits have
>>>>> already been generated and assigned. You could leverage FLIP-182 and
>>>>> FLIP-217 and take care only of the problem of limiting the number of
>>>>> parallel active splits. And here I'm not sure if it would be worth
>>>>> generalising a solution across different connectors.
>>>>>
>>>>> Regarding the global watermark, I made a related comment about it some
>>>>> time ago [1]. It sounds to me like you also need to solve this problem;
>>>>> otherwise Iceberg users will encounter late records in case of race
>>>>> conditions between assigning new splits and completing older ones.
>>>>>
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> [1]
>>>>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>>>>>
>>>>> On Mon, 2 May 2022 at 04:26 Steven Wu <[email protected]> wrote:
>>>>>
>>>>>> Adding the dev@ group to the thread as Thomas suggested.
>>>>>>
>>>>>> Arvid,
>>>>>>
>>>>>> Scenario 3 (Dynamic assignment + temporary no split) in FLIP-180
>>>>>> (idleness) can happen with Iceberg source alignment, as readers can be
>>>>>> temporarily starved due to the holdback by the enumerator when
>>>>>> assigning new splits upon request.
>>>>>>
>>>>>> Totally agree that we should decouple this discussion from FLIP-217,
>>>>>> which addresses the split-level watermark alignment problem as a
>>>>>> follow-up of FLIP-182.
>>>>>>
>>>>>> Becket,
>>>>>>
>>>>>> Yes, currently the Iceberg source implements the alignment by
>>>>>> leveraging the dynamic split assignment from the FLIP-27 design.
>>>>>> Basically, the enumerator can hold back split assignments to readers
>>>>>> when necessary. Everything is centralized in the enumerator: (1)
>>>>>> watermark extraction and aggregation, and (2) alignment decision and
>>>>>> execution.
>>>>>>
>>>>>> The motivation of this discussion is to see if the Iceberg source can
>>>>>> leverage some of the watermark alignment solutions (like FLIP-182) from
>>>>>> the Flink framework. E.g., as mentioned in the doc, the Iceberg source
>>>>>> can potentially leverage the FLIP-182 framework to do the watermark
>>>>>> extraction and aggregation. For the alignment decision and execution,
>>>>>> we can keep them in the centralized enumerator.
>>>>>>
>>>>>> Thanks,
>>>>>> Steven
>>>>>>
>>>>>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Steven,
>>>>>>>
>>>>>>> Thanks for pulling me into this thread. I think the timestamp
>>>>>>> alignment use case here is a good example of what FLIP-27 was
>>>>>>> designed for.
>>>>>>>
>>>>>>> Technically speaking, the Iceberg source can already implement
>>>>>>> timestamp alignment in the new Flink source even without FLIP-182.
>>>>>>> However, I understand the rationale here, because timestamp alignment
>>>>>>> is also trying to orchestrate the consumption of splits. However, it
>>>>>>> looks like FLIP-182 was not designed in a way that it can be easily
>>>>>>> extended for other use cases. It may be worth thinking of a more
>>>>>>> general mechanism to answer the following questions:
>>>>>>>
>>>>>>> 1. What information whose source of truth is the Flink framework
>>>>>>>    should be exposed to the SplitEnumerator and SourceReader? And how?
>>>>>>> 2. What control actions in the Flink framework are worth exposing to
>>>>>>>    the SplitEnumerators and SourceReaders? And how?
>>>>>>>
>>>>>>> In the context of timestamp alignment, the first question is more
>>>>>>> relevant. For example, instead of hardcoding the ReportWatermarkEvent
>>>>>>> handling logic in the SourceCoordinator, should we expose this to the
>>>>>>> SplitEnumerator? So basically there will be some information, such as
>>>>>>> the subtask-local watermark, whose source of truth is the Flink
>>>>>>> runtime but which is useful to the user-provided pluggables.
>>>>>>>
>>>>>>> I think there are a few control flow patterns needed to make a
>>>>>>> complete design:
>>>>>>>
>>>>>>> a. Framework space information (e.g. watermark) --> User space
>>>>>>>    pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>>>>>>>    pause reading a split).
>>>>>>> b. Framework space information (e.g. task failure) --> User space
>>>>>>>    pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g.
>>>>>>>    exit the job).
>>>>>>> c. User space information (e.g. a custom workload metric) --> User
>>>>>>>    space pluggables (e.g. SplitEnumerator) --> User space actions
>>>>>>>    (e.g. rebalance the workload across the source readers).
>>>>>>> d. User space information (e.g. a custom stopping event in the
>>>>>>>    stream) --> User space pluggables (e.g. SplitEnumerator) -->
>>>>>>>    Framework space actions (e.g. stop the job).
>>>>>>>
>>>>>>> So basically, for any user-provided pluggable, the input information
>>>>>>> may come either from other user-provided logic or from the framework,
>>>>>>> and after receiving that information, the pluggable may want either
>>>>>>> the framework or another pluggable to take an action. This gives the
>>>>>>> four combinations above.
>>>>>>>
>>>>>>> In our case, when the pluggables are the SplitEnumerator and
>>>>>>> SourceReader, the control flows that only involve user space actions
>>>>>>> are fully supported. But it seems that when it comes to control flows
>>>>>>> involving framework space information, some of the information has
>>>>>>> not been exposed to the pluggables, and some framework actions might
>>>>>>> also be missing.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi folks,
>>>>>>>>
>>>>>>>> Quick input from my side. I think that, from the implementation
>>>>>>>> perspective, this is what Piotr and I had in mind for a global min
>>>>>>>> watermark that helps in idleness cases. See also point 3 in
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>>>>>>>> .
>>>>>>>>
>>>>>>>> Basically, we would like to empower source enumerators to determine
>>>>>>>> the global min watermark for all source readers, factoring in even
>>>>>>>> future splits. Not all sources can supply that information (think of
>>>>>>>> a general file source), but most should be able to. Basically, Flink
>>>>>>>> should know, for a given source at a given point in time, what the
>>>>>>>> min watermark across all source subtasks is.
>>>>>>>>
>>>>>>>> Here is some background:
>>>>>>>> In the context of idleness, we can deterministically advance the
>>>>>>>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>>>>>>>> sources to switch to idleness and thus allow watermarks to increase
>>>>>>>> in cases where fewer splits than source tasks are available. However,
>>>>>>>> for sources with dynamic split discovery, that actually yields
>>>>>>>> incorrect results. Think of a Kinesis consumer where a shard is split.
>>>>>>>> Then a previously idle source subtask may receive a new split with
>>>>>>>> time t0 as the lowest timestamp. Since the source subtask did not
>>>>>>>> participate in the global watermark generation (because it was idle),
>>>>>>>> the previously emitted watermark may be past t0, which results in late
>>>>>>>> records potentially being discarded. A rerun of the same pipeline on
>>>>>>>> historic data would not render the source subtask idle and would not
>>>>>>>> result in late records. The solution was for the framework not to
>>>>>>>> render source subtasks idle automatically if there are no splits. That
>>>>>>>> leads to confusion for Kafka users with a static topic subscription,
>>>>>>>> where #splits < #parallelism stalls pipelines because the watermark is
>>>>>>>> not advancing. Here, your sketched solution can be transferred to
>>>>>>>> KafkaSource to let Flink know that the min global watermark on a
>>>>>>>> static assignment is determined by the slowest partition. Hence, all
>>>>>>>> idle readers emit that min global watermark and the user sees
>>>>>>>> progress.
>>>>>>>> This whole idea is related to FLIP-182 watermark alignment, but I'd go
>>>>>>>> with another FLIP, as the goal is quite different even though the
>>>>>>>> implementation overlaps.
>>>>>>>>
>>>>>>>> Now Iceberg seems to use the same information to actually pause the
>>>>>>>> consumption of files and create some kind of ordering guarantees, as
>>>>>>>> far as I understood. This can probably be applied to any source with
>>>>>>>> dynamic split discovery. However, I wouldn't mix up the concepts, and
>>>>>>>> hence I appreciate you not chiming into the FLIP-182 and following
>>>>>>>> threads. The goal of FLIP-182 is to pause readers while consuming a
>>>>>>>> split, while your approach pauses readers before processing another
>>>>>>>> split.
>>>>>>>> So it feels more closely related to the global min watermark, so it
>>>>>>>> could be either part of that FLIP or a FLIP of its own. AFAIK, API
>>>>>>>> changes should actually happen only on the enumerator side, both for
>>>>>>>> your ideas and for the global min watermark.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Steven,
>>>>>>>>>
>>>>>>>>> Would it be better to bring this to the dev@ list as a separate
>>>>>>>>> thread related to the Iceberg source? I think this could benefit
>>>>>>>>> from broader input.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> + Becket and Sebastian
>>>>>>>>>>
>>>>>>>>>> It is also related to the split-level watermark alignment
>>>>>>>>>> discussion thread. Because that thread is already very long, I
>>>>>>>>>> don't want to further complicate the ongoing discussion there. But
>>>>>>>>>> I can move the discussion to that existing thread if that is
>>>>>>>>>> preferred.
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> We are thinking about how to align with the Flink community and
>>>>>>>>>>> leverage the FLIP-182 watermark alignment in the Iceberg source. I
>>>>>>>>>>> put some context in this Google doc. Would love to hear your
>>>>>>>>>>> thoughts on this.
>>>>>>>>>>>
>>>>>>>>>>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Steven
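Illustrative sketch, not from any participant in the thread. This is a minimal, self-contained Java model of Becket's option 1 (a byte[] field on the split request that carries the reader's local watermark), combined with the enumerator-side holdback Steven describes. RequestSplitWithPayload and AlignedSplitAssigner are hypothetical names, not Flink classes. The payload encoding (one 8-byte big-endian long of epoch millis) and the fixed maxDriftMillis bound are also assumptions. Becket notes that Iceberg might prefer a drift of "one split" instead.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// HYPOTHETICAL: models an extra byte[] field on a split request (Becket's
// option 1). This is not Flink's RequestSplitEvent or any existing Flink class.
final class RequestSplitWithPayload {
    final int subtaskId;
    final byte[] payload; // opaque to the framework; here it carries a watermark

    RequestSplitWithPayload(int subtaskId, byte[] payload) {
        this.subtaskId = subtaskId;
        this.payload = payload;
    }

    // Reader side: encode the local watermark (epoch millis) into 8 bytes.
    static RequestSplitWithPayload withWatermark(int subtaskId, long localWatermark) {
        byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(localWatermark).array();
        return new RequestSplitWithPayload(subtaskId, bytes);
    }

    // Enumerator side: decode the watermark the reader piggy-backed.
    long watermark() {
        return ByteBuffer.wrap(payload).getLong();
    }
}

// HYPOTHETICAL enumerator-side helper: remembers the last watermark each reader
// piggy-backed on a split request and holds back assignment for readers that
// are more than maxDriftMillis ahead of the slowest reader seen so far.
final class AlignedSplitAssigner {
    private final long maxDriftMillis;
    private final Map<Integer, Long> readerWatermarks = new HashMap<>();

    AlignedSplitAssigner(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    // Returns true to assign a split now, false to hold the request back.
    // Assumes plain epoch-millis watermarks (no Long.MIN_VALUE/MAX_VALUE
    // sentinels), so the addition below cannot overflow.
    boolean onSplitRequest(RequestSplitWithPayload request) {
        long wm = request.watermark();
        readerWatermarks.put(request.subtaskId, wm);
        long slowest = readerWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .getAsLong(); // non-empty: this reader was just inserted
        return wm <= slowest + maxDriftMillis;
    }
}
```

For example, with new AlignedSplitAssigner(60_000L), requests from subtasks 0 and 1 carrying watermarks 1_000_000 and 1_030_000 get a split. A request from subtask 2 carrying 1_100_000 is held back.

Only readers that have already sent a request count toward the slowest watermark. A real enumerator would also have to park held-back requests and retry them when the slowest watermark advances. Both points are left out to keep the sketch short.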

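Illustrative sketch, also not from the thread. This is a minimal Java model of the global min watermark Arvid describes. The enumerator combines the watermarks of readers that currently own splits with the lowest timestamps of splits it has discovered but not yet assigned, and an idle reader could emit the result instead of going idle. GlobalMinWatermarkTracker and its methods are hypothetical, not an existing Flink API. The sketch follows Flink's watermark convention that a watermark t means no more records with timestamp <= t. A pending split whose lowest timestamp is t0 therefore caps the result at t0 - 1.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.stream.LongStream;

// HYPOTHETICAL enumerator-side tracker for a "global min watermark" that also
// accounts for splits that are known but not yet assigned. Not a Flink API.
final class GlobalMinWatermarkTracker {
    // Latest watermark of each reader that currently owns at least one split.
    private final Map<Integer, Long> activeReaderWatermarks = new HashMap<>();
    // Lowest record timestamp of each discovered split not yet handed out.
    private final Map<String, Long> pendingSplitLowestTimestamps = new HashMap<>();

    void onReaderWatermark(int subtaskId, long watermark) {
        // Keep each reader's watermark monotonic.
        activeReaderWatermarks.merge(subtaskId, watermark, Long::max);
    }

    // The reader has no splits left, so it no longer holds the global value back.
    void onReaderIdle(int subtaskId) {
        activeReaderWatermarks.remove(subtaskId);
    }

    void onSplitDiscovered(String splitId, long lowestTimestamp) {
        pendingSplitLowestTimestamps.put(splitId, lowestTimestamp);
    }

    // Assumption: call this only once the receiving reader's reported
    // watermark already accounts for the split.
    void onSplitAssigned(String splitId) {
        pendingSplitLowestTimestamps.remove(splitId);
    }

    // Value an idle reader could emit; empty if nothing is known yet.
    OptionalLong globalMinWatermark() {
        LongStream readers = activeReaderWatermarks.values().stream()
                .mapToLong(Long::longValue);
        // A watermark t promises no more records with timestamp <= t, so a
        // pending split starting at t0 allows at most t0 - 1.
        LongStream pending = pendingSplitLowestTimestamps.values().stream()
                .mapToLong(ts -> ts - 1);
        return LongStream.concat(readers, pending).min();
    }
}
```

In the static Kafka case Arvid mentions, there are no pending splits. The result then reduces to the watermark of the reader with the slowest partition.

The sketch assumes onSplitAssigned is called only after the receiving reader's reported watermark accounts for the split. Dropping the split from the pending set earlier reopens the race Piotr points out, between assigning new splits and completing older ones.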