Hi Eron,

Thanks for pushing this topic. I can now see that the benefit is even bigger than I initially thought, so it's definitely worthwhile to include.
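Just so we are talking about the same change: it boils down to something like the sketch below. This is only an illustration, not literally your PR; the method names and the exact Watermark class to use are still open (more on the naming further down), but the important part is the no-op default, so that existing sinks are unaffected.

    // Sketch only, not the final API. Assumes the usual Flink types
    // (Function, Serializable, IOException, and some Watermark class).
    public interface SinkFunction<IN> extends Function, Serializable {

        default void invoke(IN value, Context context) throws Exception {}

        // new: called whenever the watermark arriving at the sink advances
        default void writeWatermark(Watermark watermark) throws Exception {}
    }

    public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

        void write(InputT element, Context context) throws IOException;

        // new: the same hook for the unified sink API
        default void writeWatermark(Watermark watermark) throws IOException {}

        // prepareCommit / snapshotState / close stay as they are today
    }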
I also briefly thought about exposing watermarks to all UDFs, but there I really struggle to see specific use cases. Could you maybe take a few minutes to think about it as well? The only thing I can picture is someone misusing Async I/O as a sink where a real sink would be more appropriate. In general, if there is no clear use case, we shouldn't add the functionality, as it's just added maintenance for no value.

If we stick to the plan, I think your PR is already in good shape. We need to create a FLIP for it though, since it changes Public interfaces [1]. I was initially not convinced that we should also change the old SinkFunction interface, but seeing how small the change is, I wouldn't mind doing it for consistency. Only once the FLIP is written and approved (which should be quick, since it's minimal) should we actually look at the PR ;).

The only thing I would improve is the name of the method. processWatermark sounds as if the sink implementer really needs to implement it (as you would have to on a custom operator). I would make the names symmetric to the record writing/invoking methods (e.g. writeWatermark and invokeWatermark).

As a follow-up PR, we should then migrate KafkaShuffle to the new API (it currently relies on the custom-operator workaround; see the rough sketch at the very bottom of this mail). But that's something I can do.

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

On Wed, May 19, 2021 at 3:34 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:

> Update: opened an issue and a PR.
>
> https://issues.apache.org/jira/browse/FLINK-22700
> https://github.com/apache/flink/pull/15950
>
> On Tue, May 18, 2021 at 10:03 AM Eron Wright <ewri...@streamnative.io> wrote:
>
> > Thanks Arvid and David for sharing your ideas on this subject. I'm glad to hear that you're seeing use cases for watermark propagation via an enhanced sink interface.
> >
> > As you've guessed, my interest is in Pulsar, and I'm exploring some options for brokering watermarks across stream processing pipelines. I think Arvid is speaking to a high-fidelity solution where the difference between intra- and inter-pipeline flow is eliminated. My goal is more limited; I want to write the watermark that arrives at the sink to Pulsar. Simply imagine that Pulsar has native support for watermarking in its producer/consumer API, and we'll leave the details to another forum.
> >
> > David, I like your invariant. I see lateness as stemming from the problem domain and from system dynamics (e.g. scheduling, batching, lag). When one depends on order-of-observation to generate watermarks, the app may become unduly sensitive to dynamics which bear on order-of-observation. My goal is to factor the system dynamics out of lateness determination.
> >
> > Arvid, to be most valuable (at least for my purposes) the enhancement is needed on SinkFunction. This will allow us to easily evolve the existing Pulsar connector.
> >
> > Next step, I will open a PR to advance the conversation.
> >
> > Eron
> >
> > On Tue, May 18, 2021 at 5:06 AM David Morávek <david.mora...@gmail.com> wrote:
> >
> >> Hi Eron,
> >>
> >> Thanks for starting this discussion. I've been thinking about this recently, as we've run into "watermark-related" issues when chaining multiple pipelines together.
> >> My two cents to the discussion:
> >>
> >> The way I like to think about the problem is that there should be an invariant that holds for any stream processing pipeline: "a NON_LATE element entering the system should never become LATE".
> >>
> >> Unfortunately, this is exactly what happens in downstream pipelines, because the upstream one can:
> >> - break ordering (especially with higher parallelism)
> >> - emit elements that are ahead of the output watermark
> >>
> >> There is not enough information to re-construct the upstream watermark in later stages (it's always just an estimate based on the previous pipeline's output).
> >>
> >> It would be great if we could have a general abstraction that is reusable for various sources / sinks (not just Kafka / Pulsar, though this would probably cover most of the use cases) and systems.
> >>
> >> Is there any other use case than sharing watermarks between pipelines that you're trying to solve?
> >>
> >> Arvid:
> >>
> >> > 1. Watermarks are closely coupled to the used system (=Flink). I have a hard time imagining that it's useful to use a different stream processor downstream. So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job similar to KafkaStream's #through.
> >>
> >> I'd slightly disagree here. For example, we're "materializing" change-logs produced by a Flink pipeline into a serving layer (random-access db / in-memory view / ...), and we need to know whether the responses we serve meet the "freshness" requirements (e.g. you may want to respond differently when the watermark is lagging too far behind processing time). Also, not every stream processor in the pipeline needs to be Flink. It can just as well be a simple element-wise transformation that reads from Kafka and writes back into a separate topic (that's what we do, for example, with ML models that have special hardware requirements).
> >>
> >> Best,
> >> D.
> >>
> >> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:
> >>
> >> > Hi Eron,
> >> >
> >> > I think this is a useful addition for storage systems that act as a pass-through for Flink to reduce recovery time. It is only useful if you combine it with regional fail-over, as only a small part of the pipeline is restarted.
> >> >
> >> > A couple of thoughts on the implications:
> >> > 1. Watermarks are closely coupled to the used system (=Flink). I have a hard time imagining that it's useful to use a different stream processor downstream. So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job similar to KafkaStream's #through.
> >> > 2. The schema of the respective intermediate stream/topic would need to be managed by Flink to encode both records and watermarks. This reduces the usability quite a bit and needs to be carefully crafted.
> >> > 3. It's not clear to me if constructs like SchemaRegistry can be properly supported (and also if they should be supported) in terms of schema evolution.
> >> > 4. Potentially, StreamStatus and LatencyMarker would also need to be encoded.
> >> > 5. It's important to have some way to transport backpressure from the downstream to the upstream. Otherwise you would have the same issue as KafkaStreams, where two separate pipelines can drift so far apart that you experience data loss if the data retention period is smaller than the drift.
> >> > 6. It's clear that you trade a huge chunk of throughput for lower overall latency in case of failure. So it's an interesting feature for use cases with SLAs.
> >> >
> >> > Since we are phasing out SinkFunction, I'd prefer to only support SinkWriter. Having a no-op default sounds good to me.
> >> >
> >> > We have an experimental feature for Kafka [1], which pretty much reflects your idea. There we have an ugly workaround to be able to process the watermark, by using a custom StreamSink task. We could also try to create a FLIP that abstracts the actual system away, and then we could use the approach for both Pulsar and Kafka.
> >> >
> >> > [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
> >> >
> >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:
> >> >
> >> > > I would like to propose an enhancement to the Sink API: the ability to receive upstream watermarks. I'm aware that the sink context provides the current watermark for a given record. I'd like to be able to write a sink function that is invoked whenever the watermark changes. Out of scope would be event-time timers (since sinks aren't keyed).
> >> > >
> >> > > For context, imagine that a stream storage system had the ability to persist watermarks in addition to ordinary elements, e.g. to serve as source watermarks in a downstream processor. Ideally one could compose a multi-stage, event-driven application, with watermarks flowing end-to-end without the need for a heuristics-based watermark at each stage.
> >> > >
> >> > > The specific proposal would be a new method on `SinkFunction` and/or on `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a default implementation that does nothing.
> >> > >
> >> > > Thoughts?
> >> > >
> >> > > Thanks!
> >> > > Eron Wright
> >> > > StreamNative
> >
> > --
> > Eron Wright | Cloud Engineering Lead
> > streamnative.io
>
> --
> Eron Wright | Cloud Engineering Lead
> streamnative.io
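P.S. For readers following along: the "ugly workaround" in the KafkaShuffle mentioned above looks roughly like the sketch below, i.e. a custom StreamSink operator that intercepts the watermark and hands it to the sink function. The class names and the WatermarkAwareSink interface are made up for illustration (this is not the actual KafkaShuffle code); the point is just to show what a writeWatermark default on the sink interfaces would make unnecessary.

    // Assumed imports (standard Flink classes):
    //   org.apache.flink.streaming.api.operators.StreamSink
    //   org.apache.flink.streaming.api.watermark.Watermark
    //   org.apache.flink.streaming.api.functions.sink.SinkFunction

    // Hypothetical sink interface that can accept watermarks -- exactly the kind
    // of hook we want to add to SinkFunction / SinkWriter directly.
    public interface WatermarkAwareSink<IN> extends SinkFunction<IN> {
        void writeWatermark(Watermark mark) throws Exception;
    }

    // Custom sink operator that forwards watermarks to the sink function, since
    // the regular StreamSink only forwards elements to SinkFunction#invoke.
    public class WatermarkForwardingStreamSink<IN> extends StreamSink<IN> {

        private final WatermarkAwareSink<IN> sinkFunction;

        public WatermarkForwardingStreamSink(WatermarkAwareSink<IN> sinkFunction) {
            super(sinkFunction);
            this.sinkFunction = sinkFunction;
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);      // normal handling and forwarding
            sinkFunction.writeWatermark(mark); // additionally hand it to the sink
        }
    }

With a writeWatermark default on SinkFunction/SinkWriter, the whole custom operator goes away and a connector only overrides the new method.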