Arvid,

1. I assume that the method name `invoke` stems from considering the SinkFunction to be a functional interface, but is otherwise meaningless. Keeping it as `writeWatermark` does keep it symmetric with SinkWriter. My vote is to leave it. You decide.
2+3. I too considered adding a `WatermarkContext`, but it would merely be a placeholder. I don't anticipate any context info in future. As we see with invoke, it is possible to add a context later in a backwards-compatible way. My vote is to not introduce a context. You decide.
4. No anticipated compatibility issues.
5. Short answer, it works as expected. The new methods are invoked whenever the underlying operator receives a watermark. I do believe that batch and ingestion time applications receive watermarks. Seems the programming model is more unified in that respect since 1.12 (FLIP-134).
6. The failure behavior is the same as for elements.

Thanks,
Eron

On Tue, May 25, 2021 at 12:42 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Eron,
>
> I think the FLIP is crisp and mostly good to go. Some smaller things/questions:
>
> 1. SinkFunction#writeWatermark could be named SinkFunction#invokeWatermark or invokeOnWatermark to keep it symmetric.
> 2. We could add the context parameter to both. For SinkWriter#Context, we currently do not gain much. SinkFunction#Context also exposes processing time, which may or may not be handy and is currently mostly used for StreamingFileSink bucket policies. We may add that processing time flag also to SinkWriter#Context in the future.
> 3. Alternatively, we could also add a different context parameter just to keep the API stable while allowing additional information to be passed in the future.
> 4. Would we run into any compatibility issue if we use Flink 1.13 source in Flink 1.14 (with this FLIP) or vice versa?
> 5. What happens with sinks that use the new methods in applications that do not have watermarks (batch mode, processing time)? Does this also work with ingestion time sufficiently?
> 6. How do exactly once sinks deal with written watermarks in case of failure? I guess it's the same as normal records.
> (Either rollback of transaction or deduplication on resumption)
>
> Best,
>
> Arvid
>
> On Tue, May 25, 2021 at 6:44 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:
>
> > Does anyone have further comment on FLIP-167?
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> >
> > Thanks,
> > Eron
> >
> > On Thu, May 20, 2021 at 5:02 PM Eron Wright <ewri...@streamnative.io> wrote:
> >
> > > Filed FLIP-167: Watermarks for Sink API:
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > >
> > > I'd like to call a vote next week, is that reasonable?
> > >
> > > On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com> wrote:
> > >
> > >> Hi Arvid and Eron,
> > >>
> > >> Thanks for the discussion. I read through Eron's pull request, and I think this can benefit the Pravega Flink connector as well.
> > >>
> > >> Here is some background. Pravega has had a watermark concept in its event streams for two years; here is a blog introduction [1] to Pravega watermarks.
> > >> The Pravega Flink connector also gained watermark integration last year: we wanted to propagate the Flink watermark to Pravega in the SinkFunction, and at that time we just used the existing Flink API, keeping the last watermark in memory and checking whether the watermark changed for each event [2], which is not efficient. With such a new interface, we can manage the watermark propagation much more easily.
> > >>
> > >> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> > >> [2] https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
> > >>
> > >> -----Original Message-----
> > >> From: Arvid Heise <ar...@apache.org>
> > >> Sent: Wednesday, May 19, 2021 16:06
> > >> To: dev
> > >> Subject: Re: [DISCUSS] Watermark propagation with Sink API
> > >>
> > >> [EXTERNAL EMAIL]
> > >>
> > >> Hi Eron,
> > >>
> > >> Thanks for pushing that topic. I can now see that the benefit is even bigger than I initially thought. So it's worthwhile anyways to include that.
> > >>
> > >> I also briefly thought about exposing watermarks to all UDFs, but here I really have an issue to see specific use cases. Could you maybe take a few minutes to think about it as well? I could only see someone misusing Async IO as a sink where a real sink would be more appropriate. In general, if there is not a clear use case, we shouldn't add the functionality as it's just increased maintenance for no value.
> > >>
> > >> If we stick to the plan, I think your PR is already in a good shape. We need to create a FLIP for it though, since it changes Public interfaces [1]. I was initially not convinced that we should also change the old SinkFunction interface, but seeing how little the change is, I wouldn't mind at all to increase consistency. Only when we wrote the FLIP and approved it (which should be minimal and fast), we should actually look at the PR ;).
> > >>
> > >> The only thing which I would improve is the name of the function. processWatermark sounds as if the sink implementer really needs to implement it (as you would need to do it on a custom operator). I would make them symmetric to the record writing/invoking method (e.g.
> > >> writeWatermark and invokeWatermark).
> > >>
> > >> As a follow-up PR, we should then migrate KafkaShuffle to the new API. But that's something I can do.
> > >>
> > >> [1] https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/Flink*Improvement*Proposals__;Kys!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnp6nc7o$ [cwiki[.]apache[.]org]
> > >>
> > >> On Wed, May 19, 2021 at 3:34 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:
> > >>
> > >> > Update: opened an issue and a PR.
> > >> >
> > >> > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLINK-22700__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMplbgRO4$ [issues[.]apache[.]org]
> > >> > https://urldefense.com/v3/__https://github.com/apache/flink/pull/15950__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMtScmG7a$ [github[.]com]
> > >> >
> > >> > On Tue, May 18, 2021 at 10:03 AM Eron Wright <ewri...@streamnative.io> wrote:
> > >> >
> > >> > > Thanks Arvid and David for sharing your ideas on this subject. I'm glad to hear that you're seeing use cases for watermark propagation via an enhanced sink interface.
> > >> > >
> > >> > > As you've guessed, my interest is in Pulsar and I am exploring some options for brokering watermarks across stream processing pipelines. I think Arvid is speaking to a high-fidelity solution where the difference between intra- and inter-pipeline flow is eliminated. My goal is more limited; I want to write the watermark that arrives at the sink to Pulsar. Simply imagine that Pulsar has native support for watermarking in its producer/consumer API, and we'll leave the details to another forum.
> > >> > >
> > >> > > David, I like your invariant. I see lateness as stemming from the problem domain and from system dynamics (e.g. scheduling, batching, lag). When one depends on order-of-observation to generate watermarks, the app may become unduly sensitive to dynamics which bear on order-of-observation. My goal is to factor out the system dynamics from lateness determination.
> > >> > >
> > >> > > Arvid, to be most valuable (at least for my purposes) the enhancement is needed on SinkFunction. This will allow us to easily evolve the existing Pulsar connector.
> > >> > >
> > >> > > Next step, I will open a PR to advance the conversation.
> > >> > >
> > >> > > Eron
> > >> > >
> > >> > > On Tue, May 18, 2021 at 5:06 AM David Morávek <david.mora...@gmail.com> wrote:
> > >> > >
> > >> > >> Hi Eron,
> > >> > >>
> > >> > >> Thanks for starting this discussion. I've been thinking about this recently as we've run into "watermark related" issues when chaining multiple pipelines together. My two cents to the discussion:
> > >> > >>
> > >> > >> How I like to think about the problem is that there should be an invariant that holds for any stream processing pipeline: "NON_LATE element entering the system should never become LATE"
> > >> > >>
> > >> > >> Unfortunately this is exactly what happens in downstream pipelines, because the upstream one can:
> > >> > >> - break ordering (especially with higher parallelism)
> > >> > >> - emit elements that are ahead of the output watermark
> > >> > >>
> > >> > >> There is not enough information to reconstruct the upstream watermark in later stages (it's always just an estimate based on the previous pipeline's output).
> > >> > >>
> > >> > >> It would be great if we could have a general abstraction that is reusable for various sources / sinks (not just Kafka / Pulsar, though this would probably cover most of the use cases) and systems.
> > >> > >>
> > >> > >> Is there any other use case than sharing watermarks between pipelines that you're trying to solve?
> > >> > >>
> > >> > >> Arvid:
> > >> > >>
> > >> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I have a hard time imagining that it's useful to use a different stream processor downstream. So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job similar to KafkaStream's #through.
> > >> > >>
> > >> > >> I'd slightly disagree here. For example, we're "materializing" change-logs produced by a Flink pipeline into a serving layer (random access db / in memory view / ..) and we need to know whether the responses we serve meet the "freshness" requirements (e.g. you may want to respond differently when the watermark is lagging way too far behind processing time). Also, not every stream processor in the pipeline needs to be Flink. It can just as well be a simple element-wise transformation that reads from Kafka and writes back into a separate topic (that's what we do, for example, with ML models that have special hardware requirements).
> > >> > >>
> > >> > >> Best,
> > >> > >> D.
> > >> > >>
> > >> > >> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:
> > >> > >>
> > >> > >> > Hi Eron,
> > >> > >> >
> > >> > >> > I think this is a useful addition for storage systems that act as pass-through for Flink to reduce recovery time. It is only useful if you combine it with regional fail-over as only a small part of the pipeline is restarted.
> > >> > >> >
> > >> > >> > A couple of thoughts on the implications:
> > >> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I have a hard time imagining that it's useful to use a different stream processor downstream. So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job similar to KafkaStream's #through.
> > >> > >> > 2. The schema of the respective intermediate stream/topic would need to be managed by Flink to encode both records and watermarks. This reduces the usability quite a bit and needs to be carefully crafted.
> > >> > >> > 3. It's not clear to me if constructs like SchemaRegistry can be properly supported (and also if they should be supported) in terms of schema evolution.
> > >> > >> > 4. Potentially, StreamStatus and LatencyMarker would also need to be encoded.
> > >> > >> > 5. It's important to have some way to transport backpressure from the downstream to the upstream.
> > >> > >> > Or else you would have the same issue as KafkaStreams where two separate pipelines can drift so far away that you experience data loss if the data retention period is smaller than the drift.
> > >> > >> > 6. It's clear that you trade a huge chunk of throughput for lower overall latency in case of failure. So it's an interesting feature for use cases with SLAs.
> > >> > >> >
> > >> > >> > Since we are phasing out SinkFunction, I'd prefer to only support SinkWriter. Having a no-op default sounds good to me.
> > >> > >> >
> > >> > >> > We have some experimental feature for Kafka [1], which pretty much reflects your idea. Here we have an ugly workaround to be able to process the watermark by using a custom StreamSink task. We could also try to create a FLIP that abstracts the actual system away and then we could use the approach for both Pulsar and Kafka.
> > >> > >> >
> > >> > >> > [1] https://urldefense.com/v3/__https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java*L103__;Iw!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMvmemHrt$ [github[.]com]
> > >> > >> >
> > >> > >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:
> > >> > >> >
> > >> > >> > > I would like to propose an enhancement to the Sink API, the ability to receive upstream watermarks. I'm aware that the sink context provides the current watermark for a given record.
> > >> > >> > > I'd like to be able to write a sink function that is invoked whenever the watermark changes. Out of scope would be event-time timers (since sinks aren't keyed).
> > >> > >> > >
> > >> > >> > > For context, imagine that a stream storage system had the ability to persist watermarks in addition to ordinary elements, e.g. to serve as source watermarks in a downstream processor. Ideally one could compose a multi-stage, event-driven application, with watermarks flowing end-to-end without need for a heuristics-based watermark at each stage.
> > >> > >> > >
> > >> > >> > > The specific proposal would be a new method on `SinkFunction` and/or on `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a default implementation that does nothing.
> > >> > >> > >
> > >> > >> > > Thoughts?
> > >> > >> > >
> > >> > >> > > Thanks!
> > >> > >> > > Eron Wright
> > >> > >> > > StreamNative

--
Eron Wright   Cloud Engineering Lead
p: +1 425 922 8617 <18163542939>
streamnative.io | Meet with me <https://calendly.com/eronwright/regular-1-hour>
<https://github.com/streamnative>
<https://www.linkedin.com/company/streamnative/>
<https://twitter.com/streamnativeio/>
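
For readers skimming the thread, the shape of the proposal (a watermark callback on the sink with a default implementation that does nothing) can be sketched roughly as below. This is a standalone illustration, not Flink code: `SimpleSink`, `Watermark`, `LegacySink`, `WatermarkAwareSink`, and the driver methods are hypothetical stand-ins invented for this sketch, and the real `SinkFunction` / `SinkWriter` signatures differ (for example, `SinkFunction#invoke` also takes a context). It only aims to show why a no-op default keeps existing sinks source-compatible while letting new sinks opt in.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSinkSketch {

    /** Hypothetical stand-in for a watermark; it only carries a timestamp. */
    static final class Watermark {
        final long timestamp;

        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    /** Simplified sink interface (an assumption, not the Flink interface). */
    interface SimpleSink<T> {
        void invoke(T value);

        /** No-op default: sinks written before this method existed still compile. */
        default void writeWatermark(Watermark watermark) {
            // intentionally empty
        }
    }

    /** A sink that predates the watermark callback and ignores watermarks. */
    static final class LegacySink implements SimpleSink<String> {
        final List<String> out = new ArrayList<>();

        @Override
        public void invoke(String value) {
            out.add(value);
        }
    }

    /** A sink that opts in and writes watermarks in-band with the records. */
    static final class WatermarkAwareSink implements SimpleSink<String> {
        final List<String> out = new ArrayList<>();

        @Override
        public void invoke(String value) {
            out.add("record:" + value);
        }

        @Override
        public void writeWatermark(Watermark watermark) {
            out.add("watermark:" + watermark.timestamp);
        }
    }

    /** Mimics an operator that forwards both records and watermarks to the sink. */
    static void drive(SimpleSink<String> sink) {
        sink.invoke("a");
        sink.writeWatermark(new Watermark(100L));
        sink.invoke("b");
        sink.writeWatermark(new Watermark(200L));
    }

    static List<String> runLegacy() {
        LegacySink sink = new LegacySink();
        drive(sink);
        return sink.out;
    }

    static List<String> runWatermarkAware() {
        WatermarkAwareSink sink = new WatermarkAwareSink();
        drive(sink);
        return sink.out;
    }

    public static void main(String[] args) {
        System.out.println(runLegacy());
        System.out.println(runWatermarkAware());
    }
}
```

The same reasoning applies to adding a context parameter later: a new overload with a default that delegates to the existing method would not break implementers, which is the backwards-compatibility argument made in points 2+3 of the reply.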