Ok, I see. Thanks to both of you for the explanation.

Do we need changes to Apache Flink for this feature? Can it be implemented
in the Sources without changes in the framework? I presume source can
access min/max watermark from the split, so as long as it also knows
exactly which splits have finished, it would know which splits to hold back.

Best,
Piotrek

śr., 4 maj 2022 o 20:03 Steven Wu <stevenz...@gmail.com> napisał(a):

> Piotr, thanks a lot for your feedback.
>
> > I can see this being an issue if the existence of too many blocked
> splits is occupying too many resources.
>
> This is not desirable. Eagerly assigning many splits to a reader can
> defeat the benefits of pull based dynamic split assignments. Iceberg
> readers request one split at a time upon start or completion of a split.
> Dynamic split assignment is better for work sharing/stealing as Becket
> mentioned. Limiting number of active splits can be handled by the FLIP-27
> Iceberg source and is somewhat orthogonal to watermark alignment.
>
> > Can not Iceberg just emit all splits and let FLIP-182/FLIP-217 handle
> the watermark alignment and block the splits that are too much into the
> future?
>
> The enumerator just assigns the next split to the requesting reader
> instead of holding back the split assignment. Let the reader handle the
> pause (if the file split requires alignment wait).  This strategy might
> work and leverage more from the framework.
>
> We probably need the following to make this work
> * extract watermark/timestamp only at the completion of a split (not at
> record level). Because records in a file aren't probably not sorted by the
> timestamp field, the pause or watermark advancement is probably better done
> at file level.
> * source readers checkpoint the watermark. otherwise, upon restart readers
> won't be able to determine the local watermark and pause for alignment. We
> don't want to emit records upon restart due to unknown watermark info.
>
> All,
>
> Any opinion on different timestamp for source alignment (vs Flink
> application watermark)? For Iceberg source, we might want to enforce
> alignment on kafka timestamp but Flink application watermark may use event
> time field from payload.
>
> Thanks,
> Steven
>
> On Wed, May 4, 2022 at 7:02 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hey Piotr,
>>
>> I think the mechanism FLIP-182 provided is a reasonable default one, which
>> ensures the watermarks are only drifted by an upper bound. However,
>> admittedly there are also other strategies for different purposes.
>>
>> In the Iceberg case, I am not sure if a static strictly allowed watermark
>> drift is desired. The source might just want to finish reading the
>> assigned
>> splits as fast as possible. And it is OK to have a drift of "one split",
>> instead of a fixed time period.
>>
>> As another example, if there are some fast readers whose splits are always
>> throttled, while the other slow readers are struggling to keep up with the
>> rest of the splits, the split enumerator may decide to reassign the slow
>> splits so all the readers have something to read. This would need the
>> SplitEnumerator to be aware of the watermark progress on each reader. So
>> it
>> seems useful to expose the WatermarkAlignmentEvent information to the
>> SplitEnumerator as well.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>> > Hi Steven,
>> >
>> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just emit
>> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
>> > block the splits that are too much into the future? I can see this
>> being an
>> > issue if the existence of too many blocked splits is occupying too many
>> > resources.
>> >
>> > If that's the case, indeed SourceCoordinator/SplitEnumerator would have
>> to
>> > decide on some basis how many and which splits to assign in what order.
>> But
>> > in that case I'm not sure how much you could use from FLIP-182 and
>> > FLIP-217. They seem somehow orthogonal to me, operating on different
>> > levels. FLIP-182 and FLIP-217 are working with whatever splits have
>> already
>> > been generated and assigned. You could leverage FLIP-182 and FLIP-217
>> and
>> > take care of only the problem to limit the number of parallel active
>> > splits. And here I'm not sure if it would be worth generalising a
>> solution
>> > across different connectors.
>> >
>> > Regarding the global watermark, I made a related comment sometime ago
>> > about it [1]. It sounds to me like you also need to solve this problem,
>> > otherwise Iceberg users will encounter late records in case of some race
>> > conditions between assigning new splits and completions of older.
>> >
>> > Best,
>> > Piotrek
>> >
>> > [1]
>> >
>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>> >
>> > pon., 2 maj 2022 o 04:26 Steven Wu <stevenz...@gmail.com> napisał(a):
>> >
>> >> add dev@ group to the thread as Thomas suggested
>> >>
>> >> Arvid,
>> >>
>> >> The scenario 3 (Dynamic assignment + temporary no split) in the
>> FLIP-180
>> >> (idleness) can happen to Iceberg source alignment, as readers can be
>> >> temporarily starved due to the holdback by the enumerator when
>> assigning
>> >> new splits upon request.
>> >>
>> >> Totally agree that we should decouple this discussion with the
>> FLIP-217,
>> >> which addresses the split level watermark alignment problem as a
>> follow-up
>> >> of FLIP-182
>> >>
>> >> Becket,
>> >>
>> >> Yes, currently Iceberg source implemented the alignment leveraging the
>> >> dynamic split assignment from FLIP-27 design. Basically, the enumerator
>> >> can
>> >> hold back split assignments to readers when necessary. Everything are
>> >> centralized in the enumerator: (1) watermark extraction and aggregation
>> >> (2)
>> >> alignment decision and execution
>> >>
>> >> The motivation of this discussion is to see if Iceberg source can
>> leverage
>> >> some of the watermark alignment solutions (like FLIP-182) from Flink
>> >> framework. E.g., as mentioned in the doc, Iceberg source can
>> potentially
>> >> leverage the FLIP-182 framework to do the watermark extraction and
>> >> aggregation. For the alignment decision and execution, we can keep
>> them in
>> >> the centralized enumerator.
>> >>
>> >> Thanks,
>> >> Steven
>> >>
>> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <becket....@gmail.com>
>> wrote:
>> >>
>> >> > Hi Steven,
>> >> >
>> >> > Thanks for pulling me into this thread. I think the timestamp
>> >> > alignment use case here is a good example of what FLIP-27 was
>> designed
>> >> for.
>> >> >
>> >> > Technically speaking, Iceberg source can already implement the
>> timestamp
>> >> > alignment in the Flink new source even without FLIP-182. However, I
>> >> > understand the rationale here because timestamp alignment is also
>> >> trying to
>> >> > orchestrate the consumption of splits. However, it looks like
>> FLIP-182
>> >> was
>> >> > not designed in a way that it can be easily extended for other use
>> >> cases.
>> >> > It may probably worth thinking of a more general mechanism to answer
>> the
>> >> > following questions:
>> >> >
>> >> > 1. What information whose source of truth is the Flink framework
>> should
>> >> be
>> >> > exposed to the SplitEnumerator and SourceReader? And how?
>> >> > 2. What control actions in the Flink framework are worth exposing to
>> the
>> >> > SplitEnumerators and SourceReaders? And how?
>> >> >
>> >> > In the context of timestamp alignment, the first question is more
>> >> > relevant. For example, instead of hardcode the ReportWatermarkEvent
>> >> > handling logic in the SourceCoordinator, should we expose this to the
>> >> > SplitEnumerator? So basically there will be some information, such as
>> >> > subtask local watermark, whose source of truth is Flink runtime, but
>> >> useful
>> >> > to the user provided pluggables.
>> >> >
>> >> > I think there are a few control flow patterns to make a complete
>> design:
>> >> >
>> >> > a. Framework space information (e.g. watermark) --> User space
>> >> Pluggables
>> >> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
>> >> split).
>> >> > b. Framework space information (e.g. task failure) --> User space
>> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g.
>> exit
>> >> > the job)
>> >> > c. User space information (e.g. a custom workload metric) --> User
>> space
>> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>> rebalance
>> >> > the workload across the source readers).
>> >> > d. Use space information (e.g. a custom stopping event in the stream)
>> >> -->
>> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space
>> actions
>> >> > (e.g. stop the job).
>> >> >
>> >> > So basically for any user provided pluggables, the input information
>> may
>> >> > either come from another user provided logic or from the framework,
>> and
>> >> > after receiving that information, the pluggable may either want the
>> >> > framework or another pluggable to take an action. So this gives the
>> >> above 4
>> >> > combinations.
>> >> >
>> >> > In our case, when the pluggables are SplitEnumerator and
>> SourceReader,
>> >> the
>> >> > control flows that only involve user space actions are fully
>> supported.
>> >> But
>> >> > it seems that when it comes to control flows involving framework
>> space
>> >> > information, some of the information has not been exposed to the
>> >> pluggable,
>> >> > and some framework actions might also be missing.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Jiangjie (Becket) Qin
>> >> >
>> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
>> wrote:
>> >> >
>> >> >> Hi folks,
>> >> >>
>> >> >> quick input from my side. I think this is from the implementation
>> >> >> perspective what Piotr and I had in mind for a global min watermark
>> >> that
>> >> >> helps in idleness cases. See also point 3 in
>> >> >>
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> >> >> .
>> >> >>
>> >> >> Basically, we would like to empower source enumerators to determine
>> the
>> >> >> global min watermark for all source readers factoring in even future
>> >> >> splits. Not all sources can supply that information (think of a
>> general
>> >> >> file source) but most should be able to. Basically, Flink should
>> know
>> >> for a
>> >> >> given source at a given point in time what the min watermark across
>> all
>> >> >> source subtasks is.
>> >> >>
>> >> >> Here is some background:
>> >> >> In the context of idleness, we can deterministically advance the
>> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>> >> sources
>> >> >> to switch to idleness and thus allow watermarks to increase in cases
>> >> where
>> >> >> fewer splits than source tasks are available. However, for sources
>> with
>> >> >> dynamic split discovery that actually yields incorrect results.
>> Think
>> >> of a
>> >> >> Kinesis consumer where a shard is split. Then a previously idle
>> source
>> >> >> subtask may receive a new split with time t0 as the lowest
>> timestamp.
>> >> Since
>> >> >> the source subtask did not participate in the global watermark
>> >> generation
>> >> >> (because it was idle), the previously emitted watermark may be past
>> t0
>> >> and
>> >> >> thus results in late records potentially being discarded. A rerun of
>> >> the
>> >> >> same pipeline on historic data would not render the source subtask
>> >> idle and
>> >> >> not result in late records. The solution was to not render source
>> >> subtasks
>> >> >> automatically idle by the framework if there are no spits. That
>> leads
>> >> to
>> >> >> confusion for Kafka users with static topic subscription where
>> #splits
>> >> <
>> >> >> #parallelism stalls pipelines because the watermark is not
>> advancing.
>> >> Here,
>> >> >> your sketched solution can be transferred to KafkaSource to let
>> Flink
>> >> know
>> >> >> that min global watermark on a static assignment is determined by
>> the
>> >> >> slowest partition. Hence, all idle readers emit that min global
>> >> watermark
>> >> >> and the user sees progress.
>> >> >> This whole idea is related to FLIP-182 watermark alignment but I'd
>> go
>> >> >> with another FLIP as the goal is quite different even though the
>> >> >> implementation overlaps.
>> >> >>
>> >> >> Now Iceberg seems to use the same information to actually pause the
>> >> >> consumption of files and create some kind of orderness guarantees as
>> >> far as
>> >> >> I understood. This probably can be applied to any source with
>> dynamic
>> >> split
>> >> >> discovery. However, I wouldn't mix up the concepts and hence I
>> >> appreciate
>> >> >> you not chiming into the FLIP-182 and ff. threads. The goal of
>> >> FLIP-182 is
>> >> >> to pause readers while consuming a split, while your approach pauses
>> >> >> readers before processing another split. So it feels more closely
>> >> related
>> >> >> to the global min watermark - so it could either be part of that
>> FLIP
>> >> or a
>> >> >> FLIP of its own. Afaik API changes should actually happen only on
>> the
>> >> >> enumerator side both for your ideas and for global min watermark.
>> >> >>
>> >> >> Best,
>> >> >>
>> >> >> Arvid
>> >> >>
>> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <t...@apache.org>
>> wrote:
>> >> >>
>> >> >>> Hi Steven,
>> >> >>>
>> >> >>> Would it be better to bring this as a separate thread related to
>> >> Iceberg
>> >> >>> source to the dev@ list? I think this could benefit from broader
>> >> input?
>> >> >>>
>> >> >>> Thanks
>> >> >>>
>> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz...@gmail.com>
>> >> wrote:
>> >> >>>
>> >> >>>> + Becket and Sebastian
>> >> >>>>
>> >> >>>> It is also related to the split level watermark alignment
>> discussion
>> >> >>>> thread. Because it is already very long, I don't want to further
>> >> complicate
>> >> >>>> the ongoing discussion there. But I can move the discussion to
>> that
>> >> >>>> existing thread if that is preferred.
>> >> >>>>
>> >> >>>>
>> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <stevenz...@gmail.com>
>> >> >>>> wrote:
>> >> >>>>
>> >> >>>>> Hi all,
>> >> >>>>>
>> >> >>>>> We are thinking about how to align with the Flink community and
>> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg source.
>> I
>> >> put some
>> >> >>>>> context in this google doc. Would love to get hear your thoughts
>> on
>> >> this.
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >>
>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>> >> >>>>>
>> >> >>>>> Thanks,
>> >> >>>>> Steven
>> >> >>>>>
>> >> >>>>
>> >>
>> >
>>
>

Reply via email to