Hi Steven,

Ok, thanks for the clarification. I'm not sure how much could be leveraged?
Maybe just re-using the watermark alignment configuration? Please correct
me if I'm wrong, but I think for the sole purpose of this use case, I don't
see a good motivation behind expanding our APIs. Clearly this feature can
be implemented already (you did it in the Iceberg connector after all).

> As another example, if there are some fast readers whose splits are
always throttled, while the other slow readers
> are struggling to keep up with the rest of the splits, the split
enumerator may decide to reassign the slow
> splits so all the readers have something to read. This would need the
SplitEnumerator to be aware of the
> watermark progress on each reader. So it seems useful to expose the
WatermarkAlignmentEvent information to the
> SplitEnumerator as well.

It seems like a valid potential use case. But do we have a good enough
motivation to work on it right now?

Piotrek

czw., 5 maj 2022 o 16:21 Steven Wu <stevenz...@gmail.com> napisał(a):

> Piotr,
>
> With FLIP-27, Iceberg source already implemented alignment by tracking
> watermark and holding back split assignment when necessary.
>
> The purpose of this discussion is to see if Iceberg source can leverage
> some of the watermark alignment work from Flink framework.
>
> Thanks,
> Steven
>
> On Thu, May 5, 2022 at 1:10 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Ok, I see. Thanks to both of you for the explanation.
> >
> > Do we need changes to Apache Flink for this feature? Can it be
> implemented
> > in the Sources without changes in the framework? I presume source can
> > access min/max watermark from the split, so as long as it also knows
> > exactly which splits have finished, it would know which splits to hold
> back.
> >
> > Best,
> > Piotrek
> >
> > śr., 4 maj 2022 o 20:03 Steven Wu <stevenz...@gmail.com> napisał(a):
> >
> >> Piotr, thanks a lot for your feedback.
> >>
> >> > I can see this being an issue if the existence of too many blocked
> >> splits is occupying too many resources.
> >>
> >> This is not desirable. Eagerly assigning many splits to a reader can
> >> defeat the benefits of pull based dynamic split assignments. Iceberg
> >> readers request one split at a time upon start or completion of a split.
> >> Dynamic split assignment is better for work sharing/stealing as Becket
> >> mentioned. Limiting number of active splits can be handled by the
> FLIP-27
> >> Iceberg source and is somewhat orthogonal to watermark alignment.
> >>
> >> > Can not Iceberg just emit all splits and let FLIP-182/FLIP-217 handle
> >> the watermark alignment and block the splits that are too much into the
> >> future?
> >>
> >> The enumerator just assigns the next split to the requesting reader
> >> instead of holding back the split assignment. Let the reader handle the
> >> pause (if the file split requires alignment wait).  This strategy might
> >> work and leverage more from the framework.
> >>
> >> We probably need the following to make this work
> >> * extract watermark/timestamp only at the completion of a split (not at
> >> record level). Because records in a file aren't probably not sorted by
> the
> >> timestamp field, the pause or watermark advancement is probably better
> done
> >> at file level.
> >> * source readers checkpoint the watermark. otherwise, upon restart
> >> readers won't be able to determine the local watermark and pause for
> >> alignment. We don't want to emit records upon restart due to unknown
> >> watermark info.
> >>
> >> All,
> >>
> >> Any opinion on different timestamp for source alignment (vs Flink
> >> application watermark)? For Iceberg source, we might want to enforce
> >> alignment on kafka timestamp but Flink application watermark may use
> event
> >> time field from payload.
> >>
> >> Thanks,
> >> Steven
> >>
> >> On Wed, May 4, 2022 at 7:02 AM Becket Qin <becket....@gmail.com> wrote:
> >>
> >>> Hey Piotr,
> >>>
> >>> I think the mechanism FLIP-182 provided is a reasonable default one,
> >>> which
> >>> ensures the watermarks are only drifted by an upper bound. However,
> >>> admittedly there are also other strategies for different purposes.
> >>>
> >>> In the Iceberg case, I am not sure if a static strictly allowed
> watermark
> >>> drift is desired. The source might just want to finish reading the
> >>> assigned
> >>> splits as fast as possible. And it is OK to have a drift of "one
> split",
> >>> instead of a fixed time period.
> >>>
> >>> As another example, if there are some fast readers whose splits are
> >>> always
> >>> throttled, while the other slow readers are struggling to keep up with
> >>> the
> >>> rest of the splits, the split enumerator may decide to reassign the
> slow
> >>> splits so all the readers have something to read. This would need the
> >>> SplitEnumerator to be aware of the watermark progress on each reader.
> So
> >>> it
> >>> seems useful to expose the WatermarkAlignmentEvent information to the
> >>> SplitEnumerator as well.
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>>
> >>>
> >>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pnowoj...@apache.org>
> >>> wrote:
> >>>
> >>> > Hi Steven,
> >>> >
> >>> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just
> >>> emit
> >>> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment
> and
> >>> > block the splits that are too much into the future? I can see this
> >>> being an
> >>> > issue if the existence of too many blocked splits is occupying too
> many
> >>> > resources.
> >>> >
> >>> > If that's the case, indeed SourceCoordinator/SplitEnumerator would
> >>> have to
> >>> > decide on some basis how many and which splits to assign in what
> >>> order. But
> >>> > in that case I'm not sure how much you could use from FLIP-182 and
> >>> > FLIP-217. They seem somehow orthogonal to me, operating on different
> >>> > levels. FLIP-182 and FLIP-217 are working with whatever splits have
> >>> already
> >>> > been generated and assigned. You could leverage FLIP-182 and FLIP-217
> >>> and
> >>> > take care of only the problem to limit the number of parallel active
> >>> > splits. And here I'm not sure if it would be worth generalising a
> >>> solution
> >>> > across different connectors.
> >>> >
> >>> > Regarding the global watermark, I made a related comment sometime ago
> >>> > about it [1]. It sounds to me like you also need to solve this
> problem,
> >>> > otherwise Iceberg users will encounter late records in case of some
> >>> race
> >>> > conditions between assigning new splits and completions of older.
> >>> >
> >>> > Best,
> >>> > Piotrek
> >>> >
> >>> > [1]
> >>> >
> >>>
> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
> >>> >
> >>> > pon., 2 maj 2022 o 04:26 Steven Wu <stevenz...@gmail.com>
> napisał(a):
> >>> >
> >>> >> add dev@ group to the thread as Thomas suggested
> >>> >>
> >>> >> Arvid,
> >>> >>
> >>> >> The scenario 3 (Dynamic assignment + temporary no split) in the
> >>> FLIP-180
> >>> >> (idleness) can happen to Iceberg source alignment, as readers can be
> >>> >> temporarily starved due to the holdback by the enumerator when
> >>> assigning
> >>> >> new splits upon request.
> >>> >>
> >>> >> Totally agree that we should decouple this discussion with the
> >>> FLIP-217,
> >>> >> which addresses the split level watermark alignment problem as a
> >>> follow-up
> >>> >> of FLIP-182
> >>> >>
> >>> >> Becket,
> >>> >>
> >>> >> Yes, currently Iceberg source implemented the alignment leveraging
> the
> >>> >> dynamic split assignment from FLIP-27 design. Basically, the
> >>> enumerator
> >>> >> can
> >>> >> hold back split assignments to readers when necessary. Everything
> are
> >>> >> centralized in the enumerator: (1) watermark extraction and
> >>> aggregation
> >>> >> (2)
> >>> >> alignment decision and execution
> >>> >>
> >>> >> The motivation of this discussion is to see if Iceberg source can
> >>> leverage
> >>> >> some of the watermark alignment solutions (like FLIP-182) from Flink
> >>> >> framework. E.g., as mentioned in the doc, Iceberg source can
> >>> potentially
> >>> >> leverage the FLIP-182 framework to do the watermark extraction and
> >>> >> aggregation. For the alignment decision and execution, we can keep
> >>> them in
> >>> >> the centralized enumerator.
> >>> >>
> >>> >> Thanks,
> >>> >> Steven
> >>> >>
> >>> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>> >>
> >>> >> > Hi Steven,
> >>> >> >
> >>> >> > Thanks for pulling me into this thread. I think the timestamp
> >>> >> > alignment use case here is a good example of what FLIP-27 was
> >>> designed
> >>> >> for.
> >>> >> >
> >>> >> > Technically speaking, Iceberg source can already implement the
> >>> timestamp
> >>> >> > alignment in the Flink new source even without FLIP-182. However,
> I
> >>> >> > understand the rationale here because timestamp alignment is also
> >>> >> trying to
> >>> >> > orchestrate the consumption of splits. However, it looks like
> >>> FLIP-182
> >>> >> was
> >>> >> > not designed in a way that it can be easily extended for other use
> >>> >> cases.
> >>> >> > It may probably worth thinking of a more general mechanism to
> >>> answer the
> >>> >> > following questions:
> >>> >> >
> >>> >> > 1. What information whose source of truth is the Flink framework
> >>> should
> >>> >> be
> >>> >> > exposed to the SplitEnumerator and SourceReader? And how?
> >>> >> > 2. What control actions in the Flink framework are worth exposing
> >>> to the
> >>> >> > SplitEnumerators and SourceReaders? And how?
> >>> >> >
> >>> >> > In the context of timestamp alignment, the first question is more
> >>> >> > relevant. For example, instead of hardcode the
> ReportWatermarkEvent
> >>> >> > handling logic in the SourceCoordinator, should we expose this to
> >>> the
> >>> >> > SplitEnumerator? So basically there will be some information, such
> >>> as
> >>> >> > subtask local watermark, whose source of truth is Flink runtime,
> but
> >>> >> useful
> >>> >> > to the user provided pluggables.
> >>> >> >
> >>> >> > I think there are a few control flow patterns to make a complete
> >>> design:
> >>> >> >
> >>> >> > a. Framework space information (e.g. watermark) --> User space
> >>> >> Pluggables
> >>> >> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading
> a
> >>> >> split).
> >>> >> > b. Framework space information (e.g. task failure) --> User space
> >>> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions
> (e.g.
> >>> exit
> >>> >> > the job)
> >>> >> > c. User space information (e.g. a custom workload metric) --> User
> >>> space
> >>> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
> >>> rebalance
> >>> >> > the workload across the source readers).
> >>> >> > d. Use space information (e.g. a custom stopping event in the
> >>> stream)
> >>> >> -->
> >>> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space
> >>> actions
> >>> >> > (e.g. stop the job).
> >>> >> >
> >>> >> > So basically for any user provided pluggables, the input
> >>> information may
> >>> >> > either come from another user provided logic or from the
> framework,
> >>> and
> >>> >> > after receiving that information, the pluggable may either want
> the
> >>> >> > framework or another pluggable to take an action. So this gives
> the
> >>> >> above 4
> >>> >> > combinations.
> >>> >> >
> >>> >> > In our case, when the pluggables are SplitEnumerator and
> >>> SourceReader,
> >>> >> the
> >>> >> > control flows that only involve user space actions are fully
> >>> supported.
> >>> >> But
> >>> >> > it seems that when it comes to control flows involving framework
> >>> space
> >>> >> > information, some of the information has not been exposed to the
> >>> >> pluggable,
> >>> >> > and some framework actions might also be missing.
> >>> >> >
> >>> >> > Thanks,
> >>> >> >
> >>> >> > Jiangjie (Becket) Qin
> >>> >> >
> >>> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
> >>> wrote:
> >>> >> >
> >>> >> >> Hi folks,
> >>> >> >>
> >>> >> >> quick input from my side. I think this is from the implementation
> >>> >> >> perspective what Piotr and I had in mind for a global min
> watermark
> >>> >> that
> >>> >> >> helps in idleness cases. See also point 3 in
> >>> >> >>
> >>> >>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> >>> >> >> .
> >>> >> >>
> >>> >> >> Basically, we would like to empower source enumerators to
> >>> determine the
> >>> >> >> global min watermark for all source readers factoring in even
> >>> future
> >>> >> >> splits. Not all sources can supply that information (think of a
> >>> general
> >>> >> >> file source) but most should be able to. Basically, Flink should
> >>> know
> >>> >> for a
> >>> >> >> given source at a given point in time what the min watermark
> >>> across all
> >>> >> >> source subtasks is.
> >>> >> >>
> >>> >> >> Here is some background:
> >>> >> >> In the context of idleness, we can deterministically advance the
> >>> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
> >>> >> sources
> >>> >> >> to switch to idleness and thus allow watermarks to increase in
> >>> cases
> >>> >> where
> >>> >> >> fewer splits than source tasks are available. However, for
> sources
> >>> with
> >>> >> >> dynamic split discovery that actually yields incorrect results.
> >>> Think
> >>> >> of a
> >>> >> >> Kinesis consumer where a shard is split. Then a previously idle
> >>> source
> >>> >> >> subtask may receive a new split with time t0 as the lowest
> >>> timestamp.
> >>> >> Since
> >>> >> >> the source subtask did not participate in the global watermark
> >>> >> generation
> >>> >> >> (because it was idle), the previously emitted watermark may be
> >>> past t0
> >>> >> and
> >>> >> >> thus results in late records potentially being discarded. A rerun
> >>> of
> >>> >> the
> >>> >> >> same pipeline on historic data would not render the source
> subtask
> >>> >> idle and
> >>> >> >> not result in late records. The solution was to not render source
> >>> >> subtasks
> >>> >> >> automatically idle by the framework if there are no spits. That
> >>> leads
> >>> >> to
> >>> >> >> confusion for Kafka users with static topic subscription where
> >>> #splits
> >>> >> <
> >>> >> >> #parallelism stalls pipelines because the watermark is not
> >>> advancing.
> >>> >> Here,
> >>> >> >> your sketched solution can be transferred to KafkaSource to let
> >>> Flink
> >>> >> know
> >>> >> >> that min global watermark on a static assignment is determined by
> >>> the
> >>> >> >> slowest partition. Hence, all idle readers emit that min global
> >>> >> watermark
> >>> >> >> and the user sees progress.
> >>> >> >> This whole idea is related to FLIP-182 watermark alignment but
> I'd
> >>> go
> >>> >> >> with another FLIP as the goal is quite different even though the
> >>> >> >> implementation overlaps.
> >>> >> >>
> >>> >> >> Now Iceberg seems to use the same information to actually pause
> the
> >>> >> >> consumption of files and create some kind of orderness guarantees
> >>> as
> >>> >> far as
> >>> >> >> I understood. This probably can be applied to any source with
> >>> dynamic
> >>> >> split
> >>> >> >> discovery. However, I wouldn't mix up the concepts and hence I
> >>> >> appreciate
> >>> >> >> you not chiming into the FLIP-182 and ff. threads. The goal of
> >>> >> FLIP-182 is
> >>> >> >> to pause readers while consuming a split, while your approach
> >>> pauses
> >>> >> >> readers before processing another split. So it feels more closely
> >>> >> related
> >>> >> >> to the global min watermark - so it could either be part of that
> >>> FLIP
> >>> >> or a
> >>> >> >> FLIP of its own. Afaik API changes should actually happen only on
> >>> the
> >>> >> >> enumerator side both for your ideas and for global min watermark.
> >>> >> >>
> >>> >> >> Best,
> >>> >> >>
> >>> >> >> Arvid
> >>> >> >>
> >>> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <t...@apache.org>
> >>> wrote:
> >>> >> >>
> >>> >> >>> Hi Steven,
> >>> >> >>>
> >>> >> >>> Would it be better to bring this as a separate thread related to
> >>> >> Iceberg
> >>> >> >>> source to the dev@ list? I think this could benefit from
> broader
> >>> >> input?
> >>> >> >>>
> >>> >> >>> Thanks
> >>> >> >>>
> >>> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz...@gmail.com
> >
> >>> >> wrote:
> >>> >> >>>
> >>> >> >>>> + Becket and Sebastian
> >>> >> >>>>
> >>> >> >>>> It is also related to the split level watermark alignment
> >>> discussion
> >>> >> >>>> thread. Because it is already very long, I don't want to
> further
> >>> >> complicate
> >>> >> >>>> the ongoing discussion there. But I can move the discussion to
> >>> that
> >>> >> >>>> existing thread if that is preferred.
> >>> >> >>>>
> >>> >> >>>>
> >>> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <
> stevenz...@gmail.com
> >>> >
> >>> >> >>>> wrote:
> >>> >> >>>>
> >>> >> >>>>> Hi all,
> >>> >> >>>>>
> >>> >> >>>>> We are thinking about how to align with the Flink community
> and
> >>> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg
> >>> source. I
> >>> >> put some
> >>> >> >>>>> context in this google doc. Would love to get hear your
> >>> thoughts on
> >>> >> this.
> >>> >> >>>>>
> >>> >> >>>>>
> >>> >> >>>>>
> >>> >>
> >>>
> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
> >>> >> >>>>>
> >>> >> >>>>> Thanks,
> >>> >> >>>>> Steven
> >>> >> >>>>>
> >>> >> >>>>
> >>> >>
> >>> >
> >>>
> >>
>

Reply via email to