I think the rationale for end-to-end idleness (i.e. between pipelines) is
the same as the rationale for idleness between operators within a
pipeline.  On the 'main issue' you mentioned, we entrust the source with
adapting to Flink's notion of idleness (e.g. in the Pulsar source, it means
that no topics/partitions are assigned to a given sub-task); a similar
adaptation would occur in the sink.  In other words, I think it is
reasonable that a sink for a watermark-aware storage system needs the
idleness signal.
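
As a rough illustration of that sink-side adaptation, here is a minimal,
self-contained sketch.  It does not use Flink's actual interfaces: the
`WatermarkAwareSink` interface and the `RecordingSink` class are hypothetical
names, with `writeWatermark` and `markIdle` modeled on the methods discussed
in this thread.  It also assumes that any write makes the sink active again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink-side hooks (NOT Flink's real interface). Both hooks
// default to no-ops so that existing sinks would be unaffected.
interface WatermarkAwareSink<T> {
    void write(T element);

    default void writeWatermark(long watermark) {}

    default void markIdle() {}
}

// Hypothetical sink that records what it would send to a watermark-aware
// storage system, so that the translation of signals is easy to inspect.
final class RecordingSink implements WatermarkAwareSink<String> {
    final List<String> sent = new ArrayList<>();
    private boolean idle = false;

    @Override
    public void write(String element) {
        idle = false; // assumption: any write makes the sink active again
        sent.add("element:" + element);
    }

    @Override
    public void writeWatermark(long watermark) {
        idle = false;
        sent.add("watermark:" + watermark);
    }

    @Override
    public void markIdle() {
        // Forward the idle signal once per idle period rather than on every call.
        if (!idle) {
            idle = true;
            sent.add("idle");
        }
    }
}
```

The point of the sketch is only that the sink, like the source, decides how
Flink's idleness maps onto the external system's own notion of idleness.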

Let me explain how I would use it in Pulsar's sink.  Each sub-task is a
Pulsar producer, and is writing watermarks to a configured topic via the
Producer API.  The Pulsar broker aggregates the watermarks that are written
by each producer into a global minimum (similar to StatusWatermarkValve).
The broker keeps track of which producers are actively producing
watermarks, and a producer may mark itself as idle to tell the broker not
to wait for watermarks from it, e.g. when a producer is going offline.  I
had intended to mark the producer as idle when the sub-task is closing, but
now I see that it would be insufficient; the producer should also be idled
if the sub-task is idled.  Otherwise, the broker would wait indefinitely
for the idled sub-task to produce a watermark.
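
To make the stall concrete, here is a small sketch of the kind of
aggregation I have in mind.  This is not Pulsar's API: `WatermarkAggregator`
and its methods are hypothetical, and the logic is a simplification of what
StatusWatermarkValve does (a minimum over active inputs that never moves
backwards).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical broker-side aggregator (NOT an actual Pulsar class).
final class WatermarkAggregator {
    private static final class ProducerState {
        long watermark = Long.MIN_VALUE; // nothing reported yet
        boolean idle = false;
    }

    private final Map<String, ProducerState> producers = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    void register(String producerId) {
        producers.computeIfAbsent(producerId, id -> new ProducerState());
    }

    void onWatermark(String producerId, long watermark) {
        ProducerState state =
                producers.computeIfAbsent(producerId, id -> new ProducerState());
        state.idle = false; // assumption: writing a watermark reactivates the producer
        state.watermark = Math.max(state.watermark, watermark);
    }

    void markIdle(String producerId) {
        ProducerState state = producers.get(producerId);
        if (state != null) {
            state.idle = true;
        }
    }

    // Minimum over active producers; idle producers are skipped, and the
    // result never regresses below a previously returned value.
    long globalWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (ProducerState state : producers.values()) {
            if (!state.idle) {
                anyActive = true;
                min = Math.min(min, state.watermark);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }
}
```

With two registered producers where only one has written a watermark, the
global watermark stays at its initial value until the silent producer either
writes a watermark or is marked idle, which is exactly the stall described
above.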

Arvid, I think your original instincts were correct about idleness
propagation, and I hope I've demonstrated a practical use case.



On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise <ar...@apache.org> wrote:

> When I was rethinking the idleness issue, I came to the conclusion that it
> should be inferred at the source of the respective downstream pipeline
> again.
>
> The main issue on propagating idleness is that you would force the same
> definition across all downstream pipelines, which may not be what the user
> intended.
> On the other hand, I don't immediately see a technical reason why the
> downstream source wouldn't be able to infer that.
>
>
> On Thu, Jun 3, 2021 at 9:14 PM Eron Wright <ewri...@streamnative.io
> .invalid>
> wrote:
>
> > Thanks Piotr for bringing this up.  I reflected on this and I agree we
> > should expose idleness, otherwise a multi-stage flow could stall.
> >
> > Regarding the latency markers, I don't see an immediate need for
> > propagating them, because they serve to estimate latency within a
> > pipeline, not across pipelines.  One would probably need to enhance the
> > source interface also to do e2e latency.  Seems we agree this aspect is
> > out of scope.
> >
> > I took a look at the code to get a sense of how to accomplish this.  The
> > gist is a new `markIdle` method on the `StreamOperator` interface that is
> > called when the stream status maintainer (the `OperatorChain`) transitions
> > to idle state.  Then, a new `markIdle` method on the `SinkFunction` and
> > `SinkWriter` that is called by the respective operators.  Note that
> > StreamStatus is an internal class.
> >
> > Here's a draft PR (based on the existing PR of FLINK-22700) to highlight
> > this new aspect:
> > https://github.com/streamnative/flink/pull/2/files
> >
> > Please let me know if you'd like me to proceed to update the FLIP with
> > these details.
> >
> > Thanks again,
> > Eron
> >
> > On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi,
> > >
> > > Sorry for chipping in late in the discussion, but I would second this
> > > point from Arvid:
> > >
> > > > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> > > > encoded.
> > >
> > > It seems like this point was raised but not followed up? Or did I miss it?
> > > Especially the StreamStatus part. To me, exposing watermarks without
> > > letting the sink know that the stream can be idle sounds like an
> > > incomplete feature, and can be very problematic/confusing for potential
> > > users.
> > >
> > > Best,
> > > Piotrek
> > >
> > > pon., 31 maj 2021 o 08:34 Arvid Heise <ar...@apache.org> napisał(a):
> > >
> > > > Afaik everyone can start a [VOTE] thread [1]. For example, here a
> > > > non-committer started a successful thread [2].
> > > > If you start it, I can already cast a binding vote and we just need 2
> > > > more for the FLIP to be accepted.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
> > > > [2]
> > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Deprecating-Mesos-support-td50142.html
> > > >
> > > > On Fri, May 28, 2021 at 8:17 PM Eron Wright <ewri...@streamnative.io
> > > > .invalid>
> > > > wrote:
> > > >
> > > > > Arvid,
> > > > > Thanks for the feedback.  I investigated the japicmp configuration, and I
> > > > > see that SinkWriter is marked Experimental (not Public or PublicEvolving).
> > > > > I think this means that SinkWriter need not be excluded.  As you mentioned,
> > > > > SinkFunction is already excluded.  I've updated the FLIP with an
> > > > > explanation.
> > > > >
> > > > > I believe all issues are resolved.  May we proceed to a vote now?  And are
> > > > > you able to drive the vote process?
> > > > >
> > > > > Thanks,
> > > > > Eron
> > > > >
> > > > >
> > > > > On Fri, May 28, 2021 at 4:40 AM Arvid Heise <ar...@apache.org>
> > wrote:
> > > > >
> > > > > > Hi Eron,
> > > > > >
> > > > > > 1. fair point. It still feels odd to have writeWatermark in the
> > > > > > SinkFunction (it's supposed to be functional as you mentioned), but I
> > > > > > agree that invokeWatermark is not better. So unless someone has a better
> > > > > > idea, I'm fine with it.
> > > > > > 2.+3. I tried to come up with scenarios for a longer time. In general, it
> > > > > > seems as if the new SinkWriter interface encourages more injection (see
> > > > > > processing time service in InitContext), such that the need for the
> > > > > > context is really just context information of that particular record and
> > > > > > I don't see any use beyond timestamp and watermark. For SinkFunction, I'd
> > > > > > not over-engineer as it's going to be deprecated soonish. So +1 to leave
> > > > > > it out.
> > > > > > 4. Okay so I double-checked: from an execution perspective, it works.
> > > > > > However, japicmp would definitely complain. I propose to add it to the
> > > > > > compatibility section like this. We need to add an exception to
> > > > > > SinkWriter then. (SinkFunction is already on the exception list)
> > > > > > 5.+6. Awesome, I was also sure but wanted to double check.
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Arvid
> > > > > >
> > > > > >
> > > > > > On Wed, May 26, 2021 at 7:29 PM Eron Wright <
> > ewri...@streamnative.io
> > > > > > .invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Arvid,
> > > > > > >
> > > > > > > 1. I assume that the method name `invoke` stems from considering the
> > > > > > > SinkFunction to be a functional interface, but is otherwise meaningless.
> > > > > > > Keeping it as `writeWatermark` does keep it symmetric with SinkWriter.  My
> > > > > > > vote is to leave it.  You decide.
> > > > > > >
> > > > > > > 2+3. I too considered adding a `WatermarkContext`, but it would merely be a
> > > > > > > placeholder.  I don't anticipate any context info in future.  As we see
> > > > > > > with invoke, it is possible to add a context later in a
> > > > > > > backwards-compatible way.  My vote is to not introduce a context.  You
> > > > > > > decide.
> > > > > > >
> > > > > > > 4. No anticipated compatibility issues.
> > > > > > >
> > > > > > > 5. Short answer, it works as expected.  The new methods are invoked
> > > > > > > whenever the underlying operator receives a watermark.  I do believe that
> > > > > > > batch and ingestion time applications receive watermarks. Seems the
> > > > > > > programming model is more unified in that respect since 1.12 (FLIP-134).
> > > > > > >
> > > > > > > 6. The failure behavior is the same as for elements.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Eron
> > > > > > >
> > > > > > > On Tue, May 25, 2021 at 12:42 PM Arvid Heise <ar...@apache.org
> >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Eron,
> > > > > > > >
> > > > > > > > I think the FLIP is crisp and mostly good to go. Some smaller
> > > > > > > > things/questions:
> > > > > > > >
> > > > > > > >    1. SinkFunction#writeWatermark could be named
> > > > > > > >    SinkFunction#invokeWatermark or invokeOnWatermark to keep it symmetric.
> > > > > > > >    2. We could add the context parameter to both. For SinkWriter#Context,
> > > > > > > >    we currently do not gain much. SinkFunction#Context also exposes
> > > > > > > >    processing time, which may or may not be handy and is currently mostly
> > > > > > > >    used for StreamingFileSink bucket policies. We may add that processing
> > > > > > > >    time flag also to SinkWriter#Context in the future.
> > > > > > > >    3. Alternatively, we could also add a different context parameter just
> > > > > > > >    to keep the API stable while allowing additional information to be
> > > > > > > >    passed in the future.
> > > > > > > >    4. Would we run into any compatibility issue if we use Flink 1.13 source
> > > > > > > >    in Flink 1.14 (with this FLIP) or vice versa?
> > > > > > > >    5. What happens with sinks that use the new methods in applications that
> > > > > > > >    do not have watermarks (batch mode, processing time)? Does this also
> > > > > > > >    work with ingestion time sufficiently?
> > > > > > > >    6. How do exactly once sinks deal with written watermarks in case of
> > > > > > > >    failure? I guess it's the same as normal records. (Either rollback of
> > > > > > > >    transaction or deduplication on resumption)
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Arvid
> > > > > > > >
> > > > > > > > On Tue, May 25, 2021 at 6:44 PM Eron Wright <
> > > > ewri...@streamnative.io
> > > > > > > > .invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Does anyone have further comment on FLIP-167?
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Eron
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, May 20, 2021 at 5:02 PM Eron Wright <
> > > > > ewri...@streamnative.io
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Filed FLIP-167: Watermarks for Sink API:
> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > > > >
> > > > > > > > > > I'd like to call a vote next week, is that reasonable?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <
> > b.z...@dell.com
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Hi Arvid and Eron,
> > > > > > > > > >>
> > > > > > > > > > >> Thanks for the discussion. I read through Eron's pull request, and I
> > > > > > > > > > >> think this can benefit the Pravega Flink connector as well.
> > > > > > > > > > >>
> > > > > > > > > > >> Here is some background. Pravega has had the watermark concept in the
> > > > > > > > > > >> event stream for two years, and here is a blog introduction[1] to the
> > > > > > > > > > >> Pravega watermark.
> > > > > > > > > > >> The Pravega Flink connector also added watermark integration last year:
> > > > > > > > > > >> we wanted to propagate the Flink watermark to Pravega in the
> > > > > > > > > > >> SinkFunction, and at that time we just used the existing Flink API, so
> > > > > > > > > > >> we keep the last watermark in memory and check whether the watermark
> > > > > > > > > > >> changes for each event[2], which is not efficient. With such a new
> > > > > > > > > > >> interface, we can also manage the watermark propagation much more easily.
> > > > > > > > > > >>
> > > > > > > > > > >> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> > > > > > > > > > >> [2]
> > > > > > > > > > >> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
> > > > > > > > > >>
> > > > > > > > > >> -----Original Message-----
> > > > > > > > > >> From: Arvid Heise <ar...@apache.org>
> > > > > > > > > >> Sent: Wednesday, May 19, 2021 16:06
> > > > > > > > > >> To: dev
> > > > > > > > > >> Subject: Re: [DISCUSS] Watermark propagation with Sink
> API
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> [EXTERNAL EMAIL]
> > > > > > > > > >>
> > > > > > > > > >> Hi Eron,
> > > > > > > > > >>
> > > > > > > > > > >> Thanks for pushing that topic. I can now see that the benefit is even
> > > > > > > > > > >> bigger than I initially thought. So it's worthwhile anyways to include
> > > > > > > > > > >> that.
> > > > > > > > > > >>
> > > > > > > > > > >> I also briefly thought about exposing watermarks to all UDFs, but here I
> > > > > > > > > > >> really have an issue to see specific use cases. Could you maybe take a
> > > > > > > > > > >> few minutes to think about it as well? I could only see someone misusing
> > > > > > > > > > >> Async IO as a sink where a real sink would be more appropriate. In
> > > > > > > > > > >> general, if there is not a clear use case, we shouldn't add the
> > > > > > > > > > >> functionality as it's just increased maintenance for no value.
> > > > > > > > > > >>
> > > > > > > > > > >> If we stick to the plan, I think your PR is already in a good shape. We
> > > > > > > > > > >> need to create a FLIP for it though, since it changes Public interfaces
> > > > > > > > > > >> [1]. I was initially not convinced that we should also change the old
> > > > > > > > > > >> SinkFunction interface, but seeing how little the change is, I wouldn't
> > > > > > > > > > >> mind at all to increase consistency. Only when we wrote the FLIP and
> > > > > > > > > > >> approved it (which should be minimal and fast), we should actually look
> > > > > > > > > > >> at the PR ;).
> > > > > > > > > > >>
> > > > > > > > > > >> The only thing which I would improve is the name of the function.
> > > > > > > > > > >> processWatermark sounds as if the sink implementer really needs to
> > > > > > > > > > >> implement it (as you would need to do it on a custom operator). I would
> > > > > > > > > > >> make them symmetric to the record writing/invoking method (e.g.
> > > > > > > > > > >> writeWatermark and invokeWatermark).
> > > > > > > > > > >>
> > > > > > > > > > >> As a follow-up PR, we should then migrate KafkaShuffle to the new API.
> > > > > > > > > > >> But that's something I can do.
> > > > > > > > > >>
> > > > > > > > > >> [1]
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/Flink*Improvement*Proposals__;Kys!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnp6nc7o$
> > > > > > > > > >> [cwiki[.]apache[.]org]
> > > > > > > > > >>
> > > > > > > > > >> On Wed, May 19, 2021 at 3:34 AM Eron Wright <
> > > > > > > ewri...@streamnative.io
> > > > > > > > > >> .invalid>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Update: opened an issue and a PR.
> > > > > > > > > >> >
> > > > > > > > > > >> > https://issues.apache.org/jira/browse/FLINK-22700
> > > > > > > > > > >> > https://github.com/apache/flink/pull/15950
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > On Tue, May 18, 2021 at 10:03 AM Eron Wright <
> > > > > > > > ewri...@streamnative.io
> > > > > > > > > >
> > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > > >> > > Thanks Arvid and David for sharing your ideas on this subject.  I'm
> > > > > > > > > > >> > > glad to hear that you're seeing use cases for watermark propagation
> > > > > > > > > > >> > > via an enhanced sink interface.
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > As you've guessed, my interest is in Pulsar and I am exploring some
> > > > > > > > > > >> > > options for brokering watermarks across stream processing pipelines.
> > > > > > > > > > >> > > I think Arvid is speaking to a high-fidelity solution where the
> > > > > > > > > > >> > > difference between intra- and inter-pipeline flow is eliminated.  My
> > > > > > > > > > >> > > goal is more limited; I want to write the watermark that arrives at
> > > > > > > > > > >> > > the sink to Pulsar.  Simply imagine that Pulsar has native support
> > > > > > > > > > >> > > for watermarking in its producer/consumer API, and we'll leave the
> > > > > > > > > > >> > > details to another forum.
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > David, I like your invariant.  I see lateness as stemming from the
> > > > > > > > > > >> > > problem domain and from system dynamics (e.g. scheduling, batching,
> > > > > > > > > > >> > > lag).  When one depends on order-of-observation to generate
> > > > > > > > > > >> > > watermarks, the app may become unduly sensitive to dynamics which
> > > > > > > > > > >> > > bear on order-of-observation.  My goal is to factor out the system
> > > > > > > > > > >> > > dynamics from lateness determination.
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Arvid, to be most valuable (at least for my purposes) the
> > > > > > > > > > >> > > enhancement is needed on SinkFunction.  This will allow us to easily
> > > > > > > > > > >> > > evolve the existing Pulsar connector.
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Next step, I will open a PR to advance the conversation.
> > > > > > > > > >> > >
> > > > > > > > > >> > > Eron
> > > > > > > > > >> > >
> > > > > > > > > >> > > On Tue, May 18, 2021 at 5:06 AM David Morávek
> > > > > > > > > >> > > <david.mora...@gmail.com>
> > > > > > > > > >> > > wrote:
> > > > > > > > > >> > >
> > > > > > > > > >> > >> Hi Eron,
> > > > > > > > > >> > >>
> > > > > > > > > > >> > >> Thanks for starting this discussion. I've been thinking about this
> > > > > > > > > > >> > >> recently as we've run into "watermark related" issues when chaining
> > > > > > > > > > >> > >> multiple pipelines together. My two cents on the discussion:
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >> How I like to think about the problem is that there should be an
> > > > > > > > > > >> > >> invariant that holds for any stream processing pipeline: "NON_LATE
> > > > > > > > > > >> > >> element entering the system should never become LATE"
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >> Unfortunately this is exactly what happens in downstream pipelines,
> > > > > > > > > > >> > >> because the upstream one can:
> > > > > > > > > > >> > >> - break ordering (especially with higher parallelism)
> > > > > > > > > > >> > >> - emit elements that are ahead of the output watermark
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >> There is not enough information to reconstruct the upstream
> > > > > > > > > > >> > >> watermark in later stages (it's always just an estimate based on the
> > > > > > > > > > >> > >> previous pipeline's output).
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >> It would be great if we could have a general abstraction that is
> > > > > > > > > > >> > >> reusable for various sources / sinks (not just Kafka / Pulsar, though
> > > > > > > > > > >> > >> this would probably cover most of the use-cases) and systems.
> > > > > > > > > > >> > >>
> > > > > > > > > > >> > >> Is there any use-case other than sharing watermarks between
> > > > > > > > > > >> > >> pipelines that you're trying to solve?
> > > > > > > > > >> > >>
> > > > > > > > > >> > >> Arvid:
> > > > > > > > > >> > >>
> > > > > > > > > > >> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I
> > > > > > > > > > >> > >> > have a hard time imagining that it's useful to use a different
> > > > > > > > > > >> > >> > stream processor downstream. So for now, I'm assuming that both
> > > > > > > > > > >> > >> > upstream and downstream are Flink applications. In that case, we
> > > > > > > > > > >> > >> > probably define both parts of the pipeline in the same Flink job
> > > > > > > > > > >> > >> > similar to KafkaStream's #through.
> > > > > > > > > >> > >> >
> > > > > > > > > >> > >>
> > > > > > > > > > >> > >> I'd slightly disagree here. For example, we're "materializing"
> > > > > > > > > > >> > >> change-logs produced by a Flink pipeline into a serving layer (random
> > > > > > > > > > >> > >> access db / in memory view / ..) and we need to know whether the
> > > > > > > > > > >> > >> responses we serve meet the "freshness" requirements (e.g. you may
> > > > > > > > > > >> > >> want to respond differently when the watermark is lagging way too far
> > > > > > > > > > >> > >> behind processing time). Also, not every stream processor in the
> > > > > > > > > > >> > >> pipeline needs to be Flink. It can as well be a simple element-wise
> > > > > > > > > > >> > >> transformation that reads from Kafka and writes back into a separate
> > > > > > > > > > >> > >> topic (that's what we do, for example, with ML models that have
> > > > > > > > > > >> > >> special hardware requirements).
> > > > > > > > > >> > >>
> > > > > > > > > >> > >> Best,
> > > > > > > > > >> > >> D.
> > > > > > > > > >> > >>
> > > > > > > > > >> > >>
> > > > > > > > > >> > >> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <
> > > > > > ar...@apache.org>
> > > > > > > > > >> wrote:
> > > > > > > > > >> > >>
> > > > > > > > > >> > >> > Hi Eron,
> > > > > > > > > >> > >> >
> > > > > > > > > > >> > >> > I think this is a useful addition for storage systems that act as
> > > > > > > > > > >> > >> > pass-through for Flink to reduce recovery time. It is only useful if
> > > > > > > > > > >> > >> > you combine it with regional fail-over as only a small part of the
> > > > > > > > > > >> > >> > pipeline is restarted.
> > > > > > > > > > >> > >> >
> > > > > > > > > > >> > >> > A couple of thoughts on the implications:
> > > > > > > > > > >> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I
> > > > > > > > > > >> > >> > have a hard time imagining that it's useful to use a different
> > > > > > > > > > >> > >> > stream processor downstream. So for now, I'm assuming that both
> > > > > > > > > > >> > >> > upstream and downstream are Flink applications. In that case, we
> > > > > > > > > > >> > >> > probably define both parts of the pipeline in the same Flink job
> > > > > > > > > > >> > >> > similar to KafkaStream's #through.
> > > > > > > > > > >> > >> > 2. The schema of the respective intermediate stream/topic would need
> > > > > > > > > > >> > >> > to be managed by Flink to encode both records and watermarks. This
> > > > > > > > > > >> > >> > reduces the usability quite a bit and needs to be carefully crafted.
> > > > > > > > > > >> > >> > 3. It's not clear to me if constructs like SchemaRegistry can be
> > > > > > > > > > >> > >> > properly supported (and also if they should be supported) in terms
> > > > > > > > > > >> > >> > of schema evolution.
> > > > > > > > > > >> > >> > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> > > > > > > > > > >> > >> > encoded.
> > > > > > > > > > >> > >> > 5. It's important to have some way to transport backpressure from
> > > > > > > > > > >> > >> > the downstream to the upstream. Or else you would have the same
> > > > > > > > > > >> > >> > issue as KafkaStreams where two separate pipelines can drift so far
> > > > > > > > > > >> > >> > away that you experience data loss if the data retention period is
> > > > > > > > > > >> > >> > smaller than the drift.
> > > > > > > > > > >> > >> > 6. It's clear that you trade a huge chunk of throughput for lower
> > > > > > > > > > >> > >> > overall latency in case of failure. So it's an interesting feature
> > > > > > > > > > >> > >> > for use cases with SLAs.
> > > > > > > > > > >> > >> >
> > > > > > > > > > >> > >> > Since we are phasing out SinkFunction, I'd prefer to only support
> > > > > > > > > > >> > >> > SinkWriter. Having a no-op default sounds good to me.
> > > > > > > > > > >> > >> >
> > > > > > > > > > >> > >> > We have some experimental feature for Kafka [1], which pretty much
> > > > > > > > > > >> > >> > reflects your idea. Here we have an ugly workaround to be able to
> > > > > > > > > > >> > >> > process the watermark by using a custom StreamSink task. We could
> > > > > > > > > > >> > >> > also try to create a FLIP that abstracts the actual system away and
> > > > > > > > > > >> > >> > then we could use the approach for both Pulsar and Kafka.
> > > > > > > > > >> > >> >
> > > > > > > > > >> > >> > [1]
> > > > > > > > > >> > >> >
> > > > > > > > > >> > >> >
> > > > > > > > > >> > >>
> > > > > > > > > >> >
> > > > > > > > >
> > > > > >
> > > https://urldefense.com/v3/__https://github.com/apache/flink/blob/maste
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > r/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flin
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > k/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java*L103__;Iw!
> > > > > > > > > >> >
> > > > > > >
> > !LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMvmemHrt$
> > > > > > > > > >> > [github[.]com]
> > > > > > > > > >> > >> >
> > > > > > > > > >> > >> >
> > > > > > > > > >> > >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> > > > > > > > > >> > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > > >> > >> >
> > > > > > > > > > >> > >> > > I would like to propose an enhancement to the Sink API, the ability
> > > > > > > > > > >> > >> > > to receive upstream watermarks.  I'm aware that the sink context
> > > > > > > > > > >> > >> > > provides the current watermark for a given record.  I'd like to be
> > > > > > > > > > >> > >> > > able to write a sink function that is invoked whenever the
> > > > > > > > > > >> > >> > > watermark changes.  Out of scope would be event-time timers (since
> > > > > > > > > > >> > >> > > sinks aren't keyed).
> > > > > > > > > > >> > >> > >
> > > > > > > > > > >> > >> > > For context, imagine that a stream storage system had the ability
> > > > > > > > > > >> > >> > > to persist watermarks in addition to ordinary elements, e.g. to
> > > > > > > > > > >> > >> > > serve as source watermarks in a downstream processor.  Ideally one
> > > > > > > > > > >> > >> > > could compose a multi-stage, event-driven application, with
> > > > > > > > > > >> > >> > > watermarks flowing end-to-end without need for a heuristics-based
> > > > > > > > > > >> > >> > > watermark at each stage.
> > > > > > > > > > >> > >> > >
> > > > > > > > > > >> > >> > > The specific proposal would be a new method on `SinkFunction`
> > > > > > > > > > >> > >> > > and/or on `SinkWriter`, called 'processWatermark' or
> > > > > > > > > > >> > >> > > 'writeWatermark', with a default implementation that does nothing.
> > > > > > > > > >> > >> > >
> > > > > > > > > >> > >> > > Thoughts?
> > > > > > > > > >> > >> > >
> > > > > > > > > >> > >> > > Thanks!
> > > > > > > > > >> > >> > > Eron Wright
> > > > > > > > > >> > >> > > StreamNative
> > > > > > > > > >> > >> > >
> > > > > > > > > >> > >> >
> > > > > > > > > >> > >>
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > >> > > --
> > > > > > > > > >> > >
> > > > > > > > > >> > > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >> > >
> > > > > > > > > >> > > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >> > >
> > > > > > > > > >> > > streamnative.io |  Meet with me
> > > > > > > > > >> > > <
> > > > > > > > >
> > > > >
> https://urldefense.com/v3/__https://calendly.com/eronwright/regular
> > > > > > > > > >> > >
> > > > > > > >
> > > > -1-hour__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5
> > > > > > > > > >> > > dMtQrD25c$ [calendly[.]com]>
> > > > > > > > > >> > >
> > > > > > > > > >> > > <
> > > > > > > > >
> > > > >
> https://urldefense.com/v3/__https://github.com/streamnative__;!!LpK
> > > > > > > > > >> > >
> > > > > > I!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnQskrSQ$
> > > > > > > > > >> > > [github[.]com]>
> > > > > > > > > >> > > <
> > > > > > > > >
> > > > >
> https://urldefense.com/v3/__https://www.linkedin.com/company/stream
> > > > > > > > > >> > >
> > > > > > > >
> > > > native/__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5
> > > > > > > > > >> > > dMqO4UZJa$ [linkedin[.]com]>
> > > > > > > > > >> > > <
> > > > > > > >
> > > https://urldefense.com/v3/__https://twitter.com/streamnativeio/__
> > > > > > > > > ;!
> > > > > > > > > >> > >
> > > > > > > >
> > > !LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMpbyC_rP$
> > > > > > > > > >> > > [twitter[.]com]>
> > > > > > > > > >> > >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > --
> > > > > > > > > >> >
> > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >> >
> > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >> >
> > > > > > > > > >> > streamnative.io |  Meet with me
> > > > > > > > > >> > <
> > > > > > > > >
> > > > > >
> > > https://urldefense.com/v3/__https://calendly.com/eronwright/regular-1
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > -hour__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMtQ
> > > > > > > > > >> > rD25c$ [calendly[.]com]>
> > > > > > > > > >> >
> > > > > > > > > >> > <
> > > > > > > >
> > > > https://urldefense.com/v3/__https://github.com/streamnative__;!!LpKI
> > > > > > > > > !
> > > > > > > > > >> >
> > > > 2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnQskrSQ$
> > > > > > > > > >> > [github[.]com]>
> > > > > > > > > >> > <
> > > > > > > > >
> > > > > >
> > > https://urldefense.com/v3/__https://www.linkedin.com/company/streamna
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > tive/__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMqO
> > > > > > > > > >> > 4UZJa$ [linkedin[.]com]>
> > > > > > > > > >> > <
> > > > > > > > >
> > > > > >
> > > https://urldefense.com/v3/__https://twitter.com/streamnativeio/__;!!L
> > > > > > > > > >> >
> > > > > > pKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMpbyC_rP$
> > > > > > > > > >> > [twitter[.]com]>
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > >
> > > > > > > > > > Eron Wright   Cloud Engineering Lead
> > > > > > > > > >
> > > > > > > > > > p: +1 425 922 8617 <18163542939>
> > > > > > > > > >
> > > > > > > > > > streamnative.io |  Meet with me
> > > > > > > > > > <https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > >
> > > > > > > > > > <https://github.com/streamnative>
> > > > > > > > > > <https://www.linkedin.com/company/streamnative/>
> > > > > > > > > > <https://twitter.com/streamnativeio/>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Eron Wright   Cloud Engineering Lead
> > > > > > > > >
> > > > > > > > > p: +1 425 922 8617 <18163542939>
> > > > > > > > >
> > > > > > > > > streamnative.io |  Meet with me
> > > > > > > > > <https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > >
> > > > > > > > > <https://github.com/streamnative>
> > > > > > > > > <https://www.linkedin.com/company/streamnative/>
> > > > > > > > > <https://twitter.com/streamnativeio/>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Eron Wright   Cloud Engineering Lead
> > > > > > >
> > > > > > > p: +1 425 922 8617 <18163542939>
> > > > > > >
> > > > > > > streamnative.io |  Meet with me
> > > > > > > <https://calendly.com/eronwright/regular-1-hour>
> > > > > > >
> > > > > > > <https://github.com/streamnative>
> > > > > > > <https://www.linkedin.com/company/streamnative/>
> > > > > > > <https://twitter.com/streamnativeio/>
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Eron Wright   Cloud Engineering Lead
> > > > >
> > > > > p: +1 425 922 8617 <18163542939>
> > > > >
> > > > > streamnative.io |  Meet with me
> > > > > <https://calendly.com/eronwright/regular-1-hour>
> > > > >
> > > > > <https://github.com/streamnative>
> > > > > <https://www.linkedin.com/company/streamnative/>
> > > > > <https://twitter.com/streamnativeio/>
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Eron Wright   Cloud Engineering Lead
> >
> > p: +1 425 922 8617 <18163542939>
> >
> > streamnative.io |  Meet with me
> > <https://calendly.com/eronwright/regular-1-hour>
> >
> > <https://github.com/streamnative>
> > <https://www.linkedin.com/company/streamnative/>
> > <https://twitter.com/streamnativeio/>
> >
>
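
To make the quoted proposal concrete, here is a minimal sketch of what a
sink with a no-op default watermark hook might look like.  Everything in it
is an assumption for illustration: `WatermarkAwareSink`, `RecordingSink`
and `PlainSink` are hypothetical names, the interface is a simplified
stand-in rather than Flink's actual `SinkFunction`, and the method name
`writeWatermark` is just one of the two candidates mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a sink interface (not Flink's API).
interface WatermarkAwareSink<T> {
    // Called once per record, as sinks already are today.
    void invoke(T value);

    // Proposed hook: called whenever the upstream watermark changes.
    // The default does nothing, so existing sinks are unaffected.
    default void writeWatermark(long watermarkMillis) {}
}

// A sink that persists watermarks alongside ordinary elements.
// The in-memory log stands in for a watermark-aware storage system.
class RecordingSink implements WatermarkAwareSink<String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void invoke(String value) {
        log.add("element:" + value);
    }

    @Override
    public void writeWatermark(long watermarkMillis) {
        log.add("watermark:" + watermarkMillis);
    }
}

// A sink that does not care about watermarks and inherits the no-op default.
class PlainSink implements WatermarkAwareSink<String> {
    int count = 0;

    @Override
    public void invoke(String value) {
        count++;
    }
}
```

With this shape, a runtime could call `writeWatermark` on every sink
without checking its type, and only sinks that override the hook would do
any work.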


-- 

Eron Wright   Cloud Engineering Lead

p: +1 425 922 8617 <18163542939>

streamnative.io |  Meet with me
<https://calendly.com/eronwright/regular-1-hour>

<https://github.com/streamnative>
<https://www.linkedin.com/company/streamnative/>
<https://twitter.com/streamnativeio/>
