Update: opened an issue and a PR.

https://issues.apache.org/jira/browse/FLINK-22700
https://github.com/apache/flink/pull/15950


On Tue, May 18, 2021 at 10:03 AM Eron Wright <ewri...@streamnative.io>
wrote:

> Thanks Arvid and David for sharing your ideas on this subject.  I'm glad
> to hear that you're seeing use cases for watermark propagation via an
> enhanced sink interface.
>
> As you've guessed, my interest is in Pulsar, and I am exploring some options
> for brokering watermarks across stream processing pipelines.  I think Arvid
> is speaking to a high-fidelity solution where the difference between intra-
> and inter-pipeline flow is eliminated.  My goal is more limited; I want to
> write the watermark that arrives at the sink to Pulsar.  Simply imagine
> that Pulsar has native support for watermarking in its producer/consumer
> API, and we'll leave the details to another forum.
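>
> Roughly, this is what I'd like the Pulsar sink to be able to do, as a sketch
> only (it assumes the writeWatermark hook proposed below, and 'sendWatermark'
> is an imagined native Pulsar producer call, since that support doesn't exist
> yet):
>
>     // inside the Pulsar sink: forward the watermark that reaches the sink
>     public void writeWatermark(Watermark watermark) throws Exception {
>         // hypothetical native Pulsar producer API for persisting a watermark
>         producer.sendWatermark(watermark.getTimestamp());
>     }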
>
> David, I like your invariant.  I see lateness as stemming from the problem
> domain and from system dynamics (e.g. scheduling, batching, lag).  When one
> depends on order-of-observation to generate watermarks, the app may become
> unduly sensitive to dynamics which bear on order-of-observation.  My goal
> is to factor out the system dynamics from lateness determination.
>
> Arvid, to be most valuable (at least for my purposes) the enhancement is
> needed on SinkFunction.  This will allow us to easily evolve the existing
> Pulsar connector.
>
> Next step, I will open a PR to advance the conversation.
>
> Eron
>
> On Tue, May 18, 2021 at 5:06 AM David Morávek <david.mora...@gmail.com>
> wrote:
>
>> Hi Eron,
>>
>> Thanks for starting this discussion. I've been thinking about this
>> recently, as we've run into watermark-related issues when chaining
>> multiple pipelines together. My two cents on the discussion:
>>
>> The way I like to think about the problem is that there should be an
>> invariant that holds for any stream processing pipeline: "a NON_LATE
>> element entering the system should never become LATE".
>>
>> Unfortunately, this is exactly what happens in downstream pipelines,
>> because the upstream one can:
>> - break ordering (especially with higher parallelism)
>> - emit elements that are ahead of the output watermark
>>
>> There is not enough information to reconstruct the upstream watermark in
>> later stages (it's always just an estimate based on the previous
>> pipeline's output).
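>>
>> (To make the "estimate" concrete: today the downstream job typically has
>> to fall back to a guess along the lines of
>>
>>     WatermarkStrategy<Event> strategy =
>>         WatermarkStrategy
>>             .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
>>             .withTimestampAssigner((event, recordTs) -> event.getEventTimestamp());
>>
>> where Event / getEventTimestamp are placeholders and the 30s bound has to
>> cover whatever reordering the upstream pipeline introduced.)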
>>
>> It would be great if we could have a general abstraction that is reusable
>> for various sources / sinks (not just Kafka / Pulsar, though these would
>> probably cover most of the use cases) and systems.
>>
>> Is there any other use case than sharing watermarks between pipelines that
>> you're trying to solve?
>>
>> Arvid:
>>
>> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
>> > hard time imagining that it's useful to use a different stream processor
>> > downstream. So for now, I'm assuming that both upstream and downstream are
>> > Flink applications. In that case, we probably define both parts of the
>> > pipeline in the same Flink job similar to KafkaStream's #through.
>> >
>>
>> I'd slightly disagree here. For example, we're "materializing" change-logs
>> produced by a Flink pipeline into a serving layer (random-access DB /
>> in-memory view / ...), and we need to know whether the responses we serve
>> meet the "freshness" requirements (e.g. you may want to respond differently
>> when the watermark is lagging too far behind processing time). Also, not
>> every stream processor in the pipeline needs to be Flink. It can just as
>> well be a simple element-wise transformation that reads from Kafka and
>> writes back into a separate topic (that's what we do, for example, with ML
>> models that have special hardware requirements).
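>>
>> To illustrate the "freshness" check (serving-layer sketch, the names are
>> made up, this is not Flink API): we persist the latest watermark next to
>> the materialized view and compare it with processing time at query time:
>>
>>     long watermarkLagMs = System.currentTimeMillis() - view.getLastWatermark();
>>     if (watermarkLagMs > maxAllowedLagMs) {
>>         return Response.stale(view.read(key));   // degrade instead of serving silently stale data
>>     }
>>     return Response.fresh(view.read(key));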
>>
>> Best,
>> D.
>>
>>
>> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:
>>
>> > Hi Eron,
>> >
>> > I think this is a useful addition for storage systems that act as a
>> > pass-through for Flink to reduce recovery time. It is only useful if you
>> > combine it with regional fail-over, so that only a small part of the
>> > pipeline is restarted.
>> >
>> > A couple of thoughts on the implications:
>> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
>> > hard time imagining that it's useful to use a different stream processor
>> > downstream. So for now, I'm assuming that both upstream and downstream are
>> > Flink applications. In that case, we probably define both parts of the
>> > pipeline in the same Flink job similar to KafkaStream's #through.
>> > 2. The schema of the respective intermediate stream/topic would need to be
>> > managed by Flink to encode both records and watermarks. This reduces the
>> > usability quite a bit and needs to be carefully crafted (see the sketch
>> > after this list).
>> > 3. It's not clear to me if constructs like SchemaRegistry can be properly
>> > supported (and also if they should be supported) in terms of schema
>> > evolution.
>> > 4. Potentially, StreamStatus and LatencyMarker would also need to be
>> > encoded.
>> > 5. It's important to have some way to transport backpressure from the
>> > downstream to the upstream. Or else you would have the same issue as
>> > KafkaStreams where two separate pipelines can drift so far away that you
>> > experience data loss if the data retention period is smaller than the
>> > drift.
>> > 6. It's clear that you trade a huge chunk of throughput for lower overall
>> > latency in case of failure. So it's an interesting feature for use cases
>> > with SLAs.
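>> >
>> > To sketch what I mean in point 2 (names made up, just to show the shape of
>> > the problem): the intermediate topic would carry an envelope that is either
>> > a record or a watermark, and Flink would own its (de)serializer:
>> >
>> >     /** Envelope written to the intermediate topic; either a record or a watermark. */
>> >     final class StreamElementEnvelope<T> {
>> >         final T record;                 // null when this envelope carries a watermark
>> >         final long watermarkTimestamp;  // only meaningful when record == null
>> >
>> >         static <T> StreamElementEnvelope<T> ofRecord(T record) {
>> >             return new StreamElementEnvelope<>(record, Long.MIN_VALUE);
>> >         }
>> >         static <T> StreamElementEnvelope<T> ofWatermark(long timestamp) {
>> >             return new StreamElementEnvelope<>(null, timestamp);
>> >         }
>> >         private StreamElementEnvelope(T record, long timestamp) {
>> >             this.record = record;
>> >             this.watermarkTimestamp = timestamp;
>> >         }
>> >         boolean isWatermark() { return record == null; }
>> >     }
>> >
>> > Schema evolution of the user part (point 3) then has to happen inside that
>> > envelope, which is why I think it needs careful design.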
>> >
>> > Since we are phasing out SinkFunction, I'd prefer to only support
>> > SinkWriter. Having a no-op default sounds good to me.
>> >
>> > We have an experimental feature for Kafka [1] which pretty much reflects
>> > your idea. There we have an ugly workaround to be able to process the
>> > watermark by using a custom StreamSink task. We could also try to create a
>> > FLIP that abstracts the actual system away, and then we could use the
>> > approach for both Pulsar and Kafka.
>> >
>> > [1]
>> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
>> >
>> >
>> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
>> > <ewri...@streamnative.io.invalid> wrote:
>> >
>> > > I would like to propose an enhancement to the Sink API: the ability to
>> > > receive upstream watermarks.  I'm aware that the sink context provides
>> > > the current watermark for a given record.  I'd like to be able to write
>> > > a sink function that is invoked whenever the watermark changes.  Out of
>> > > scope would be event-time timers (since sinks aren't keyed).
>> > >
>> > > For context, imagine that a stream storage system had the ability to
>> > > persist watermarks in addition to ordinary elements, e.g. to serve as
>> > > source watermarks in a downstream processor.  Ideally one could compose
>> > > a multi-stage, event-driven application, with watermarks flowing
>> > > end-to-end without the need for a heuristics-based watermark at each
>> > > stage.
>> > >
>> > > The specific proposal would be a new method on `SinkFunction` and/or on
>> > > `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a
>> > > default implementation that does nothing.
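>> > >
>> > > Roughly like this (a sketch only; the exact method name, watermark class,
>> > > and placement would be settled in the PR):
>> > >
>> > >     public interface SinkFunction<IN> extends Function, Serializable {
>> > >
>> > >         // ... existing invoke(IN value, Context context) ...
>> > >
>> > >         /** Called whenever a watermark arrives at the sink; no-op by default. */
>> > >         default void writeWatermark(Watermark watermark) throws Exception {}
>> > >     }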
>> > >
>> > > Thoughts?
>> > >
>> > > Thanks!
>> > > Eron Wright
>> > > StreamNative
>> > >
>> >
>>
>
>


-- 

Eron Wright   Cloud Engineering Lead
streamnative.io