Arvid,

1. I assume that the method name `invoke` stems from considering the
SinkFunction to be a functional interface, but is otherwise meaningless.
Keeping it as `writeWatermark` does keep it symmetric with SinkWriter.  My
vote is to leave it.  You decide.

2+3. I too considered adding a `WatermarkContext`, but it would merely be a
placeholder.  I don't anticipate any context info in the future.  As we see
with `invoke`, it is possible to add a context later in a
backwards-compatible way.  My vote is to not introduce a context.  You
decide.

4. No anticipated compatibility issues.

5. Short answer: it works as expected.  The new methods are invoked
whenever the underlying operator receives a watermark.  I do believe that
batch and ingestion time applications receive watermarks.  It seems the
programming model has been more unified in that respect since 1.12 (FLIP-134).

6. The failure behavior is the same as for elements.
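
To make points 1 to 3 concrete, here is a rough sketch.  The types below are
simplified stand-ins written for this email, not the actual Flink interfaces;
`SketchSink`, `WatermarkContext`, `LegacySink` and `RecordingSink` are
hypothetical names.  It shows a no-op default `writeWatermark`, and how a
context overload could be added later without breaking sinks that override
only the one-argument method (the same pattern as `invoke`).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a watermark; not Flink's actual class.
final class Watermark {
    private final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    long getTimestamp() {
        return timestamp;
    }
}

// Hypothetical context type, shown only to illustrate a later addition.
interface WatermarkContext {
    long currentProcessingTime();
}

// Hypothetical sink interface mirroring the shape discussed in this thread.
interface SketchSink<T> {
    void invoke(T value) throws Exception;

    // No-op default, so existing sinks keep compiling and behaving as before.
    default void writeWatermark(Watermark watermark) throws Exception {}

    // A context overload added later: it delegates to the one-argument
    // method, so sinks that override only that method still work.
    default void writeWatermark(Watermark watermark, WatermarkContext context)
            throws Exception {
        writeWatermark(watermark);
    }
}

// A pre-existing sink that knows nothing about watermarks.
class LegacySink implements SketchSink<String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void invoke(String value) {
        log.add("record:" + value);
    }
}

// A sink that forwards watermarks, overriding only the one-argument method.
class RecordingSink extends LegacySink {
    @Override
    public void writeWatermark(Watermark watermark) {
        log.add("watermark:" + watermark.getTimestamp());
    }
}

class WatermarkSketch {
    public static void main(String[] args) throws Exception {
        WatermarkContext ctx = () -> 0L; // dummy context for the demo

        LegacySink legacy = new LegacySink();
        legacy.invoke("a");
        legacy.writeWatermark(new Watermark(5L), ctx); // falls through to the no-op

        RecordingSink recording = new RecordingSink();
        recording.invoke("a");
        recording.writeWatermark(new Watermark(5L), ctx); // reaches the override

        System.out.println(legacy.log);    // [record:a]
        System.out.println(recording.log); // [record:a, watermark:5]
    }
}
```

The caller always uses the newest overload, so an implementer can pick
whichever signature suits them.  That is why I think a context can wait until
there is something to put in it.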

Thanks,
Eron

On Tue, May 25, 2021 at 12:42 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Eron,
>
> I think the FLIP is crisp and mostly good to go. Some smaller
> things/questions:
>
>    1. SinkFunction#writeWatermark could be named
>    SinkFunction#invokeWatermark or invokeOnWatermark to keep it symmetric.
>    2. We could add the context parameter to both. For SinkWriter#Context,
>    we currently do not gain much. SinkFunction#Context also exposes
>    processing time, which may or may not be handy and is currently mostly
>    used for StreamingFileSink bucket policies. We may add that processing
>    time flag also to SinkWriter#Context in the future.
>    3. Alternatively, we could also add a different context parameter just
>    to keep the API stable while allowing additional information to be
>    passed in the future.
>    4. Would we run into any compatibility issue if we use Flink 1.13 source
>    in Flink 1.14 (with this FLIP) or vice versa?
>    5. What happens with sinks that use the new methods in applications that
>    do not have watermarks (batch mode, processing time)? Does this also
>    work with ingestion time sufficiently?
>    6. How do exactly-once sinks deal with written watermarks in case of
>    failure? I guess it's the same as normal records. (Either rollback of
>    transaction or deduplication on resumption)
>
> Best,
>
> Arvid
>
> On Tue, May 25, 2021 at 6:44 PM Eron Wright <ewri...@streamnative.io.invalid>
> wrote:
>
> > Does anyone have further comment on FLIP-167?
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> >
> > Thanks,
> > Eron
> >
> >
> > On Thu, May 20, 2021 at 5:02 PM Eron Wright <ewri...@streamnative.io>
> > wrote:
> >
> > > Filed FLIP-167: Watermarks for Sink API:
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > >
> > > I'd like to call a vote next week, is that reasonable?
> > >
> > >
> > > On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com> wrote:
> > >
> > >> Hi Arvid and Eron,
> > >>
> > >> Thanks for the discussion. I read through Eron's pull request and I
> > >> think this can benefit the Pravega Flink connector as well.
> > >>
> > >> Here is some background. Pravega has had the watermark concept in its
> > >> event streams for two years, and here is a blog introduction[1] to
> > >> Pravega watermarks.
> > >> The Pravega Flink connector also got a watermark integration last year,
> > >> because we wanted to propagate the Flink watermark to Pravega in the
> > >> SinkFunction. At that time we just used the existing Flink API: we
> > >> keep the last watermark in memory and check whether the watermark has
> > >> changed for each event[2], which is not efficient. With the new
> > >> interface, we can manage the watermark propagation much more easily.
> > >>
> > >> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> > >> [2] https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
> > >>
> > >> -----Original Message-----
> > >> From: Arvid Heise <ar...@apache.org>
> > >> Sent: Wednesday, May 19, 2021 16:06
> > >> To: dev
> > >> Subject: Re: [DISCUSS] Watermark propagation with Sink API
> > >>
> > >>
> > >> [EXTERNAL EMAIL]
> > >>
> > >> Hi Eron,
> > >>
> > >> Thanks for pushing that topic. I can now see that the benefit is even
> > >> bigger than I initially thought. So it's worthwhile anyways to include
> > >> that.
> > >>
> > >> I also briefly thought about exposing watermarks to all UDFs, but here
> > >> I really have trouble seeing specific use cases. Could you maybe take a
> > >> few minutes to think about it as well? I could only see someone
> > >> misusing Async IO as a sink where a real sink would be more
> > >> appropriate. In general, if there is not a clear use case, we shouldn't
> > >> add the functionality as it's just increased maintenance for no value.
> > >>
> > >> If we stick to the plan, I think your PR is already in good shape. We
> > >> need to create a FLIP for it though, since it changes public interfaces
> > >> [1]. I was initially not convinced that we should also change the old
> > >> SinkFunction interface, but seeing how small the change is, I wouldn't
> > >> mind at all to increase consistency. Only once we have written and
> > >> approved the FLIP (which should be minimal and fast) should we actually
> > >> look at the PR ;).
> > >>
> > >> The only thing which I would improve is the name of the function.
> > >> processWatermark sounds as if the sink implementer really needs to
> > >> implement it (as you would need to do it on a custom operator). I would
> > >> make them symmetric to the record writing/invoking method (e.g.
> > >> writeWatermark and invokeWatermark).
> > >>
> > >> As a follow-up PR, we should then migrate KafkaShuffle to the new API.
> > >> But that's something I can do.
> > >>
> > >> [1]
> > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > >>
> > >> On Wed, May 19, 2021 at 3:34 AM Eron Wright
> > >> <ewri...@streamnative.io.invalid> wrote:
> > >>
> > >> > Update: opened an issue and a PR.
> > >> >
> > >> > https://issues.apache.org/jira/browse/FLINK-22700
> > >> > https://github.com/apache/flink/pull/15950
> > >> >
> > >> >
> > >> > On Tue, May 18, 2021 at 10:03 AM Eron Wright
> > >> > <ewri...@streamnative.io> wrote:
> > >> >
> > >> > > Thanks Arvid and David for sharing your ideas on this subject.  I'm
> > >> > > glad to hear that you're seeing use cases for watermark propagation
> > >> > > via an enhanced sink interface.
> > >> > >
> > >> > > As you've guessed, my interest is in Pulsar and I am exploring some
> > >> > > options for brokering watermarks across stream processing pipelines.
> > >> > > I think Arvid is speaking to a high-fidelity solution where the
> > >> > > difference between intra- and inter-pipeline flow is eliminated.  My
> > >> > > goal is more limited; I want to write the watermark that arrives at
> > >> > > the sink to Pulsar.  Simply imagine that Pulsar has native support
> > >> > > for watermarking in its producer/consumer API, and we'll leave the
> > >> > > details to another forum.
> > >> > >
> > >> > > David, I like your invariant.  I see lateness as stemming from the
> > >> > > problem domain and from system dynamics (e.g. scheduling, batching,
> > >> > > lag).  When one depends on order-of-observation to generate
> > >> > > watermarks, the app may become unduly sensitive to dynamics which
> > >> > > bear on order-of-observation.  My goal is to factor out the system
> > >> > > dynamics from lateness determination.
> > >> > >
> > >> > > Arvid, to be most valuable (at least for my purposes) the
> > >> > > enhancement is needed on SinkFunction.  This will allow us to easily
> > >> > > evolve the existing Pulsar connector.
> > >> > >
> > >> > > Next step, I will open a PR to advance the conversation.
> > >> > >
> > >> > > Eron
> > >> > >
> > >> > > On Tue, May 18, 2021 at 5:06 AM David Morávek
> > >> > > <david.mora...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > >> Hi Eron,
> > >> > >>
> > >> > >> Thanks for starting this discussion. I've been thinking about this
> > >> > >> recently as we've run into "watermark related" issues when
> > >> > >> chaining multiple pipelines together. My two cents on the
> > >> > >> discussion:
> > >> > >>
> > >> > >> How I like to think about the problem is that there should be an
> > >> > >> invariant that holds for any stream processing pipeline: "a
> > >> > >> NON_LATE element entering the system should never become LATE"
> > >> > >>
> > >> > >> Unfortunately this is exactly what happens in downstream pipelines,
> > >> > >> because the upstream one can:
> > >> > >> - break ordering (especially with higher parallelism)
> > >> > >> - emit elements that are ahead of output watermark
> > >> > >>
> > >> > >> There is not enough information to reconstruct the upstream
> > >> > >> watermark in later stages (it's always just an estimate based on
> > >> > >> the previous pipeline's output).
> > >> > >>
> > >> > >> It would be great if we could have a general abstraction that is
> > >> > >> reusable for various sources / sinks (not just Kafka / Pulsar,
> > >> > >> though this would probably cover most of the use-cases) and
> > >> > >> systems.
> > >> > >>
> > >> > >> Is there any other use-case than sharing watermarks between
> > >> > >> pipelines that you're trying to solve?
> > >> > >>
> > >> > >> Arvid:
> > >> > >>
> > >> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I
> > >> > >> > have a hard time imagining that it's useful to use a different
> > >> > >> > stream processor downstream. So for now, I'm assuming that both
> > >> > >> > upstream and downstream are Flink applications. In that case, we
> > >> > >> > probably define both parts of the pipeline in the same Flink job
> > >> > >> > similar to KafkaStream's #through.
> > >> > >> >
> > >> > >>
> > >> > >> I'd slightly disagree here. For example, we're "materializing"
> > >> > >> change-logs produced by a Flink pipeline into a serving layer
> > >> > >> (random access db / in-memory view / ..) and we need to know
> > >> > >> whether the responses we serve meet the "freshness" requirements
> > >> > >> (e.g. you may want to respond differently when the watermark is
> > >> > >> lagging way too far behind processing time). Also, not every
> > >> > >> stream processor in the pipeline needs to be Flink. It can as well
> > >> > >> be a simple element-wise transformation that reads from Kafka and
> > >> > >> writes back into a separate topic (that's what we do, for example,
> > >> > >> with ML models that have special hardware requirements).
> > >> > >>
> > >> > >> Best,
> > >> > >> D.
> > >> > >>
> > >> > >>
> > >> > >> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:
> > >> > >>
> > >> > >> > Hi Eron,
> > >> > >> >
> > >> > >> > I think this is a useful addition for storage systems that act
> > >> > >> > as pass-through for Flink to reduce recovery time. It is only
> > >> > >> > useful if you combine it with regional fail-over as only a small
> > >> > >> > part of the pipeline is restarted.
> > >> > >> >
> > >> > >> > A couple of thoughts on the implications:
> > >> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I
> > >> > >> > have a hard time imagining that it's useful to use a different
> > >> > >> > stream processor downstream. So for now, I'm assuming that both
> > >> > >> > upstream and downstream are Flink applications. In that case, we
> > >> > >> > probably define both parts of the pipeline in the same Flink job
> > >> > >> > similar to KafkaStream's #through.
> > >> > >> > 2. The schema of the respective intermediate stream/topic would
> > >> > >> > need to be managed by Flink to encode both records and
> > >> > >> > watermarks. This reduces the usability quite a bit and needs to
> > >> > >> > be carefully crafted.
> > >> > >> > 3. It's not clear to me if constructs like SchemaRegistry can be
> > >> > >> > properly supported (and also if they should be supported) in
> > >> > >> > terms of schema evolution.
> > >> > >> > 4. Potentially, StreamStatus and LatencyMarker would also need to
> > >> > >> > be encoded.
> > >> > >> > 5. It's important to have some way to transport backpressure from
> > >> > >> > the downstream to the upstream. Or else you would have the same
> > >> > >> > issue as KafkaStreams where two separate pipelines can drift so
> > >> > >> > far away that you experience data loss if the data retention
> > >> > >> > period is smaller than the drift.
> > >> > >> > 6. It's clear that you trade a huge chunk of throughput for lower
> > >> > >> > overall latency in case of failure. So it's an interesting
> > >> > >> > feature for use cases with SLAs.
> > >> > >> >
> > >> > >> > Since we are phasing out SinkFunction, I'd prefer to only support
> > >> > >> > SinkWriter. Having a no-op default sounds good to me.
> > >> > >> >
> > >> > >> > We have some experimental feature for Kafka [1], which pretty
> > >> > >> > much reflects your idea. Here we have an ugly workaround to be
> > >> > >> > able to process the watermark by using a custom StreamSink task.
> > >> > >> > We could also try to create a FLIP that abstracts the actual
> > >> > >> > system away and then we could use the approach for both Pulsar
> > >> > >> > and Kafka.
> > >> > >> >
> > >> > >> > [1]
> > >> > >> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
> > >> > >> >
> > >> > >> >
> > >> > >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> > >> > >> > <ewri...@streamnative.io.invalid> wrote:
> > >> > >> >
> > >> > >> > > I would like to propose an enhancement to the Sink API, the
> > >> > >> > > ability to receive upstream watermarks.   I'm aware that the
> > >> > >> > > sink context provides the current watermark for a given
> > >> > >> > > record.  I'd like to be able to write a sink function that is
> > >> > >> > > invoked whenever the watermark changes.  Out of scope would be
> > >> > >> > > event-time timers (since sinks aren't keyed).
> > >> > >> > >
> > >> > >> > > For context, imagine that a stream storage system had the
> > >> > >> > > ability to persist watermarks in addition to ordinary elements,
> > >> > >> > > e.g. to serve as source watermarks in a downstream processor.
> > >> > >> > > Ideally one could compose a multi-stage, event-driven
> > >> > >> > > application, with watermarks flowing end-to-end without need
> > >> > >> > > for a heuristics-based watermark at each stage.
> > >> > >> > >
> > >> > >> > > The specific proposal would be a new method on `SinkFunction`
> > >> > >> > > and/or on `SinkWriter`, called 'processWatermark' or
> > >> > >> > > 'writeWatermark', with a default implementation that does
> > >> > >> > > nothing.
> > >> > >> > >
> > >> > >> > > Thoughts?
> > >> > >> > >
> > >> > >> > > Thanks!
> > >> > >> > > Eron Wright
> > >> > >> > > StreamNative
> > >> > >> > >
> > >> > >> >
> > >> > >>
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >> >
> > >> >
> > >>
> > >
> > >
> > >
> >
> >
> >
>


-- 

Eron Wright   Cloud Engineering Lead

p: +1 425 922 8617 <18163542939>

streamnative.io |  Meet with me
<https://calendly.com/eronwright/regular-1-hour>

<https://github.com/streamnative>
<https://www.linkedin.com/company/streamnative/>
<https://twitter.com/streamnativeio/>
