Hi Eron,

Thanks for pushing this topic. I can now see that the benefit is even
bigger than I initially thought, so it's worthwhile to include it either way.

I also briefly thought about exposing watermarks to all UDFs, but here I
really struggle to see specific use cases. Could you maybe take a few
minutes to think about it as well? The only one I could see is someone
misusing Async IO as a sink where a real sink would be more appropriate.
In general, if there is no clear use case, we shouldn't add the
functionality, as it just increases maintenance for no value.

If we stick to the plan, I think your PR is already in good shape. We
need to create a FLIP for it though, since it changes public interfaces
[1]. I was initially not convinced that we should also change the old
SinkFunction interface, but seeing how small the change is, I wouldn't
mind at all doing it for the sake of consistency. Only once the FLIP is
written and approved (which should be minimal and fast) should we actually
look at the PR ;).

The only thing I would improve is the name of the function.
processWatermark sounds as if the sink implementer really needs to
implement it (as you would on a custom operator). I would make the names
symmetric to the record writing/invoking methods (e.g. writeWatermark
next to write on SinkWriter and invokeWatermark next to invoke on
SinkFunction).

As a follow-up PR, we should then migrate KafkaShuffle to the new API. But
that's something I can do.

[1]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

On Wed, May 19, 2021 at 3:34 AM Eron Wright <ewri...@streamnative.io.invalid>
wrote:

> Update: opened an issue and a PR.
>
> https://issues.apache.org/jira/browse/FLINK-22700
> https://github.com/apache/flink/pull/15950
>
>
> On Tue, May 18, 2021 at 10:03 AM Eron Wright <ewri...@streamnative.io>
> wrote:
>
> > Thanks Arvid and David for sharing your ideas on this subject.  I'm glad
> > to hear that you're seeing use cases for watermark propagation via an
> > enhanced sink interface.
> >
> > As you've guessed, my interest is in Pulsar, and I am exploring some
> > options for brokering watermarks across stream processing pipelines.  I
> > think Arvid is speaking to a high-fidelity solution where the difference
> > between intra- and inter-pipeline flow is eliminated.  My goal is more
> > limited; I want to write the watermark that arrives at the sink to
> > Pulsar.  Simply imagine that Pulsar has native support for watermarking
> > in its producer/consumer API, and we'll leave the details to another
> > forum.
> >
> > David, I like your invariant.  I see lateness as stemming from the
> > problem domain and from system dynamics (e.g. scheduling, batching,
> > lag).  When one depends on order-of-observation to generate watermarks,
> > the app may become unduly sensitive to dynamics which bear on
> > order-of-observation.  My goal is to factor out the system dynamics from
> > lateness determination.
> >
> > Arvid, to be most valuable (at least for my purposes) the enhancement is
> > needed on SinkFunction.  This will allow us to easily evolve the existing
> > Pulsar connector.
> >
> > Next step, I will open a PR to advance the conversation.
> >
> > Eron
> >
> > On Tue, May 18, 2021 at 5:06 AM David Morávek <david.mora...@gmail.com>
> > wrote:
> >
> >> Hi Eron,
> >>
> >> Thanks for starting this discussion. I've been thinking about this
> >> recently as we've run into "watermark related" issues when chaining
> >> multiple pipelines together. My two cents to the discussion:
> >>
> >> The way I like to think about the problem is that there should be an
> >> invariant that holds for any stream processing pipeline: "A NON_LATE
> >> element entering the system should never become LATE."
> >>
> >> Unfortunately this is exactly what happens in downstream pipelines,
> >> because the upstream one can:
> >> - break ordering (especially with higher parallelism)
> >> - emit elements that are ahead of output watermark
> >>
> >> There is not enough information to reconstruct the upstream watermark
> >> in later stages (it's always just an estimate based on the previous
> >> pipeline's output).
> >>
> >> It would be great if we could have a general abstraction that is
> >> reusable for various sources / sinks (not just Kafka / Pulsar, though
> >> this would probably cover most of the use cases) and systems.
> >>
> >> Is there any other use case than sharing watermarks between pipelines
> >> that you're trying to solve?
> >>
> >> Arvid:
> >>
> >> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
> >> > hard time imagining that it's useful to use a different stream
> >> > processor downstream. So for now, I'm assuming that both upstream and
> >> > downstream are Flink applications. In that case, we probably define
> >> > both parts of the pipeline in the same Flink job similar to
> >> > KafkaStream's #through.
> >> >
> >>
> >> I'd slightly disagree here. For example, we're "materializing"
> >> change-logs produced by a Flink pipeline into a serving layer (random
> >> access db / in-memory view / ..) and we need to know whether the
> >> responses we serve meet the "freshness" requirements (e.g. you may want
> >> to respond differently when the watermark is lagging too far behind
> >> processing time). Also, not every stream processor in the pipeline
> >> needs to be Flink. It can just as well be a simple element-wise
> >> transformation that reads from Kafka and writes back into a separate
> >> topic (that's what we do, for example, with ML models that have special
> >> hardware requirements).
> >>
> >> Best,
> >> D.
> >>
> >>
> >> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:
> >>
> >> > Hi Eron,
> >> >
> >> > I think this is a useful addition for storage systems that act as
> >> > pass-through for Flink to reduce recovery time. It is only useful if
> >> > you combine it with regional fail-over as only a small part of the
> >> > pipeline is restarted.
> >> >
> >> > A couple of thoughts on the implications:
> >> > 1. Watermarks are closely coupled to the used system (=Flink). I have
> >> > a hard time imagining that it's useful to use a different stream
> >> > processor downstream. So for now, I'm assuming that both upstream and
> >> > downstream are Flink applications. In that case, we probably define
> >> > both parts of the pipeline in the same Flink job similar to
> >> > KafkaStream's #through.
> >> > 2. The schema of the respective intermediate stream/topic would need
> >> > to be managed by Flink to encode both records and watermarks. This
> >> > reduces the usability quite a bit and needs to be carefully crafted.
> >> > 3. It's not clear to me if constructs like SchemaRegistry can be
> >> > properly supported (and also if they should be supported) in terms of
> >> > schema evolution.
> >> > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> >> > encoded.
> >> > 5. It's important to have some way to transport backpressure from the
> >> > downstream to the upstream. Or else you would have the same issue as
> >> > KafkaStreams where two separate pipelines can drift so far away that
> >> > you experience data loss if the data retention period is smaller than
> >> > the drift.
> >> > 6. It's clear that you trade a huge chunk of throughput for lower
> >> > overall latency in case of failure. So it's an interesting feature
> >> > for use cases with SLAs.
> >> >
> >> > Since we are phasing out SinkFunction, I'd prefer to only support
> >> > SinkWriter. Having a no-op default sounds good to me.
> >> >
> >> > We have some experimental feature for Kafka [1], which pretty much
> >> > reflects your idea. Here we have an ugly workaround to be able to
> >> > process the watermark by using a custom StreamSink task. We could also
> >> > try to create a FLIP that abstracts the actual system away and then we
> >> > could use the approach for both Pulsar and Kafka.
> >> >
> >> > [1]
> >> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
> >> >
> >> >
> >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> >> > <ewri...@streamnative.io.invalid> wrote:
> >> >
> >> > > I would like to propose an enhancement to the Sink API: the ability
> >> > > to receive upstream watermarks.  I'm aware that the sink context
> >> > > provides the current watermark for a given record.  I'd like to be
> >> > > able to write a sink function that is invoked whenever the watermark
> >> > > changes.  Out of scope would be event-time timers (since sinks
> >> > > aren't keyed).
> >> > >
> >> > > For context, imagine that a stream storage system had the ability to
> >> > > persist watermarks in addition to ordinary elements, e.g. to serve
> >> > > as source watermarks in a downstream processor.  Ideally one could
> >> > > compose a multi-stage, event-driven application, with watermarks
> >> > > flowing end-to-end without need for a heuristics-based watermark at
> >> > > each stage.
> >> > >
> >> > > The specific proposal would be a new method on `SinkFunction` and/or
> >> > > on `SinkWriter`, called `processWatermark` or `writeWatermark`, with
> >> > > a default implementation that does nothing.
> >> > >
> >> > > Thoughts?
> >> > >
> >> > > Thanks!
> >> > > Eron Wright
> >> > > StreamNative
> >> > >
> >> >
> >>
> >
> >
> > --
> >
> > Eron Wright   Cloud Engineering Lead
> >
> > p: +1 425 922 8617 <18163542939>
> >
> > streamnative.io |  Meet with me
> > <https://calendly.com/eronwright/regular-1-hour>
> >
> > <https://github.com/streamnative>
> > <https://www.linkedin.com/company/streamnative/>
> > <https://twitter.com/streamnativeio/>
> >
>
