Hi everyone,

I do agree that Flink's definition of idleness is not fully thought through
yet. Consequently, I would feel a bit uneasy about making it part of Flink's
API right now. Instead, defining the proper semantics first and then exposing
it sounds like a good way forward. Hence, +1 for option number 1,
which will also allow FLIP-167 to make progress.

Concerning subtasks with no partitions assigned, would it make sense to
terminate these tasks at some point? That way, the stream would be closed
and there is no need to maintain a stream status. Of course, this also
requires at some point that Flink can start new sources when new partitions
appear.

Cheers,
Till

On Tue, Jun 8, 2021 at 9:26 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi Eron,
>
> FLIP-167 is narrow, but we recently discovered some problems with the
> current idleness semantics, as Arvid explained. We are planning to present a
> new proposal to redefine them. Probably as a part of it, we would need to
> rename them. Given that, I think it doesn't make sense to expose idleness
> to the sinks before we rename and define it properly. In other words:
>
> > 2. When the sink operator is idled, tell the sink function.
>
> We shouldn't expose stream status as a part of public API until it's
> properly defined.
>
> I would propose one of the two things:
> 1. Proceed with FLIP-167, without exposing idleness in the sinks YET.
> Exposing idleness could be part of this next/future FLIP that would define
> idleness in the first place.
> 2. Block FLIP-167, until the idleness is fixed.
>
> I would vote for option number 1.
>
> Piotrek
>
> On Mon, Jun 7, 2021 at 18:08 Eron Wright <ewri...@streamnative.io.invalid>
> wrote:
>
> > Piotr, David, and Arvid, we've had an expansive discussion but ultimately
> > the proposal is narrow.  It is:
> > 1. When a watermark arrives at the sink operator, tell the sink function.
> > 2. When the sink operator is idled, tell the sink function.
> >
> > With these enhancements, we will significantly improve correctness in
> > multi-stage flows, and facilitate an exciting project in the Pulsar
> > community.  Would you please lend your support to FLIP-167 so that we can
> > land this enhancement for 1.14?  My deepest thanks!
> >
> > -Eron
> >
> >
> >
> >
> > On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi Eron,
> > >
> > > you either have very specific use cases in mind or have a misconception
> > > about idleness in Flink with the new sources. The basic idea is that
> you
> > > have watermark generators only at the sources and the user supplies
> them.
> > > > As a source author, you have no option to limit that. Here is a bit of
> > > background:
> > >
> > > We observed that many users that read from Kafka were confused about no
> > > visible progress in their Flink applications because of some idle
> > partition
> > > and we introduced idleness subsequently. Idleness was always considered
> > as
> > > a means to achieve progress at the risk of losing a bit of correctness.
> > > So especially in the case that you describe with a Pulsar partition
> that
> > is
> > > empty but indefinitely active, the user needs to be able to use
> idleness
> > > such that downstream window operators progress.
> > >
> > > I hope to have clarified that "I wouldn't recommend using
> withIdleness()
> > > with source-based watermarks." would pretty much make the intended use
> > case
> > > not work anymore.
> > >
> > > ---
> > >
> > > Nevertheless, from the discussion with you and some offline discussion
> > with
> > > > Piotr and Dawid, we actually found quite a few drawbacks in the
> > > current definition of idleness:
> > > - We currently only use idleness to exclude respective upstream tasks
> > from
> > > participating in watermark generation (as you have eloquently put
> further
> > > up in the thread).
> > > - However, the definition is bound to records. So while a partition is
> > > idle, no records should be produced.
> > > - That brings us into quite a few edge cases, where operators emit
> > records,
> > > while they are actually idling: Think of timers, asyncIO operators,
> > window
> > > operators based on timeouts, etc.
> > > - The solution would be to turn the operator active while emitting and
> > > returning to being idle afterwards (but when?). However, this has some
> > > unintended side-effects depending on when you switch back.
> > >
> > > We are currently thinking that we should rephrase the definition to
> what
> > > you described:
> > > - A channel that is active is providing watermarks.
> > > - An idle channel is not providing any watermarks but can deliver
> > records.
> > > - Then we are not talking about idle partitions anymore but explicit
> and
> > > implicit watermark generation and should probably rename the concepts.
> > > - This would probably mean that we also need an explicit markActive in
> > > source/sink to express that the respective entity now needs to wait for
> > > explicit watermarks.
> > >
> > > I'll open a proper discussion thread tomorrow.
> > >
> > > Note that we probably shouldn't rush this FLIP until we have clarified
> > the
> > > semantics of idleness. We could also cut the scope of the FLIP to
> exclude
> > > idleness and go ahead without it (there should be enough binding votes
> > > already).
> > >
> > > On Sat, Jun 5, 2021 at 12:09 AM Eron Wright <ewri...@streamnative.io
> > > .invalid>
> > > wrote:
> > >
> > > > I understand your scenario but I disagree with its assumptions:
> > > >
> > > > "However, the partition of A is empty and thus A is temporarily
> idle."
> > -
> > > > you're assuming that the behavior of the source is to mark itself
> idle
> > if
> > > > data isn't available, but that's clearly source-specific and not
> > behavior
> > > > we expect to have in Pulsar source.  A partition may be empty
> > > indefinitely
> > > > while still being active.  Imagine that the producer is defending a
> > > lease -
> > > > "I'm here, there's no data, please don't advance the clock".
> > > >
> > > > "we bind idleness to wall clock time" - you're characterizing a
> > specific
> > > > strategy (WatermarkStrategy.withIdleness()), not the inherent
> behavior
> > of
> > > > the pipeline.  I wouldn't recommend using withIdleness() with
> > > source-based
> > > > watermarks.
> > > >
> > > > I do agree that dynamism in partition assignment can wreak havoc on
> > > > watermark correctness.  We have some ideas on the Pulsar side about
> > that
> > > > too.  I would ask that we focus on the Flink framework and pipeline
> > > > behavior.  By offering a more powerful framework, we encourage stream
> > > > storage systems to "rise to the occasion" - treat event time in a
> > > > first-class way, optimize for correctness, etc.  In this case,
> FLIP-167
> > > is
> > > > setting the stage for evolution in Pulsar.
> > > >
> > > > Thanks again Arvid for the great discussion.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Jun 4, 2021 at 2:06 PM Arvid Heise <ar...@apache.org> wrote:
> > > >
> > > > > At least one big motivation is having (temporary) empty partitions.
> > Let
> > > > me
> > > > > give you an example, why imho idleness is only approximate in this
> > > case:
> > > > > > Assume you have source subtasks A, B, and C that correspond to 3 source
> > > > > partitions and a downstream keyed window operator W.
> > > > >
> > > > > W would usually trigger on min_watermark(A, B, C). However, the
> > > partition
> > > > > of A is empty and thus A is temporarily idle. So W triggers on
> > > > > min_watermark(B, C). When A is now active again, the watermark
> > > implicitly
> > > > > is min_watermark(B, C) for A!
> > > > >
> > > > > Let's further assume that the source is filled by another pipeline
> > > > before.
> > > > > This pipeline experiences technical difficulties for X minutes and
> > > could
> > > > > not produce into the partition of A, hence the idleness. When the
> > > > upstream
> > > > > pipeline resumes it fills A with some records that are before
> > > > > min_watermark(B, C). Any watermark generated from these records is
> > > > > > discarded as the watermark is monotonic. Therefore, these records
> > will
> > > > be
> > > > > considered late by W and discarded.
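
The scenario above can be replayed with a small toy model. This is only a
sketch in Python, not Flink code: ToyValve, its methods, and the timestamps
are hypothetical names and values chosen for illustration. It assumes the
combined watermark is the minimum over non-idle inputs, that it never moves
backwards, and that a record is late when its timestamp is at or below the
combined watermark.

```python
class ToyValve:
    """Toy model of watermark aggregation across inputs (not Flink's API)."""

    def __init__(self, inputs):
        self.watermarks = {name: float("-inf") for name in inputs}
        self.idle = {name: False for name in inputs}
        self.combined = float("-inf")

    def _recompute(self):
        # Only inputs that are not idle take part in the minimum.
        active = [wm for name, wm in self.watermarks.items() if not self.idle[name]]
        if active:
            # Monotonic: the combined watermark never regresses.
            self.combined = max(self.combined, min(active))
        return self.combined

    def on_watermark(self, name, ts):
        # A watermark from an input makes that input active again.
        self.idle[name] = False
        self.watermarks[name] = max(self.watermarks[name], ts)
        return self._recompute()

    def mark_idle(self, name):
        self.idle[name] = True
        return self._recompute()

    def is_late(self, event_ts):
        # Assumption of this toy: late means at or behind the combined watermark.
        return event_ts <= self.combined


def replay(mark_a_idle):
    """Replay the A/B/C scenario; return (combined watermark, is A's record late)."""
    valve = ToyValve(["A", "B", "C"])
    for name in ("A", "B", "C"):
        valve.on_watermark(name, 100)
    if mark_a_idle:
        valve.mark_idle("A")  # A's partition is empty for a while
    valve.on_watermark("B", 500)
    valve.on_watermark("C", 480)
    # A's upstream recovers and writes a record with event time 200.
    return valve.combined, valve.is_late(200)
```

With A marked idle, the combined watermark runs ahead to min(B, C) and the
record from A is late; without idleness, W stays blocked at A's watermark and
the record is on time.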
> > > > >
> > > > > > Without idleness, we would have simply blocked W until the upstream
> > > > pipeline
> > > > > fully recovers and we would not have had any late records. The same
> > > holds
> > > > > for any reprocessing where the data of partition A is continuous.
> > > > >
> > > > > If you look deeper, the issue is that we bind idleness to wall
> clock
> > > time
> > > > > (e.g. advance watermark after X seconds without data). Then we
> assume
> > > the
> > > > > watermark of the idle partition to be in sync with the slowest
> > > partition.
> > > > > However, in the case of hiccups, this assumption does not hold at
> > all.
> > > > > I don't see any fix for that (easy or not easy) and imho it's
> > inherent
> > > to
> > > > > the design of idleness.
> > > > > We lack information (why is no data coming) and have a heuristic to
> > fix
> > > > it.
> > > > >
> > > > > In the case of partition assignment where one subtask has no
> > partition,
> > > > we
> > > > > are probably somewhat safe. We know why no data is coming (no
> > > partition)
> > > > > and as long as we do not have dynamic partition assignment, there
> > will
> > > > > never be a switch to active without restart (for the foreseeable
> > > future).
> > > > >
> > > > > On Fri, Jun 4, 2021 at 10:34 PM Eron Wright <
> ewri...@streamnative.io
> > > > > .invalid>
> > > > > wrote:
> > > > >
> > > > > > Yes I'm talking about an implementation of idleness that is
> > unrelated
> > > > to
> > > > > > processing time.  The clear example is partition assignment to
> > > > subtasks,
> > > > > > which probably motivated Flink's idleness functionality in the
> > first
> > > > > place.
> > > > > >
> > > > > > On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise <ar...@apache.org>
> > > wrote:
> > > > > >
> > > > > > > Hi Eron,
> > > > > > >
> > > > > > > Are you referring to an implementation of idleness that does
> not
> > > rely
> > > > > on
> > > > > > a
> > > > > > > wall clock but on some clock baked into the partition
> information
> > > of
> > > > > the
> > > > > > > source system?
> > > > > > > If so, you are right that it invalidates my points.
> > > > > > > Do you have an example on where this is used?
> > > > > > >
> > > > > > > With a wall clock, you always run into the issues that I
> describe
> > > > since
> > > > > > you
> > > > > > > are effectively mixing event time and processing time...
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jun 4, 2021 at 6:28 PM Eron Wright <
> > > ewri...@streamnative.io
> > > > > > > .invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Dawid, I think you're mischaracterizing the idleness signal
> as
> > > > > > > inherently a
> > > > > > > > heuristic, but Flink does not impose that.  A source-based
> > > > watermark
> > > > > > (and
> > > > > > > > corresponding idleness signal) may well be entirely
> > data-driven,
> > > > > > entirely
> > > > > > > > deterministic.  Basically you're underselling what the
> pipeline
> > > is
> > > > > > > capable
> > > > > > > > of, based on painful experiences with using the generic,
> > > > > > heuristics-based
> > > > > > > > watermark assigner.  Please don't let those experiences
> > > overshadow
> > > > > > what's
> > > > > > > > possible with source-based watermarking.
> > > > > > > >
> > > > > > > > The idleness signal does have a strict definition, it
> indicates
> > > > > whether
> > > > > > > the
> > > > > > > > stream is actively participating in advancing the event time
> > > clock.
> > > > > > The
> > > > > > > > status of all participants is considered when aggregating
> > > > watermarks.
> > > > > > A
> > > > > > > > source subtask generally makes the determination based on
> data,
> > > > e.g.
> > > > > > > > whether a topic is assigned to that subtask.
> > > > > > > >
> > > > > > > > We have here a modest proposal to add callbacks to the sink
> > > > function
> > > > > > for
> > > > > > > > information that the sink operator already receives.  The
> > > practical
> > > > > > > result
> > > > > > > > is improved correctness when used with streaming systems that
> > > have
> > > > > > > > first-class support for event time.  The specific changes may
> > be
> > > > > > > previewed
> > > > > > > > here:
> > > > > > > > https://github.com/apache/flink/pull/15950
> > > > > > > > https://github.com/streamnative/flink/pull/2
> > > > > > > >
> > > > > > > > Thank you all for the robust discussion. Do I have your
> support
> > > to
> > > > > > > proceed
> > > > > > > > to enhance FLIP-167 with idleness callbacks and to proceed
> to a
> > > > vote?
> > > > > > > >
> > > > > > > > Eron
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise <ar...@apache.org
> >
> > > > wrote:
> > > > > > > >
> > > > > > > > > While everything I wrote before is still valid, upon
> further
> > > > > > > rethinking,
> > > > > > > > I
> > > > > > > > > think that the conclusion is not necessarily correct:
> > > > > > > > > - If the user wants to have pipeline A and B behaving as if
> > A+B
> > > > was
> > > > > > > > jointly
> > > > > > > > > executed in the same pipeline without the intermediate
> Pulsar
> > > > > topic,
> > > > > > > > having
> > > > > > > > > > the idleness in that topic is the only way to guarantee
> > > > consistency.
> > > > > > > > > - We could support the following in the respective sources:
> > If
> > > > the
> > > > > > user
> > > > > > > > > > wants to use a different definition of idleness in B,
> > they
> > > > can
> > > > > > > just
> > > > > > > > > provide a new idleness definition. At that point, we should
> > > > discard
> > > > > > the
> > > > > > > > > idleness in the intermediate topic while reading.
> > > > > > > > >
> > > > > > > > > If we would agree on the latter way, I think having the
> > > idleness
> > > > in
> > > > > > the
> > > > > > > > > topic is of great use because it's a piece of information
> > that
> > > > > cannot
> > > > > > > be
> > > > > > > > > inferred as stated by others. Consequently, we would be
> able
> > to
> > > > > > support
> > > > > > > > all
> > > > > > > > > use cases and can give the user the freedom to express his
> > > > intent.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise <
> ar...@apache.org
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > I think the core issue in this discussion is that we kind
> > of
> > > > > assume
> > > > > > > > that
> > > > > > > > > > idleness is something universally well-defined. But it's
> > not.
> > > > > It's
> > > > > > a
> > > > > > > > > > heuristic to advance data processing in event time where
> we
> > > > would
> > > > > > > lack
> > > > > > > > > data
> > > > > > > > > > to do so otherwise.
> > > > > > > > > > Keep in mind that idleness has no real definition in
> terms
> > of
> > > > > event
> > > > > > > > time
> > > > > > > > > > and leads to severe unexpected results: If you reprocess
> a
> > > data
> > > > > > > stream
> > > > > > > > > with
> > > > > > > > > > temporarily idle partitions, these partitions would not
> be
> > > > deemed
> > > > > > > idle
> > > > > > > > on
> > > > > > > > > > reprocessing and there is a realistic chance that records
> > > that
> > > > > were
> > > > > > > > > deemed
> > > > > > > > > > late in the live processing case are now perfectly fine
> > > records
> > > > > in
> > > > > > > the
> > > > > > > > > > reprocessing case. (I can expand on that if that was too
> > > short)
> > > > > > > > > >
> > > > > > > > > > With that in mind, why would a downstream process even
> try
> > to
> > > > > > > calculate
> > > > > > > > > > the same idleness state as the upstream process? I don't
> > see
> > > a
> > > > > > point;
> > > > > > > > we
> > > > > > > > > > would just further any imprecision in the calculation.
> > > > > > > > > >
> > > > > > > > > > Let's have a concrete example. Assume that we have
> upstream
> > > > > > pipeline
> > > > > > > A
> > > > > > > > > and
> > > > > > > > > > downstream pipeline B. A has plenty of resources and is
> > live
> > > > > > > processing
> > > > > > > > > > data. Some partitions are idle and that is propagated to
> > the
> > > > > sinks.
> > > > > > > > Now B
> > > > > > > > > > is heavily backpressured and consumes very slowly. B
> > doesn't
> > > > see
> > > > > > any
> > > > > > > > > > idleness directly. B can calculate exact watermarks and
> use
> > > all
> > > > > > > records
> > > > > > > > > for
> > > > > > > > > > > its calculation. Reprocessing would yield the same
> result
> > > for
> > > > B.
> > > > > > If
> > > > > > > we
> > > > > > > > > now
> > > > > > > > > > forward idleness, we can easily find cases where we would
> > > > advance
> > > > > > the
> > > > > > > > > > watermark prematurely while there is data directly
> > available
> > > to
> > > > > > > > calculate
> > > > > > > > > > the exact watermark.
> > > > > > > > > >
> > > > > > > > > > For me, idleness is just a pipeline-specific heuristic
> and
> > > > should
> > > > > > be
> > > > > > > > > > viewed as such.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > >
> > > > > > > > > > Arvid
> > > > > > > > > >
> > > > > > > > > > On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski <
> > > > > > pnowoj...@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Hi,
> > > > > > > > > >>
> > > > > > > > > >> > Imagine you're starting consuming from the result
> > channel
> > > > in a
> > > > > > > > > situation
> > > > > > > > > > >> where you have:
> > > > > > > > > >> > record4, record3, StreamStatus.ACTIVE,
> StreamStatus.IDLE
> > > > > > record2,
> > > > > > > > > >> record1, record0
> > > > > > > > > >> > Switching to the encoded StreamStatus.IDLE is
> > unnecessary,
> > > > and
> > > > > > > might
> > > > > > > > > >> cause the record3 and record4 to be late depending on
> how
> > > the
> > > > > > > > watermark
> > > > > > > > > >> progressed in other partitions.
> > > > > > > > > >>
> > > > > > > > > >> Yes, I understand this point. But it can also be the
> other
> > > way
> > > > > > > around.
> > > > > > > > > >> There might be a large gap between record2 and record3,
> > and
> > > > > users
> > > > > > > > might
> > > > > > > > > > >> prefer not to, or might not be able to, duplicate idleness
> > detection
> > > > > logic.
> > > > > > > The
> > > > > > > > > >> downstream system might be lacking some kind of
> > information
> > > > > (that
> > > > > > is
> > > > > > > > > only
> > > > > > > > > >> available in the top level/ingesting system) to
> correctly
> > > set
> > > > > the
> > > > > > > idle
> > > > > > > > > >> status.
> > > > > > > > > >>
> > > > > > > > > >> Piotrek
> > > > > > > > > >>
> > > > > > > > > > >> On Fri, Jun 4, 2021 at 12:30 Dawid Wysakowicz <
> > > > > > dwysakow...@apache.org>
> > > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >> >
> > > > > > > > > >> > Same as Eron I don't follow this point. Any streaming
> > sink
> > > > can
> > > > > > be
> > > > > > > > used
> > > > > > > > > >> as
> > > > > > > > > >> > this kind of transient channel. Streaming sinks, like
> > > Kafka,
> > > > > are
> > > > > > > > also
> > > > > > > > > >> used
> > > > > > > > > >> > to connect one streaming system with another one, also
> > for
> > > > an
> > > > > > > > > immediate
> > > > > > > > > >> > consumption.
> > > > > > > > > >> >
> > > > > > > > > >> > Sure it can, but imo it is rarely the primary use case
> > why
> > > > you
> > > > > > > want
> > > > > > > > to
> > > > > > > > > >> > offload the channels to an external persistent system.
> > > Again
> > > > > in
> > > > > > my
> > > > > > > > > >> > understanding StreamStatus is something transient,
> e.g.
> > > part
> > > > > of
> > > > > > > our
> > > > > > > > > > >> > external system went offline. I think those kinds of
> > events
> > > > > > should
> > > > > > > > not
> > > > > > > > > be
> > > > > > > > > >> > persisted.
> > > > > > > > > >> >
> > > > > > > > > >> > Both watermarks and idleness status can be some
> > > > > > > > > > >> > inherent property of the underlying data stream. If an
> > > > > > > > > >> upstream/ingesting
> > > > > > > > > >> > system knows that this particular stream/partition of
> a
> > > > stream
> > > > > > is
> > > > > > > > > going
> > > > > > > > > >> > idle (for example for a couple of hours), why does
> this
> > > > > > > information
> > > > > > > > > >> have to
> > > > > > > > > >> > be re-created in the downstream system using some
> > > heuristic?
> > > > > It
> > > > > > > > could
> > > > > > > > > be
> > > > > > > > > >> > explicitly encoded.
> > > > > > > > > >> >
> > > > > > > > > >> > Because it's most certainly not true in the
> downstream.
> > > The
> > > > > > > idleness
> > > > > > > > > >> works
> > > > > > > > > >> > usually according to a heuristic: "We have not seen
> > > records
> > > > > for
> > > > > > 5
> > > > > > > > > >> minutes,
> > > > > > > > > >> > so there is a fair chance we won't see records for the
> > > next
> > > > 5
> > > > > > > > minutes,
> > > > > > > > > >> so
> > > > > > > > > >> > let's not wait for watermarks for now." That heuristic
> > > most
> > > > > > > > certainly
> > > > > > > > > >> won't
> > > > > > > > > >> > hold for a downstream persistent storage.
> > > > > > > > > >> >
> > > > > > > > > >> > Imagine you're starting consuming from the result
> > channel
> > > > in a
> > > > > > > > > situation
> > > > > > > > > > >> > where you have:
> > > > > > > > > >> >
> > > > > > > > > >> > record4, record3, StreamStatus.ACTIVE,
> StreamStatus.IDLE
> > > > > > record2,
> > > > > > > > > >> record1,
> > > > > > > > > >> > record0
> > > > > > > > > >> >
> > > > > > > > > >> > Switching to the encoded StreamStatus.IDLE is
> > unnecessary,
> > > > and
> > > > > > > might
> > > > > > > > > >> cause
> > > > > > > > > >> > the record3 and record4 to be late depending on how
> the
> > > > > > watermark
> > > > > > > > > >> > progressed in other partitions.
> > > > > > > > > >> >
> > > > > > > > > >> > I understand Eron's use case, which is not about
> storing
> > > the
> > > > > > > > > >> StreamStatus,
> > > > > > > > > >> > but performing an immediate aggregation or said
> > > differently
> > > > > > > changing
> > > > > > > > > the
> > > > > > > > > >> > partitioning/granularity of records and watermarks
> > > > externally
> > > > > to
> > > > > > > > > Flink.
> > > > > > > > > >> The
> > > > > > > > > > >> > partitioning produced by Flink is actually never
> > persisted
> > > > in
> > > > > > that
> > > > > > > > > >> case. In
> > > > > > > > > >> > this case I agree exposing the StreamStatus makes
> > sense. I
> > > > am
> > > > > > > still
> > > > > > > > > >> > concerned it will lead to storing the StreamStatus
> which
> > > can
> > > > > > lead
> > > > > > > to
> > > > > > > > > >> many
> > > > > > > > > >> > subtle problems.
> > > > > > > > > >> > On 04/06/2021 11:53, Piotr Nowojski wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > Hi,
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks for picking up this discussion. For the
> record, I
> > > > also
> > > > > > > think
> > > > > > > > we
> > > > > > > > > >> > shouldn't expose latency markers.
> > > > > > > > > >> >
> > > > > > > > > >> > About the stream status
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >  Persisting the StreamStatus
> > > > > > > > > >> >
> > > > > > > > > >> > I don't agree with the view that sinks are "storing"
> the
> > > > > > > > data/idleness
> > > > > > > > > > >> > status. This nomenclature only makes sense if we are
> > > talking
> > > > > > about
> > > > > > > > > >> > streaming jobs producing batch data.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > In my understanding a StreamStatus makes sense only
> when
> > > > > talking
> > > > > > > > about
> > > > > > > > > >> > immediately consumed transient channels such as
> between
> > > > > > operators
> > > > > > > > > within
> > > > > > > > > >> > a single job.
> > > > > > > > > >> >
> > > > > > > > > >> > Same as Eron I don't follow this point. Any streaming
> > sink
> > > > can
> > > > > > be
> > > > > > > > used
> > > > > > > > > >> as
> > > > > > > > > >> > this kind of transient channel. Streaming sinks, like
> > > Kafka,
> > > > > are
> > > > > > > > also
> > > > > > > > > >> used
> > > > > > > > > >> > to connect one streaming system with another one, also
> > for
> > > > an
> > > > > > > > > immediate
> > > > > > > > > >> > consumption.
> > > > > > > > > >> >
> > > > > > > > > >> > You could say the same thing about watermarks (note
> they
> > > are
> > > > > > > usually
> > > > > > > > > >> > generated in Flink based on the incoming events) and I
> > > would
> > > > > not
> > > > > > > > agree
> > > > > > > > > >> with
> > > > > > > > > >> > it in the same way. Both watermarks and idleness
> status
> > > can
> > > > be
> > > > > > > some
> > > > > > > > > > >> > inherent property of the underlying data stream. If an
> > > > > > > > > >> upstream/ingesting
> > > > > > > > > >> > system knows that this particular stream/partition of
> a
> > > > stream
> > > > > > is
> > > > > > > > > going
> > > > > > > > > >> > idle (for example for a couple of hours), why does
> this
> > > > > > > information
> > > > > > > > > >> have to
> > > > > > > > > >> > be re-created in the downstream system using some
> > > heuristic?
> > > > > It
> > > > > > > > could
> > > > > > > > > be
> > > > > > > > > >> > explicitly encoded.  If you want to pass watermarks
> > > > explicitly
> > > > > > to
> > > > > > > a
> > > > > > > > > next
> > > > > > > > > >> > downstream streaming system, because you do not want
> to
> > > > > recreate
> > > > > > > > them
> > > > > > > > > >> from
> > > > > > > > > >> > the events using a duplicated logic, why wouldn't you
> > like
> > > > to
> > > > > do
> > > > > > > the
> > > > > > > > > >> same
> > > > > > > > > >> > thing with the idleness?
> > > > > > > > > >> >
> > > > > > > > > >> > Also keep in mind that I would expect that a user can
> > > decide
> > > > > > > whether
> > > > > > > > > he
> > > > > > > > > >> > wants to persist the watermarks/stream status on his
> > own.
> > > > This
> > > > > > > > > >> shouldn't be
> > > > > > > > > >> > obligatory.
> > > > > > > > > >> >
> > > > > > > > > >> > For me there is one good reason to not expose stream
> > > status
> > > > > YET.
> > > > > > > > That
> > > > > > > > > >> is,
> > > > > > > > > >> > if we are sure that we do not need this just yet,
> while
> > at
> > > > the
> > > > > > > same
> > > > > > > > > >> time we
> > > > > > > > > >> > don't want to expand the Public/PublicEvolving API, as
> > > this
> > > > > > always
> > > > > > > > > >> > increases the maintenance cost.
> > > > > > > > > >> >
> > > > > > > > > >> > Best,
> > > > > > > > > >> > Piotrek
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Fri, Jun 4, 2021 at 10:57 Eron Wright <
> > > > > ewri...@streamnative.io
> > > > > > > > > .invalid>
> > > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > I believe that the correctness of watermarks and
> stream
> > > > status
> > > > > > > > markers
> > > > > > > > > >> is
> > > > > > > > > >> > determined entirely by the source (ignoring the
> generic
> > > > > > assigner).
> > > > > > > > > Such
> > > > > > > > > >> > stream elements are known not to overtake records, and
> > > > aren't
> > > > > > > > > transient
> > > > > > > > > >> > from a pipeline perspective.  I do agree that
> recoveries
> > > may
> > > > > be
> > > > > > > > lossy
> > > > > > > > > if
> > > > > > > > > >> > some operator state is transient (e.g. valve state).
> > > > > > > > > >> >
> > > > > > > > > >> > Consider that status markers already affect the flow
> of
> > > > > > watermarks
> > > > > > > > > (e.g.
> > > > > > > > > >> > suppression), and thus affect operator behavior.
> Seems
> > to
> > > > me
> > > > > > that
> > > > > > > > > >> exposing
> > > > > > > > > >> > the idleness state is no different than exposing a
> > > > watermark.
> > > > > > > > > >> >
> > > > > > > > > >> > The high-level story is, there is a need for the Flink
> > job
> > > > to
> > > > > be
> > > > > > > > > >> > transparent or neutral with respect to the event time
> > > clock.
> > > > > I
> > > > > > > > > believe
> > > > > > > > > >> > this is possible if time flows with high fidelity from
> > > > source
> > > > > to
> > > > > > > > sink.
> > > > > > > > > >> Of
> > > > > > > > > >> > course, one always has the choice as to whether to use
> > > > > > > source-based
> > > > > > > > > >> > watermarks; as you mentioned, requirements vary.
> > > > > > > > > >> >
> > > > > > > > > >> > Regarding the Pulsar specifics, we're working on a
> > > community
> > > > > > > > proposal
> > > > > > > > > >> that
> > > > > > > > > >> > I'm anxious to share.  To answer your question, the
> > broker
> > > > > > > > aggregates
> > > > > > > > > >> > watermarks from multiple producers who are writing to
> a
> > > > single
> > > > > > > > topic.
> > > > > > > > > >> > Each sink
> > > > > > > > > >> > subtask is a producer.  The broker considers each
> > > producer's
> > > > > > > > > assertions
> > > > > > > > > >> > (watermarks, idleness) to be independent inputs, much
> > like
> > > > the
> > > > > > > case
> > > > > > > > > with
> > > > > > > > > >> > the watermark valve.
> > > > > > > > > >> >
> > > > > > > > > >> > On your concern about idleness causing false late
> > events,
> > > I
> > > > > > > > understand
> > > > > > > > > >> your
> > > > > > > > > >> > point but don't think it applies if the keyspace
> > > assignments
> > > > > are
> > > > > > > > > stable.
> > > > > > > > > >> >
> > > > > > > > > >> > I hope this explains to your satisfaction.
> > > > > > > > > >> >
> > > > > > > > > >> > - Eron
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > On Fri, Jun 4, 2021, 12:07 AM Dawid Wysakowicz <
> > > > > > > > > dwysakow...@apache.org>
> > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Hi Eron,
> > > > > > > > > >> >
> > > > > > > > > >> > I might be missing some background on Pulsar
> > partitioning
> > > > but
> > > > > > > > > something
> > > > > > > > > >> > seems off to me. What is the chunk/batch/partition
> that
> > > > Pulsar
> > > > > > > > brokers
> > > > > > > > > >> > will additionally combine watermarks for? Isn't it the
> > > case
> > > > > that
> > > > > > > > only
> > > > > > > > > a
> > > > > > > > > >> > single Flink sub-task would write to such a chunk and
> > thus
> > > > > will
> > > > > > > > > produce
> > > > > > > > > >> > an aggregated watermark already via the writeWatermark
> > > > method?
> > > > > > > > > >> >
> > > > > > > > > > >> > Personally I am really skeptical about exposing the StreamStatus
> > > > > > > > > > >> > in any Producer API. In my understanding the StreamStatus is a
> > > > > > > > > > >> > transient setting of a consumer of data. StreamStatus is a
> > > > > > > > > > >> > mechanism for making a tradeoff between correctness (how many late
> > > > > > > > > > >> > elements behind the watermark we have) vs making progress. IMO one
> > > > > > > > > > >> > has to be extra cautious when it comes to persistent systems.
> > > > > > > > > > >> > Again I might be missing the exact use case you are trying to
> > > > > > > > > > >> > solve here, but I can imagine multiple jobs reading from such a
> > > > > > > > > > >> > stream which might have different correctness requirements. Just
> > > > > > > > > > >> > quickly throwing out an idea off the top of my head: you might
> > > > > > > > > > >> > want to have entirely correct results which can be delayed for
> > > > > > > > > > >> > minutes, and a separate task that produces quick insights within
> > > > > > > > > > >> > seconds. Another thing to consider is that by the time the
> > > > > > > > > > >> > downstream job starts consuming, the upstream one might have
> > > > > > > > > > >> > produced records to the previously idle chunk. Persisting the
> > > > > > > > > > >> > StreamStatus in such a scenario would add unnecessary false late
> > > > > > > > > > >> > events.
> > > > > > > > > >> >
> > > > > > > > > > >> > In my understanding a StreamStatus makes sense only when talking
> > > > > > > > > > >> > about immediately consumed transient channels such as between
> > > > > > > > > > >> > operators within a single job.
> > > > > > > > > >> >
> > > > > > > > > >> > Best,
> > > > > > > > > >> >
> > > > > > > > > >> > Dawid
> > > > > > > > > >> >
> > > > > > > > > >> > On 03/06/2021 23:31, Eron Wright wrote:
> > > > > > > > > >> >
> > > > > > > > > > >> > I think the rationale for end-to-end idleness (i.e. between
> > > > > > > > > > >> > pipelines) is the same as the rationale for idleness between
> > > > > > > > > > >> > operators within a pipeline.  On the 'main issue' you mentioned,
> > > > > > > > > > >> > we entrust the source with adapting to Flink's notion of idleness
> > > > > > > > > > >> > (e.g. in Pulsar source, it means that no topics/partitions are
> > > > > > > > > > >> > assigned to a given sub-task); a similar adaption would occur in
> > > > > > > > > > >> > the sink.  In other words, I think it reasonable that a sink for
> > > > > > > > > > >> > a watermark-aware storage system has need for the idleness signal.
> > > > > > > > > >> >
> > > > > > > > > > >> > Let me explain how I would use it in Pulsar's sink.  Each sub-task
> > > > > > > > > > >> > is a Pulsar producer, and is writing watermarks to a configured
> > > > > > > > > > >> > topic via the Producer API.  The Pulsar broker aggregates the
> > > > > > > > > > >> > watermarks that are written by each producer into a global minimum
> > > > > > > > > > >> > (similar to StatusWatermarkValve).
> > > > > > > > > > >> >
> > > > > > > > > > >> > The broker keeps track of which producers are actively producing
> > > > > > > > > > >> > watermarks, and a producer may mark itself as idle to tell the
> > > > > > > > > > >> > broker not to wait for watermarks from it, e.g. when a producer is
> > > > > > > > > > >> > going offline.  I had intended to mark the producer as idle when
> > > > > > > > > > >> > the sub-task is closing, but now I see that it would be
> > > > > > > > > > >> > insufficient; the producer should also be idled if the sub-task is
> > > > > > > > > > >> > idled.  Otherwise, the broker would wait indefinitely for the
> > > > > > > > > > >> > idled sub-task to produce a watermark.
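
The broker-side aggregation described above can be sketched roughly as follows. This is only an illustration in plain Java, not Pulsar's or Flink's actual code: the class `MinWatermarkAggregator` and its methods `register`, `onWatermark`, `markIdle` and `combined` are hypothetical names. The sketch assumes that the combined watermark is the minimum over all non-idle producers and that it never moves backwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: combines per-producer watermarks into a global minimum
// and skips producers that have marked themselves idle.
final class MinWatermarkAggregator {
    // Last watermark seen from each active (non-idle) producer.
    private final Map<String, Long> active = new HashMap<>();
    // Combined watermark; assumed to be monotonically non-decreasing.
    private long combined = Long.MIN_VALUE;

    // A newly registered producer holds the combined watermark back
    // until it reports a watermark or marks itself idle.
    void register(String producerId) {
        active.putIfAbsent(producerId, Long.MIN_VALUE);
    }

    // Records a watermark; an idle producer becomes active again by reporting one.
    void onWatermark(String producerId, long watermark) {
        active.merge(producerId, watermark, Long::max);
        recompute();
    }

    // An idle producer is excluded from the minimum so that nobody waits for it.
    void markIdle(String producerId) {
        active.remove(producerId);
        recompute();
    }

    long combined() {
        return combined;
    }

    private void recompute() {
        if (active.isEmpty()) {
            return; // all producers idle: keep the last combined watermark
        }
        long min = Long.MAX_VALUE;
        for (long watermark : active.values()) {
            min = Math.min(min, watermark);
        }
        combined = Math.max(combined, min);
    }
}
```

Without the `markIdle` call, a producer that never reports a watermark would pin the combined value at its initial minimum, which is the stall described above.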
> > > > > > > > > >> >
> > > > > > > > > >> > Arvid, I think your original instincts were correct
> > about
> > > > > > idleness
> > > > > > > > > >> > propagation, and I hope I've demonstrated a practical
> > use
> > > > > case.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise <ar...@apache.org>
> > > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > When I was rethinking the idleness issue, I came to the conclusion
> > > > > > > > > > >> > that it should be inferred at the source of the respective
> > > > > > > > > > >> > downstream pipeline again.
> > > > > > > > > > >> >
> > > > > > > > > > >> > The main issue on propagating idleness is that you would force the
> > > > > > > > > > >> > same definition across all downstream pipelines, which may not be
> > > > > > > > > > >> > what the user intended.
> > > > > > > > > > >> > On the other hand, I don't immediately see a technical reason why
> > > > > > > > > > >> > the downstream source wouldn't be able to infer that.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Thu, Jun 3, 2021 at 9:14 PM Eron Wright
> > > > > > > > > > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > Thanks Piotr for bringing this up.  I reflected on this and I
> > > > > > > > > > >> > agree we should expose idleness, otherwise a multi-stage flow
> > > > > > > > > > >> > could stall.
> > > > > > > > > > >> >
> > > > > > > > > > >> > Regarding the latency markers, I don't see an immediate need for
> > > > > > > > > > >> > propagating them, because they serve to estimate latency within a
> > > > > > > > > > >> > pipeline, not across pipelines.  One would probably need to
> > > > > > > > > > >> > enhance the source interface also to do e2e latency.  Seems we
> > > > > > > > > > >> > agree this aspect is out of scope.
> > > > > > > > > >> >
> > > > > > > > > > >> > I took a look at the code to get a sense of how to accomplish
> > > > > > > > > > >> > this.  The gist is a new `markIdle` method on the `StreamOperator`
> > > > > > > > > > >> > interface, that is called when the stream status maintainer (the
> > > > > > > > > > >> > `OperatorChain`) transitions to idle state.  Then, a new
> > > > > > > > > > >> > `markIdle` method on the `SinkFunction` and `SinkWriter` that is
> > > > > > > > > > >> > called by the respective operators.  Note that StreamStatus is an
> > > > > > > > > > >> > internal class.
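
The shape of the change described above might look roughly like the sketch below. It is not taken from the draft PR and does not use Flink's real interfaces: `SketchSink`, `RecordingSink` and `SketchSinkOperator` are hypothetical stand-ins. It only assumes that `writeWatermark` and `markIdle` are added as default no-op methods, so that existing sinks keep compiling, and that the operator forwards the transition into the idle state exactly once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink interface; NOT Flink's SinkFunction.
interface SketchSink<T> {
    void invoke(T value);

    // Default no-op: existing implementations need not change.
    default void writeWatermark(long timestamp) {}

    // Default no-op: called when the upstream becomes idle.
    default void markIdle() {}
}

// A sink that records what it is told, for demonstration only.
final class RecordingSink implements SketchSink<String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void invoke(String value) {
        log.add("record:" + value);
    }

    @Override
    public void writeWatermark(long timestamp) {
        log.add("watermark:" + timestamp);
    }

    @Override
    public void markIdle() {
        log.add("idle");
    }
}

// Hypothetical stand-in for the operator that wraps the sink.
final class SketchSinkOperator<T> {
    private final SketchSink<T> sink;
    private boolean idle;

    SketchSinkOperator(SketchSink<T> sink) {
        this.sink = sink;
    }

    void processElement(T value) {
        sink.invoke(value);
    }

    void processWatermark(long timestamp) {
        sink.writeWatermark(timestamp);
    }

    // Forwards only the transition from active to idle, not repeated idle signals.
    void onStreamStatus(boolean nowIdle) {
        if (nowIdle && !idle) {
            sink.markIdle();
        }
        idle = nowIdle;
    }
}
```

Because both new methods have default bodies, a sink written as a lambda (`SketchSink<String> legacy = value -> {};`) still compiles and simply ignores watermarks and idleness.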
> > > > > > > > > >> >
> > > > > > > > > > >> > Here's a draft PR (based on the existing PR of FLINK-22700) to
> > > > > > > > > > >> > highlight this new aspect:
> > > > > > > > > > >> > https://github.com/streamnative/flink/pull/2/files
> > > > > > > > > > >> >
> > > > > > > > > > >> > Please let me know if you'd like me to proceed to update the FLIP
> > > > > > > > > > >> > with these details.
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks again,
> > > > > > > > > >> > Eron
> > > > > > > > > >> >
> > > > > > > > > > >> > On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski
> > > > > > > > > > >> > <pnowoj...@apache.org> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Hi,
> > > > > > > > > >> >
> > > > > > > > > > >> > Sorry for chipping in late in the discussion, but I would second
> > > > > > > > > > >> > this point from Arvid:
> > > > > > > > > > >> >
> > > > > > > > > > >> > 4. Potentially, StreamStatus and LatencyMarker would also need to
> > > > > > > > > > >> > be encoded.
> > > > > > > > > > >> >
> > > > > > > > > > >> > It seems like this point was asked, but not followed? Or did I
> > > > > > > > > > >> > miss it?
> > > > > > > > > > >> >
> > > > > > > > > > >> > Especially the StreamStatus part. For me it sounds like exposing
> > > > > > > > > > >> > watermarks without letting the sink know that the stream can be
> > > > > > > > > > >> > idle is an incomplete feature and can be very
> > > > > > > > > > >> > problematic/confusing for potential users.
> > > > > > > > > >> >
> > > > > > > > > >> > Best,
> > > > > > > > > >> > Piotrek
> > > > > > > > > >> >
> > > > > > > > > > >> > On Mon, May 31, 2021 at 08:34 Arvid Heise <ar...@apache.org>
> > > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > > >> > Afaik everyone can start a [VOTE] thread [1]. For example, here a
> > > > > > > > > > >> > non-committer started a successful thread [2].
> > > > > > > > > > >> > If you start it, I can already cast a binding vote and we just
> > > > > > > > > > >> > need 2 more for the FLIP to be accepted.
> > > > > > > > > >> >
> > > > > > > > > >> > [1]
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
> > > > > > > > > >> >
> > > > > > > > > >> > [2]
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Deprecating-Mesos-support-td50142.html
> > > > > > > > > >> >
> > > > > > > > > > >> > On Fri, May 28, 2021 at 8:17 PM Eron Wright
> > > > > > > > > > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > Arvid,
> > > > > > > > > > >> > Thanks for the feedback.  I investigated the japicmp
> > > > > > > > > > >> > configuration, and I see that SinkWriter is marked Experimental
> > > > > > > > > > >> > (not Public or PublicEvolving).  I think this means that
> > > > > > > > > > >> > SinkWriter need not be excluded.  As you mentioned, SinkFunction
> > > > > > > > > > >> > is already excluded.  I've updated the FLIP with an explanation.
> > > > > > > > > > >> >
> > > > > > > > > > >> > I believe all issues are resolved.  May we proceed to a vote now?
> > > > > > > > > > >> > And are you able to drive the vote process?
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks,
> > > > > > > > > >> > Eron
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Fri, May 28, 2021 at 4:40 AM Arvid Heise <ar...@apache.org>
> > > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > Hi Eron,
> > > > > > > > > >> >
> > > > > > > > > > >> > 1. fair point. It still feels odd to have writeWatermark in the
> > > > > > > > > > >> > SinkFunction (it's supposed to be functional as you mentioned),
> > > > > > > > > > >> > but I agree that invokeWatermark is not better. So unless someone
> > > > > > > > > > >> > has a better idea, I'm fine with it.
> > > > > > > > > > >> > 2.+3. I tried to come up with scenarios for a longer time. In
> > > > > > > > > > >> > general, it seems as if the new SinkWriter interface encourages
> > > > > > > > > > >> > more injection (see processing time service in InitContext), such
> > > > > > > > > > >> > that the need for the context is really just context information
> > > > > > > > > > >> > of that particular record and I don't see any use beyond timestamp
> > > > > > > > > > >> > and watermark. For SinkFunction, I'd not over-engineer as it's
> > > > > > > > > > >> > going to be deprecated soonish. So +1 to leave it out.
> > > > > > > > > > >> > 4. Okay so I double-checked: from an execution perspective, it
> > > > > > > > > > >> > works. However, japicmp would definitely complain. I propose to
> > > > > > > > > > >> > add it to the compatibility section like this. We need to add an
> > > > > > > > > > >> > exception to SinkWriter then. (SinkFunction is already on the
> > > > > > > > > > >> > exception list)
> > > > > > > > > > >> > 5.+6. Awesome, I was also sure but wanted to double check.
> > > > > > > > > >> >
> > > > > > > > > >> > Best,
> > > > > > > > > >> >
> > > > > > > > > >> > Arvid
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Wed, May 26, 2021 at 7:29 PM Eron Wright
> > > > > > > > > > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Arvid,
> > > > > > > > > >> >
> > > > > > > > > > >> > 1. I assume that the method name `invoke` stems from considering
> > > > > > > > > > >> > the SinkFunction to be a functional interface, but is otherwise
> > > > > > > > > > >> > meaningless.  Keeping it as `writeWatermark` does keep it
> > > > > > > > > > >> > symmetric with SinkWriter.  My vote is to leave it.  You decide.
> > > > > > > > > > >> >
> > > > > > > > > > >> > 2+3. I too considered adding a `WatermarkContext`, but it would
> > > > > > > > > > >> > merely be a placeholder.  I don't anticipate any context info in
> > > > > > > > > > >> > future.  As we see with invoke, it is possible to add a context
> > > > > > > > > > >> > later in a backwards-compatible way.  My vote is to not introduce
> > > > > > > > > > >> > a context.  You decide.
> > > > > > > > > > >> >
> > > > > > > > > > >> > 4. No anticipated compatibility issues.
> > > > > > > > > > >> >
> > > > > > > > > > >> > 5. Short answer, it works as expected.  The new methods are
> > > > > > > > > > >> > invoked whenever the underlying operator receives a watermark.  I
> > > > > > > > > > >> > do believe that batch and ingestion time applications receive
> > > > > > > > > > >> > watermarks.  Seems the programming model is more unified in that
> > > > > > > > > > >> > respect since 1.12 (FLIP-134).
> > > > > > > > > > >> >
> > > > > > > > > > >> > 6. The failure behavior is the same as for elements.
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks,
> > > > > > > > > >> > Eron
> > > > > > > > > >> >
> > > > > > > > > > >> > On Tue, May 25, 2021 at 12:42 PM Arvid Heise <ar...@apache.org>
> > > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > Hi Eron,
> > > > > > > > > >> >
> > > > > > > > > > >> > I think the FLIP is crisp and mostly good to go. Some smaller
> > > > > > > > > > >> > things/questions:
> > > > > > > > > > >> >
> > > > > > > > > > >> >    1. SinkFunction#writeWatermark could be named
> > > > > > > > > > >> >    SinkFunction#invokeWatermark or invokeOnWatermark to keep it
> > > > > > > > > > >> >    symmetric.
> > > > > > > > > > >> >    2. We could add the context parameter to both. For
> > > > > > > > > > >> >    SinkWriter#Context, we currently do not gain much.
> > > > > > > > > > >> >    SinkFunction#Context also exposes processing time, which may
> > > > > > > > > > >> >    or may not be handy and is currently mostly used for
> > > > > > > > > > >> >    StreamingFileSink bucket policies. We may add that processing
> > > > > > > > > > >> >    time flag also to SinkWriter#Context in the future.
> > > > > > > > > > >> >    3. Alternatively, we could also add a different context
> > > > > > > > > > >> >    parameter just to keep the API stable while allowing
> > > > > > > > > > >> >    additional information to be passed in the future.
> > > > > > > > > > >> >    4. Would we run into any compatibility issue if we use Flink
> > > > > > > > > > >> >    1.13 source in Flink 1.14 (with this FLIP) or vice versa?
> > > > > > > > > > >> >    5. What happens with sinks that use the new methods in
> > > > > > > > > > >> >    applications that do not have watermarks (batch mode,
> > > > > > > > > > >> >    processing time)? Does this also work with ingestion time
> > > > > > > > > > >> >    sufficiently?
> > > > > > > > > > >> >    6. How do exactly once sinks deal with written watermarks in
> > > > > > > > > > >> >    case of failure? I guess it's the same as normal records.
> > > > > > > > > > >> >    (Either rollback of transaction or deduplication on
> > > > > > > > > > >> >    resumption)
> > > > > > > > > >> >
> > > > > > > > > >> > Best,
> > > > > > > > > >> >
> > > > > > > > > >> > Arvid
> > > > > > > > > >> >
> > > > > > > > > > >> > On Tue, May 25, 2021 at 6:44 PM Eron Wright
> > > > > > > > > > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Does anyone have further comment on FLIP-167?
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks,
> > > > > > > > > >> > Eron
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Thu, May 20, 2021 at 5:02 PM Eron Wright
> > > > > > > > > > >> > <ewri...@streamnative.io> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Filed FLIP-167: Watermarks for Sink API:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > > > >> >
> > > > > > > > > >> > I'd like to call a vote next week, is that reasonable?
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com>
> > > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > Hi Arvid and Eron,
> > > > > > > > > >> >
> > > > > > > > > > >> > Thanks for the discussion. I read through Eron's pull request,
> > > > > > > > > > >> > and I think this can benefit the Pravega Flink connector as well.
> > > > > > > > > > >> >
> > > > > > > > > > >> > Here is some background. Pravega has had a watermark concept in
> > > > > > > > > > >> > its event streams for about two years; here is a blog
> > > > > > > > > > >> > introduction [1] to Pravega watermarks.
> > > > > > > > > > >> > The Pravega Flink connector also added a watermark integration
> > > > > > > > > > >> > last year, because we wanted to propagate the Flink watermark to
> > > > > > > > > > >> > Pravega in the SinkFunction. At that time we could only use the
> > > > > > > > > > >> > existing Flink API, so we keep the last watermark in memory and
> > > > > > > > > > >> > check for each event whether the watermark has changed [2], which
> > > > > > > > > > >> > is not efficient. With such a new interface, we can manage the
> > > > > > > > > > >> > watermark propagation much more easily.
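
The difference between the two approaches mentioned above can be sketched as follows. This is a simplified illustration, not the Pravega connector's code: `PerEventWatermarkCheck` and `CallbackWatermarkPropagation` are hypothetical classes, and the counters stand in for the real work of forwarding a watermark to the storage system.

```java
// Hypothetical sketch of the existing approach: remember the last watermark
// and compare it against the current one on every single event.
final class PerEventWatermarkCheck {
    private long lastWatermark = Long.MIN_VALUE;
    private int checks;
    private int propagations;

    // Called for every record with the watermark currently visible to the sink.
    void onEvent(long currentWatermark) {
        checks++;
        if (currentWatermark > lastWatermark) {
            lastWatermark = currentWatermark;
            propagations++; // a real connector would forward the watermark here
        }
    }

    int checks() {
        return checks;
    }

    int propagations() {
        return propagations;
    }
}

// Hypothetical sketch of the callback approach: the sink is told about a
// watermark only when one actually arrives, so no per-event comparison is needed.
final class CallbackWatermarkPropagation {
    private int propagations;

    void writeWatermark(long watermark) {
        propagations++; // a real connector would forward the watermark here
    }

    int propagations() {
        return propagations;
    }
}
```

In the per-event variant the number of comparisons grows with the number of records, while in the callback variant the work grows only with the number of watermarks.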
> > > > > > > > > >> >
> > > > > > > > > >> > [1]
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> > > > > > > > > >> >
> > > > > > > > > >> > [2]
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
> > > > > > > > > >> >
> > > > > > > > > >> > -----Original Message-----
> > > > > > > > > > >> > From: Arvid Heise <ar...@apache.org>
> > > > > > > > > >> > Sent: Wednesday, May 19, 2021 16:06
> > > > > > > > > >> > To: dev
> > > > > > > > > > >> > Subject: Re: [DISCUSS] Watermark propagation with Sink API
> > > > > > > > > >> >
> > > > > > > > > >> > [EXTERNAL EMAIL]
> > > > > > > > > >> >
> > > > > > > > > >> > Hi Eron,
> > > > > > > > > >> >
> > > > > > > > > > >> > Thanks for pushing that topic. I can now see that the benefit is
> > > > > > > > > > >> > even bigger than I initially thought. So it's worthwhile anyways
> > > > > > > > > > >> > to include that.
> > > > > > > > > >> >
> > > > > > > > > > >> > I also briefly thought about exposing watermarks to all UDFs, but
> > > > > > > > > > >> > here I really have an issue to see specific use cases. Could you
> > > > > > > > > > >> > maybe take a few minutes to think about it as well? I could only
> > > > > > > > > > >> > see someone misusing Async IO as a sink where a real sink would
> > > > > > > > > > >> > be more appropriate. In general, if there is not a clear use
> > > > > > > > > > >> > case, we shouldn't add the functionality as it's just increased
> > > > > > > > > > >> > maintenance for no value.
> > > > > > > > > >> >
> > > > > > > > > > >> > If we stick to the plan, I think your PR is already in a good
> > > > > > > > > > >> > shape. We need to create a FLIP for it though, since it changes
> > > > > > > > > > >> > Public interfaces [1]. I was initially not convinced that we
> > > > > > > > > > >> > should also change the old SinkFunction interface, but seeing how
> > > > > > > > > > >> > little the change is, I wouldn't mind at all to increase
> > > > > > > > > > >> > consistency. Only when we wrote the FLIP and approved it (which
> > > > > > > > > > >> > should be minimal and fast), we should actually look at the PR ;).
> > > > > > > > > >> >
> > > > > > > > > > >> > The only thing which I would improve is the name of the function.
> > > > > > > > > > >> > processWatermark sounds as if the sink implementer really needs
> > > > > > > > > > >> > to implement it (as you would need to do it on a custom
> > > > > > > > > > >> > operator). I would make them symmetric to the record
> > > > > > > > > > >> > writing/invoking method (e.g. writeWatermark and
> > > > > > > > > > >> > invokeWatermark).
> > > > > > > > > >> >
> > > > > > > > > > >> > As a follow-up PR, we should then migrate KafkaShuffle to the new API.
> > > > > > > > > > >> > But that's something I can do.
> > > > > > > > > >> >
> > > > > > > > > >> > [1]
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/Flink*Improvement*Proposals__;Kys!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnp6nc7o$
> > > > > > > > > >> >
> > > > > > > > > >> > [cwiki[.]apache[.]org]
> > > > > > > > > >> >
> > > > > > > > > > >> > On Wed, May 19, 2021 at 3:34 AM Eron Wright
> > > > > > > > > > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Update: opened an issue and a PR.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > >
> > > > > >
> > > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLIN
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > K-22700__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dM
> > > > > > > > > >> >
> > > > > > > > > >> > plbgRO4$ [issues[.]apache[.]org]
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > >
> > > > > >
> > > https://urldefense.com/v3/__https://github.com/apache/flink/pull/15950
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > __;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMtScmG7a
> > > > > > > > > >> >
> > > > > > > > > >> > $ [github[.]com]
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Tue, May 18, 2021 at 10:03 AM Eron Wright
> > > > > > > > > > >> > <ewri...@streamnative.io> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > Thanks Arvid and David for sharing your ideas on this subject. I'm glad
> > > > > > > > > > >> > to hear that you're seeing use cases for watermark propagation via an
> > > > > > > > > > >> > enhanced sink interface.
> > > > > > > > > >> >
> > > > > > > > > > >> > As you've guessed, my interest is in Pulsar and I am exploring some
> > > > > > > > > > >> > options for brokering watermarks across stream processing pipelines. I
> > > > > > > > > > >> > think Arvid is speaking to a high-fidelity solution where the difference
> > > > > > > > > > >> > between intra- and inter-pipeline flow is eliminated. My goal is more
> > > > > > > > > > >> > limited; I want to write the watermark that arrives at the sink to
> > > > > > > > > > >> > Pulsar. Simply imagine that Pulsar has native support for watermarking
> > > > > > > > > > >> > in its producer/consumer API, and we'll leave the details to another
> > > > > > > > > > >> > forum.
> > > > > > > > > >> >
> > > > > > > > > > >> > David, I like your invariant. I see lateness as stemming from the
> > > > > > > > > > >> > problem domain and from system dynamics (e.g. scheduling, batching,
> > > > > > > > > > >> > lag). When one depends on order-of-observation to generate watermarks,
> > > > > > > > > > >> > the app may become unduly sensitive to dynamics which bear on
> > > > > > > > > > >> > order-of-observation. My goal is to factor out the system dynamics from
> > > > > > > > > > >> > lateness determination.
> > > > > > > > > >> >
> > > > > > > > > > >> > Arvid, to be most valuable (at least for my purposes) the enhancement
> > > > > > > > > > >> > is needed on SinkFunction. This will allow us to easily evolve the
> > > > > > > > > > >> > existing Pulsar connector.
> > > > > > > > > >> >
> > > > > > > > > > >> > Next step, I will open a PR to advance the conversation.
> > > > > > > > > >> >
> > > > > > > > > >> > Eron
> > > > > > > > > >> >
> > > > > > > > > > >> > On Tue, May 18, 2021 at 5:06 AM David Morávek
> > > > > > > > > > >> > <david.mora...@gmail.com> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Hi Eron,
> > > > > > > > > >> >
> > > > > > > > > > >> > Thanks for starting this discussion. I've been thinking about this
> > > > > > > > > > >> > recently as we've run into "watermark related" issues when chaining
> > > > > > > > > > >> > multiple pipelines together. My two cents to the discussion:
> > > > > > > > > >> >
> > > > > > > > > > >> > How I like to think about the problem is that there should be an
> > > > > > > > > > >> > invariant that holds for any stream processing pipeline: "A NON_LATE
> > > > > > > > > > >> > element entering the system should never become LATE."
> > > > > > > > > >> >
> > > > > > > > > > >> > Unfortunately this is exactly what happens in downstream pipelines,
> > > > > > > > > > >> > because the upstream one can:
> > > > > > > > > > >> > - break ordering (especially with higher parallelism)
> > > > > > > > > > >> > - emit elements that are ahead of output watermark
> > > > > > > > > >> >
> > > > > > > > > > >> > There is not enough information to reconstruct the upstream watermark
> > > > > > > > > > >> > in later stages (it's always just an estimate based on the previous
> > > > > > > > > > >> > pipeline's output).
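
To make the invariant concrete, here is a minimal sketch of how a downstream stage that can only estimate its watermark may turn on-time elements into late ones. Everything in it is an assumption for illustration: the class and method names are hypothetical, the estimate is a simple "maximum timestamp seen minus a fixed bound" heuristic, and an element counts as late when its timestamp is at or below the current watermark.

```java
import java.util.List;

// Hypothetical illustration only; none of these names come from Flink.
final class LatenessDemo {

    // Assumed convention: an element is late if its timestamp is at or below
    // the watermark that was current when the element arrived.
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    // Counts the elements a downstream stage would treat as late when it can
    // only estimate the watermark from the timestamps it has observed so far.
    static long countLate(List<Long> timestamps, long maxOutOfOrderness) {
        boolean seenAny = false;
        long maxSeen = 0L;
        long late = 0L;
        for (long ts : timestamps) {
            if (seenAny) {
                // Heuristic estimate based purely on order of observation.
                long estimatedWatermark = maxSeen - maxOutOfOrderness - 1;
                if (isLate(ts, estimatedWatermark)) {
                    late++;
                }
            }
            maxSeen = seenAny ? Math.max(maxSeen, ts) : ts;
            seenAny = true;
        }
        return late;
    }

    public static void main(String[] args) {
        // Same three elements, first in event-time order, then reordered by
        // an upstream pipeline running with higher parallelism.
        System.out.println(countLate(List.of(1000L, 2000L, 3000L), 500L));
        System.out.println(countLate(List.of(3000L, 1000L, 2000L), 500L));
    }
}
```

The elements are identical in both runs; only the order of observation differs, which is the sensitivity the invariant is meant to rule out.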
> > > > > > > > > >> >
> > > > > > > > > > >> > It would be great if we could have a general abstraction that is
> > > > > > > > > > >> > reusable for various sources / sinks (not just Kafka / Pulsar, though
> > > > > > > > > > >> > this would probably cover most of the use-cases) and systems.
> > > > > > > > > >> >
> > > > > > > > > > >> > Is there any other use-case than sharing watermarks between pipelines
> > > > > > > > > > >> > that you're trying to solve?
> > > > > > > > > >> >
> > > > > > > > > >> > Arvid:
> > > > > > > > > >> >
> > > > > > > > > > >> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
> > > > > > > > > > >> > hard time imagining that it's useful to use a different stream
> > > > > > > > > > >> > processor downstream. So for now, I'm assuming that both upstream and
> > > > > > > > > > >> > downstream are Flink applications. In that case, we probably define
> > > > > > > > > > >> > both parts of the pipeline in the same Flink job similar to
> > > > > > > > > > >> > KafkaStream's #through.
> > > > > > > > > >> >
> > > > > > > > > > >> > I'd slightly disagree here. For example, we're "materializing"
> > > > > > > > > > >> > change-logs produced by a Flink pipeline into a serving layer (random
> > > > > > > > > > >> > access db / in memory view / ..) and we need to know whether the
> > > > > > > > > > >> > responses we serve meet the "freshness" requirements (e.g. you may want
> > > > > > > > > > >> > to respond differently when the watermark is lagging way too much
> > > > > > > > > > >> > behind processing time). Also, not every stream processor in the
> > > > > > > > > > >> > pipeline needs to be Flink. It can as well be a simple element-wise
> > > > > > > > > > >> > transformation that reads from Kafka and writes back into a separate
> > > > > > > > > > >> > topic (that's what we do, for example, with ML models that have special
> > > > > > > > > > >> > hardware requirements).
> > > > > > > > > >> >
> > > > > > > > > >> > Best,
> > > > > > > > > >> > D.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > Hi Eron,
> > > > > > > > > >> >
> > > > > > > > > > >> > I think this is a useful addition for storage systems that act as
> > > > > > > > > > >> > pass-through for Flink to reduce recovery time. It is only useful if
> > > > > > > > > > >> > you combine it with regional fail-over as only a small part of the
> > > > > > > > > > >> > pipeline is restarted.
> > > > > > > > > >> >
> > > > > > > > > > >> > A couple of thoughts on the implications:
> > > > > > > > > > >> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
> > > > > > > > > > >> > hard time imagining that it's useful to use a different stream
> > > > > > > > > > >> > processor downstream. So for now, I'm assuming that both upstream and
> > > > > > > > > > >> > downstream are Flink applications. In that case, we probably define
> > > > > > > > > > >> > both parts of the pipeline in the same Flink job similar to
> > > > > > > > > > >> > KafkaStream's #through.
> > > > > > > > > > >> > 2. The schema of the respective intermediate stream/topic would need to
> > > > > > > > > > >> > be managed by Flink to encode both records and watermarks. This reduces
> > > > > > > > > > >> > the usability quite a bit and needs to be carefully crafted.
> > > > > > > > > > >> > 3. It's not clear to me if constructs like SchemaRegistry can be
> > > > > > > > > > >> > properly supported (and also if they should be supported) in terms of
> > > > > > > > > > >> > schema evolution.
> > > > > > > > > > >> > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> > > > > > > > > > >> > encoded.
> > > > > > > > > > >> > 5. It's important to have some way to transport backpressure from the
> > > > > > > > > > >> > downstream to the upstream. Or else you would have the same issue as
> > > > > > > > > > >> > KafkaStreams where two separate pipelines can drift so far away that
> > > > > > > > > > >> > you experience data loss if the data retention period is smaller than
> > > > > > > > > > >> > the drift.
> > > > > > > > > > >> > 6. It's clear that you trade a huge chunk of throughput for lower
> > > > > > > > > > >> > overall latency in case of failure. So it's an interesting feature for
> > > > > > > > > > >> > use cases with SLAs.
> > > > > > > > > >> >
> > > > > > > > > > >> > Since we are phasing out SinkFunction, I'd prefer to only support
> > > > > > > > > > >> > SinkWriter. Having a no-op default sounds good to me.
> > > > > > > > > >> >
> > > > > > > > > > >> > We have an experimental feature for Kafka [1], which pretty much
> > > > > > > > > > >> > reflects your idea. Here we have an ugly workaround to be able to
> > > > > > > > > > >> > process the watermark by using a custom StreamSink task. We could also
> > > > > > > > > > >> > try to create a FLIP that abstracts the actual system away and then we
> > > > > > > > > > >> > could use the approach for both Pulsar and Kafka.
> > > > > > > > > >> >
> > > > > > > > > >> > [1]
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > >
> > > > > >
> > > https://urldefense.com/v3/__https://github.com/apache/flink/blob/maste
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > r/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flin
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > k/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java*L103__;Iw!
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > >
> > !LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMvmemHrt$
> > > > > > > > > >> >
> > > > > > > > > >> > [github[.]com]
> > > > > > > > > >> >
> > > > > > > > > > >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> > > > > > > > > > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > I would like to propose an enhancement to the Sink API, the ability to
> > > > > > > > > > >> > receive upstream watermarks. I'm aware that the sink context provides
> > > > > > > > > > >> > the current watermark for a given record. I'd like to be able to write
> > > > > > > > > > >> > a sink function that is invoked whenever the watermark changes. Out of
> > > > > > > > > > >> > scope would be event-time timers (since sinks aren't keyed).
> > > > > > > > > >> >
> > > > > > > > > > >> > For context, imagine that a stream storage system had the ability to
> > > > > > > > > > >> > persist watermarks in addition to ordinary elements, e.g. to serve as
> > > > > > > > > > >> > source watermarks in a downstream processor. Ideally one could compose
> > > > > > > > > > >> > a multi-stage, event-driven application, with watermarks flowing
> > > > > > > > > > >> > end-to-end without need for a heuristics-based watermark at each stage.
> > > > > > > > > >> >
> > > > > > > > > > >> > The specific proposal would be a new method on `SinkFunction` and/or on
> > > > > > > > > > >> > `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a
> > > > > > > > > > >> > default implementation that does nothing.
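
As a rough sketch of the shape such a method could take, assuming a simplified stand-in interface rather than Flink's actual `SinkFunction` or `SinkWriter`: the interface `WatermarkAwareSink`, both sink classes, and the `long` watermark parameter are hypothetical and only illustrate how a no-op default keeps existing sinks unchanged while letting new ones opt in.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink interface; NOT Flink's real API.
interface WatermarkAwareSink<T> {

    // Existing per-record method that every sink already implements.
    void write(T element);

    // Proposed addition: called when a watermark reaches the sink.
    // The default does nothing, so existing sinks need no changes.
    default void writeWatermark(long watermarkTimestamp) {
    }
}

// A sink written before the new method existed; it only handles records.
final class LegacySink implements WatermarkAwareSink<String> {
    final List<String> written = new ArrayList<>();

    @Override
    public void write(String element) {
        written.add(element);
    }
}

// A sink that opts in and persists watermarks alongside records.
final class ForwardingSink implements WatermarkAwareSink<String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void write(String element) {
        log.add("record:" + element);
    }

    @Override
    public void writeWatermark(long watermarkTimestamp) {
        log.add("watermark:" + watermarkTimestamp);
    }
}

final class WatermarkSinkDemo {
    public static void main(String[] args) {
        LegacySink legacy = new LegacySink();
        ForwardingSink forwarding = new ForwardingSink();
        List<WatermarkAwareSink<String>> sinks = List.of(legacy, forwarding);
        for (WatermarkAwareSink<String> sink : sinks) {
            sink.write("a");
            sink.writeWatermark(42L); // silently ignored by LegacySink
        }
        System.out.println(legacy.written);
        System.out.println(forwarding.log);
    }
}
```

The default method is what keeps the change source-compatible: sinks that never override it see no behavioral difference.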
> > > > > > > > > >> >
> > > > > > > > > >> > Thoughts?
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks!
> > > > > > > > > >> > Eron Wright
> > > > > > > > > >> > StreamNative
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > --
> > > > > > > > > >> >
> > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >> >
> > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >> > streamnative.io |  Meet with me
> > > > > > > > > >> > <
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > >
> > > https://urldefense.com/v3/__https://calendly.com/eronwright/regular
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > >
> > > -1-hour__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5
> > > > > > > > > >> >
> > > > > > > > > >> > dMtQrD25c$ [calendly[.]com]>
> > > > > > > > > >> >
> > > > > > > > > >> > <
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > >
> > > https://urldefense.com/v3/__https://github.com/streamnative__;!!LpK
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > I!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnQskrSQ$
> > > > > > > > > >> >
> > > > > > > > > >> > [github[.]com]>
> > > > > > > > > >> > <
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > >
> > > https://urldefense.com/v3/__https://www.linkedin.com/company/stream
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > >
> > > native/__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5
> > > > > > > > > >> >
> > > > > > > > > >> > dMqO4UZJa$ [linkedin[.]com]>
> > > > > > > > > >> > <
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > >
> https://urldefense.com/v3/__https://twitter.com/streamnativeio/__
> > > > > > > > > >> >
> > > > > > > > > >> > ;!
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > >
> > !LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMpbyC_rP$
> > > > > > > > > >> >
> > > > > > > > > >> > [twitter[.]com]>
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > --
> > > > > > > > > >> >
> > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >> >
> > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >> > streamnative.io |  Meet with me
> > > > > > > > > >> > <
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > https://urldefense.com/v3/__https://calendly.com/eronwright/regular-1
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > -hour__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMtQ
> > > > > > > > > >> >
> > > > > > > > > >> > rD25c$ [calendly[.]com]>
> > > > > > > > > >> >
> > > > > > > > > >> > <
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > https://urldefense.com/v3/__https://github.com/streamnative__;!!LpKI
> > > > > > > > > >> >
> > > > > > > > > >> > !
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > 2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnQskrSQ$
> > > > > > > > > >> >
> > > > > > > > > >> > [github[.]com]>
> > > > > > > > > >> > <
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > https://urldefense.com/v3/__https://www.linkedin.com/company/streamna
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > tive/__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMqO
> > > > > > > > > >> >
> > > > > > > > > >> > 4UZJa$ [linkedin[.]com]>
> > > > > > > > > >> > <
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > https://urldefense.com/v3/__https://twitter.com/streamnativeio/__;!!L
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > pKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMpbyC_rP$
> > > > > > > > > >> >
> > > > > > > > > >> > [twitter[.]com]>
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > --
> > > > > > > > > >> >
> > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >> >
> > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > https://github.com/streamnative
> > > > > > > > ><
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > --
> > > > > > > > > >> >
> > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >> >
> > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > https://github.com/streamnative
> > > > > > > > ><
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > --
> > > > > > > > > >> >
> > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >> >
> > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > https://github.com/streamnative
> > > > > > > > ><
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > --
> > > > > > > > > >> >
> > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >> >
> > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > https://github.com/streamnative
> > > > > > > > ><
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > --
> > > > > > > > > >> >
> > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >> >
> > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > https://github.com/streamnative
> > > > > > > > ><
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
