I think the rationale for end-to-end idleness (i.e. between pipelines) is the same as the rationale for idleness between operators within a pipeline. On the 'main issue' you mentioned, we entrust the source with adapting to Flink's notion of idleness (e.g. in the Pulsar source, it means that no topics/partitions are assigned to a given sub-task); a similar adaptation would occur in the sink. In other words, I think it is reasonable that a sink for a watermark-aware storage system needs the idleness signal.
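To make the need for the idleness signal concrete, here is a minimal Java sketch of the kind of aggregation a watermark-aware store might perform: a global watermark computed as the minimum over writers that are not idle, loosely in the spirit of Flink's StatusWatermarkValve. The class and its methods (`register`, `writeWatermark`, `markIdle`, `globalWatermark`) are hypothetical and are not the Pulsar broker or producer API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

/**
 * Hypothetical sketch (not Pulsar's or Flink's API): aggregates per-writer watermarks
 * into a global minimum, ignoring writers that have declared themselves idle.
 */
public class IdleAwareWatermarkAggregator {
    private final Map<String, Long> lastWatermark = new HashMap<>();
    private final Set<String> active = new HashSet<>();

    /** A writer joins as active; the aggregate waits for it until it reports or goes idle. */
    public void register(String writerId) {
        active.add(writerId);
    }

    /** Records a watermark; per-writer values never move backwards, and reporting ends idleness. */
    public void writeWatermark(String writerId, long timestamp) {
        lastWatermark.merge(writerId, timestamp, Math::max);
        active.add(writerId);
    }

    /** An idle writer is excluded, so it can no longer hold the aggregate back. */
    public void markIdle(String writerId) {
        active.remove(writerId);
    }

    /** Minimum over active writers; empty if none are active or one has not reported yet. */
    public OptionalLong globalWatermark() {
        if (active.isEmpty()) {
            return OptionalLong.empty();
        }
        long min = Long.MAX_VALUE;
        for (String writerId : active) {
            Long watermark = lastWatermark.get(writerId);
            if (watermark == null) {
                return OptionalLong.empty(); // still waiting on this writer
            }
            min = Math.min(min, watermark);
        }
        return OptionalLong.of(min);
    }

    public static void main(String[] args) {
        IdleAwareWatermarkAggregator aggregator = new IdleAwareWatermarkAggregator();
        aggregator.register("subtask-0");
        aggregator.register("subtask-1");
        aggregator.writeWatermark("subtask-0", 100L);
        // subtask-1 has no partitions assigned and never reports: the aggregate is stuck.
        System.out.println(aggregator.globalWatermark().isPresent()); // false
        // Once the idle sub-task's writer is marked idle, the aggregate advances.
        aggregator.markIdle("subtask-1");
        System.out.println(aggregator.globalWatermark().getAsLong()); // 100
    }
}
```

The sketch omits one thing a real aggregator needs: keeping the global watermark from regressing when an idle writer resumes with an older timestamp.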
Let me explain how I would use it in Pulsar's sink. Each sub-task is a Pulsar producer and writes watermarks to a configured topic via the Producer API. The Pulsar broker aggregates the watermarks written by each producer into a global minimum (similar to StatusWatermarkValve). The broker keeps track of which producers are actively producing watermarks, and a producer may mark itself as idle to tell the broker not to wait for watermarks from it, e.g. when the producer is going offline. I had intended to mark the producer as idle when the sub-task is closing, but now I see that this would be insufficient; the producer should also be marked idle when the sub-task becomes idle. Otherwise, the broker would wait indefinitely for the idle sub-task to produce a watermark.

Arvid, I think your original instincts were correct about idleness propagation, and I hope I've demonstrated a practical use case.

On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise <ar...@apache.org> wrote:

When I was rethinking the idleness issue, I came to the conclusion that it should be inferred again at the source of the respective downstream pipeline.

The main issue with propagating idleness is that you would force the same definition across all downstream pipelines, which may not be what the user intended. On the other hand, I don't immediately see a technical reason why the downstream source wouldn't be able to infer it.

On Thu, Jun 3, 2021 at 9:14 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Thanks Piotr for bringing this up. I reflected on this and I agree we should expose idleness; otherwise a multi-stage flow could stall.

Regarding the latency markers, I don't see an immediate need for propagating them, because they serve to estimate latency within a pipeline, not across pipelines. One would probably also need to enhance the source interface to do e2e latency. It seems we agree this aspect is out of scope.
I took a look at the code to get a sense of how to accomplish this. The gist is a new `markIdle` method on the `StreamOperator` interface that is called when the stream status maintainer (the `OperatorChain`) transitions to the idle state, plus a new `markIdle` method on `SinkFunction` and `SinkWriter` that is called by the respective operators. Note that StreamStatus is an internal class.

Here's a draft PR (based on the existing PR of FLINK-22700) to highlight this new aspect:
https://github.com/streamnative/flink/pull/2/files

Please let me know if you'd like me to proceed to update the FLIP with these details.

Thanks again,
Eron

On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi,

Sorry for chipping in late in the discussion, but I would second this point from Arvid:

> 4. Potentially, StreamStatus and LatencyMarker would also need to be encoded.

It seems like this point was raised but not followed up on? Or did I miss it? Especially the StreamStatus part. To me, exposing watermarks without letting the sink know that the stream can be idle sounds like an incomplete feature, and it can be very problematic/confusing for potential users.

Best,
Piotrek

On Mon, May 31, 2021 at 08:34 Arvid Heise <ar...@apache.org> wrote:

AFAIK everyone can start a [VOTE] thread [1]. For example, here a non-committer started a successful thread [2]. If you start it, I can already cast a binding vote and we just need 2 more for the FLIP to be accepted.
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Deprecating-Mesos-support-td50142.html

On Fri, May 28, 2021 at 8:17 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Arvid,
Thanks for the feedback. I investigated the japicmp configuration, and I see that SinkWriter is marked Experimental (not Public or PublicEvolving). I think this means that SinkWriter need not be excluded. As you mentioned, SinkFunction is already excluded. I've updated the FLIP with an explanation.

I believe all issues are resolved. May we proceed to a vote now? And are you able to drive the vote process?

Thanks,
Eron

On Fri, May 28, 2021 at 4:40 AM Arvid Heise <ar...@apache.org> wrote:

Hi Eron,

1. Fair point. It still feels odd to have writeWatermark in the SinkFunction (it's supposed to be functional, as you mentioned), but I agree that invokeWatermark is not better. So unless someone has a better idea, I'm fine with it.
2.+3. I spent a while trying to come up with scenarios. In general, it seems as if the new SinkWriter interface encourages more injection (see the processing time service in InitContext), such that the context is really only needed for information about that particular record, and I don't see any use beyond timestamp and watermark. For SinkFunction, I'd not over-engineer, as it's going to be deprecated soonish.
So +1 to leave it out.
4. Okay, so I double-checked: from an execution perspective, it works. However, japicmp would definitely complain. I propose to add it to the compatibility section like this. We need to add an exception for SinkWriter then. (SinkFunction is already on the exception list.)
5.+6. Awesome, I was also sure but wanted to double-check.

Best,

Arvid

On Wed, May 26, 2021 at 7:29 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Arvid,

1. I assume that the method name `invoke` stems from considering the SinkFunction to be a functional interface, but is otherwise meaningless. Keeping it as `writeWatermark` does keep it symmetric with SinkWriter. My vote is to leave it. You decide.

2+3. I too considered adding a `WatermarkContext`, but it would merely be a placeholder. I don't anticipate any context info in the future. As we see with invoke, it is possible to add a context later in a backwards-compatible way. My vote is to not introduce a context. You decide.

4. No anticipated compatibility issues.

5. Short answer: it works as expected. The new methods are invoked whenever the underlying operator receives a watermark. I do believe that batch and ingestion-time applications receive watermarks. It seems the programming model has been more unified in that respect since 1.12 (FLIP-134).

6. The failure behavior is the same as for elements.
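Answer 6 says watermarks follow the same failure semantics as elements. For a transactional (two-phase commit) sink, one way to read that is that a watermark is staged in the same transaction as the records around it, so it is committed or rolled back together with them. The writer below is a hypothetical in-memory stand-in, not Flink's TwoPhaseCommitSinkFunction or any connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionalWatermarkDemo {
    /** Hypothetical writer: records and watermarks are staged in one pending transaction. */
    static class TransactionalWriter {
        final List<String> committed = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();

        void write(String record) {
            pending.add("record:" + record);
        }

        void writeWatermark(long timestamp) {
            pending.add("watermark:" + timestamp);
        }

        /** On a successful checkpoint, everything staged becomes visible at once. */
        void commit() {
            committed.addAll(pending);
            pending.clear();
        }

        /** On failure, staged watermarks are discarded exactly like staged records. */
        void abort() {
            pending.clear();
        }
    }

    public static void main(String[] args) {
        TransactionalWriter writer = new TransactionalWriter();
        writer.write("a");
        writer.writeWatermark(10L);
        writer.commit();
        writer.write("b");
        writer.writeWatermark(20L);
        writer.abort(); // simulated failure before the next commit
        System.out.println(writer.committed); // [record:a, watermark:10]
    }
}
```

A sink that deduplicates on resumption instead of using transactions would need the same treatment for watermarks as for records.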
Thanks,
Eron

On Tue, May 25, 2021 at 12:42 PM Arvid Heise <ar...@apache.org> wrote:

Hi Eron,

I think the FLIP is crisp and mostly good to go. Some smaller things/questions:

1. SinkFunction#writeWatermark could be named SinkFunction#invokeWatermark or invokeOnWatermark to keep it symmetric.
2. We could add the context parameter to both. For SinkWriter#Context, we currently do not gain much. SinkFunction#Context also exposes processing time, which may or may not be handy and is currently mostly used for StreamingFileSink bucket policies. We may add that processing time flag to SinkWriter#Context in the future as well.
3. Alternatively, we could add a different context parameter just to keep the API stable while allowing additional information to be passed in the future.
4. Would we run into any compatibility issue if we use a Flink 1.13 source in Flink 1.14 (with this FLIP) or vice versa?
5. What happens with sinks that use the new methods in applications that do not have watermarks (batch mode, processing time)? Does this also work sufficiently with ingestion time?
6. How do exactly-once sinks deal with written watermarks in case of failure? I guess it's the same as for normal records.
(Either rollback of the transaction or deduplication on resumption.)

Best,

Arvid

On Tue, May 25, 2021 at 6:44 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Does anyone have further comments on FLIP-167?
https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

Thanks,
Eron

On Thu, May 20, 2021 at 5:02 PM Eron Wright <ewri...@streamnative.io> wrote:

Filed FLIP-167: Watermarks for Sink API:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

I'd like to call a vote next week; is that reasonable?

On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com> wrote:

Hi Arvid and Eron,

Thanks for the discussion. I read through Eron's pull request, and I think this can benefit the Pravega Flink connector as well.

Here is some background. Pravega has had the watermark concept in its event stream for two years, and here is a blog introduction [1] to Pravega watermarks.
The Pravega Flink connector also gained this watermark integration last year: we wanted to propagate the Flink watermark to Pravega in the SinkFunction, and at that time we just used the existing Flink API, keeping the last watermark in memory and checking on each event whether the watermark had changed [2], which is not efficient. With such a new interface, we can also manage the watermark propagation much more easily.

[1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
[2] https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465

-----Original Message-----
From: Arvid Heise <ar...@apache.org>
Sent: Wednesday, May 19, 2021 16:06
To: dev
Subject: Re: [DISCUSS] Watermark propagation with Sink API

Hi Eron,

Thanks for pushing that topic. I can now see that the benefit is even bigger than I initially thought. So it's worthwhile to include it anyway.
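The difference Brian describes for the Pravega connector, checking the current watermark on every record versus being called when the watermark changes, can be sketched as follows. Both writer classes and the `simulate` driver are hypothetical simplifications, not the Pravega connector's or Flink's code. Beyond the per-record check, the polling style also cannot forward a watermark that advances after the last record, because it only observes watermarks when a record arrives.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkForwardingDemo {
    /** Hypothetical workaround style: check the watermark on every record, forward it if it advanced. */
    static class PerRecordPollingWriter {
        final List<Long> forwarded = new ArrayList<>();
        private long lastWatermark = Long.MIN_VALUE;

        void invoke(String record, long currentWatermark) {
            if (currentWatermark > lastWatermark) {
                lastWatermark = currentWatermark;
                forwarded.add(currentWatermark);
            }
            // ... write the record to the external system ...
        }
    }

    /** Hypothetical callback style: the runtime calls writeWatermark whenever the watermark advances. */
    static class CallbackWriter {
        final List<Long> forwarded = new ArrayList<>();

        void write(String record) {
            // ... write the record to the external system ...
        }

        void writeWatermark(long watermark) {
            forwarded.add(watermark);
        }
    }

    /** Feeds the same input (Strings are records, Longs are watermark advances) to both writers. */
    static List<List<Long>> simulate(Object[] events) {
        PerRecordPollingWriter polling = new PerRecordPollingWriter();
        CallbackWriter callback = new CallbackWriter();
        long currentWatermark = Long.MIN_VALUE;
        for (Object event : events) {
            if (event instanceof Long) {
                currentWatermark = (Long) event;
                callback.writeWatermark(currentWatermark);
            } else {
                polling.invoke((String) event, currentWatermark);
                callback.write((String) event);
            }
        }
        return List.of(polling.forwarded, callback.forwarded);
    }

    public static void main(String[] args) {
        List<List<Long>> result = simulate(new Object[] {"a", 10L, "b", "c", 20L});
        System.out.println("polling:  " + result.get(0)); // [10]; 20 is never observed
        System.out.println("callback: " + result.get(1)); // [10, 20]
    }
}
```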
I also briefly thought about exposing watermarks to all UDFs, but here I really struggle to see specific use cases. Could you maybe take a few minutes to think about it as well? I could only see someone misusing Async IO as a sink where a real sink would be more appropriate. In general, if there is not a clear use case, we shouldn't add the functionality, as it's just increased maintenance for no value.

If we stick to the plan, I think your PR is already in good shape. We need to create a FLIP for it though, since it changes Public interfaces [1]. I was initially not convinced that we should also change the old SinkFunction interface, but seeing how small the change is, I wouldn't mind at all increasing consistency. Only once we have written and approved the FLIP (which should be minimal and fast) should we actually look at the PR ;).

The only thing I would improve is the name of the function. processWatermark sounds as if the sink implementer really needs to implement it (as you would need to do on a custom operator). I would make the names symmetric to the record writing/invoking methods (e.g. writeWatermark and invokeWatermark).

As a follow-up PR, we should then migrate KafkaShuffle to the new API. But that's something I can do.

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

On Wed, May 19, 2021 at 3:34 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Update: opened an issue and a PR.
https://issues.apache.org/jira/browse/FLINK-22700
https://github.com/apache/flink/pull/15950

On Tue, May 18, 2021 at 10:03 AM Eron Wright <ewri...@streamnative.io> wrote:

Thanks Arvid and David for sharing your ideas on this subject.
I'm glad to hear that you're seeing use cases for watermark propagation via an enhanced sink interface.

As you've guessed, my interest is in Pulsar, and I am exploring some options for brokering watermarks across stream processing pipelines. I think Arvid is speaking to a high-fidelity solution where the difference between intra- and inter-pipeline flow is eliminated. My goal is more limited; I want to write the watermark that arrives at the sink to Pulsar. Simply imagine that Pulsar has native support for watermarking in its producer/consumer API, and we'll leave the details to another forum.

David, I like your invariant. I see lateness as stemming from the problem domain and from system dynamics (e.g. scheduling, batching, lag). When one depends on order of observation to generate watermarks, the app may become unduly sensitive to dynamics that bear on order of observation. My goal is to factor the system dynamics out of lateness determination.
Arvid, to be most valuable (at least for my purposes), the enhancement is needed on SinkFunction. This will allow us to easily evolve the existing Pulsar connector.

As a next step, I will open a PR to advance the conversation.

Eron

On Tue, May 18, 2021 at 5:06 AM David Morávek <david.mora...@gmail.com> wrote:

Hi Eron,

Thanks for starting this discussion. I've been thinking about this recently, as we've run into "watermark related" issues when chaining multiple pipelines together.
My two cents on the discussion:

The way I like to think about the problem is that there should be an invariant that holds for any stream processing pipeline: "A NON_LATE element entering the system should never become LATE."

Unfortunately, this is exactly what happens in downstream pipelines, because the upstream one can:
- break ordering (especially with higher parallelism)
- emit elements that are ahead of the output watermark

There is not enough information to reconstruct the upstream watermark in later stages (it's always just an estimate based on the previous pipeline's output).

It would be great if we could have a general abstraction that is reusable for various sources / sinks (not just Kafka / Pulsar, though this would probably cover most of the use cases) and systems.

Is there any use case other than sharing watermarks between pipelines that you're trying to solve?

Arvid:

> 1. Watermarks are closely coupled to the used system (=Flink).
> I have a hard time imagining that it's useful to use a different stream processor downstream. So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job similar to KafkaStream's #through.

I'd slightly disagree here. For example, we're "materializing" change-logs produced by a Flink pipeline into a serving layer (random access db / in-memory view / ..), and we need to know whether the responses we serve meet the "freshness" requirements (e.g. you may want to respond differently when the watermark is lagging too far behind processing time). Also, not every stream processor in the pipeline needs to be Flink. It can just as well be a simple element-wise transformation that reads from Kafka and writes back into a separate topic (that's what we do, for example, with ML models that have special hardware requirements).

Best,
D.
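David's invariant can be illustrated with a toy example. Suppose the upstream pipeline emits elements in the observed order below (for example, interleaved from two parallel sub-tasks) while its output watermark is still 2, so none of them is late upstream. A downstream stage that re-derives its watermark heuristically (here: the maximum timestamp seen so far minus a bound of 5, similar in spirit to bounded out-of-orderness) classifies some of them as late, whereas a watermark propagated from upstream would not. All values are made up, and the heuristic is a simplification rather than Flink's WatermarkStrategy implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class LatenessInvariantDemo {
    /** Late elements under a watermark re-derived downstream from observation order (heuristic). */
    static List<Long> lateUnderHeuristic(long[] observed, long bound) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : observed) {
            // Watermark estimated from elements observed so far: max timestamp minus the bound.
            long watermark = (maxSeen == Long.MIN_VALUE) ? Long.MIN_VALUE : maxSeen - bound;
            if (ts <= watermark) {
                late.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    /** Late elements under the watermark that the upstream pipeline actually had. */
    static List<Long> lateUnderPropagated(long[] observed, long upstreamWatermark) {
        List<Long> late = new ArrayList<>();
        for (long ts : observed) {
            if (ts <= upstreamWatermark) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Order as observed downstream: not sorted by timestamp.
        long[] observed = {10, 3, 12, 5};
        System.out.println(lateUnderHeuristic(observed, 5));  // [3, 5]
        System.out.println(lateUnderPropagated(observed, 2)); // []
    }
}
```

Elements 3 and 5 were on time upstream but become late downstream purely because of observation order, which is the system-dynamics sensitivity Eron mentions.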
On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:

Hi Eron,

I think this is a useful addition for storage systems that act as a pass-through for Flink to reduce recovery time. It is only useful if you combine it with regional fail-over, as only a small part of the pipeline is restarted.

A couple of thoughts on the implications:
1. Watermarks are closely coupled to the used system (=Flink). I have a hard time imagining that it's useful to use a different stream processor downstream. So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job similar to KafkaStream's #through.
2. The schema of the respective intermediate stream/topic would need to be managed by Flink to encode both records and watermarks.
This reduces the usability quite a bit and needs to be carefully crafted.
3. It's not clear to me if constructs like SchemaRegistry can be properly supported (and also if they should be supported) in terms of schema evolution.
4. Potentially, StreamStatus and LatencyMarker would also need to be encoded.
5. It's important to have some way to transport backpressure from the downstream to the upstream. Otherwise you would have the same issue as KafkaStreams, where two separate pipelines can drift so far apart that you experience data loss if the data retention period is smaller than the drift.
6. It's clear that you trade a huge chunk of throughput for lower overall latency in case of failure. So it's an interesting feature for use cases with SLAs.

Since we are phasing out SinkFunction, I'd prefer to only support SinkWriter. Having a no-op default sounds good to me.
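A no-op default method is what keeps such an addition source-compatible: existing writers compile unchanged and silently ignore watermarks, while watermark-aware writers override the hook. The interface below is a hypothetical, simplified stand-in (a `long` timestamp instead of a watermark object) and not Flink's actual `SinkWriter` signature.

```java
import java.util.ArrayList;
import java.util.List;

public class NoOpDefaultDemo {
    /** Hypothetical, simplified stand-in for a sink writer interface; NOT Flink's actual SinkWriter. */
    interface SimpleSinkWriter<T> {
        void write(T element);

        /** Called when the watermark advances; the no-op default keeps existing writers compiling. */
        default void writeWatermark(long watermarkTimestamp) {}
    }

    /** A pre-existing writer that knows nothing about watermarks. */
    static class LegacyWriter implements SimpleSinkWriter<String> {
        final List<String> out = new ArrayList<>();

        public void write(String element) {
            out.add(element);
        }
    }

    /** A watermark-aware writer that persists watermarks alongside records. */
    static class WatermarkAwareWriter implements SimpleSinkWriter<String> {
        final List<String> out = new ArrayList<>();

        public void write(String element) {
            out.add("record:" + element);
        }

        @Override
        public void writeWatermark(long watermarkTimestamp) {
            out.add("watermark:" + watermarkTimestamp);
        }
    }

    public static void main(String[] args) {
        LegacyWriter legacy = new LegacyWriter();
        WatermarkAwareWriter aware = new WatermarkAwareWriter();
        List<SimpleSinkWriter<String>> writers = List.of(legacy, aware);
        for (SimpleSinkWriter<String> writer : writers) {
            writer.write("a");
            writer.writeWatermark(100L); // ignored by LegacyWriter via the default method
            writer.write("b");
        }
        System.out.println(legacy.out); // [a, b]
        System.out.println(aware.out);  // [record:a, watermark:100, record:b]
    }
}
```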
We have an experimental feature for Kafka [1] that pretty much reflects your idea. There we have an ugly workaround to be able to process the watermark by using a custom StreamSink task. We could also try to create a FLIP that abstracts the actual system away, and then we could use the approach for both Pulsar and Kafka.

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103

On Mon, May 17, 2021 at 10:44 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

I would like to propose an enhancement to the Sink API: the ability to receive upstream watermarks.
I'm aware that the sink context provides the current watermark for a given record. I'd like to be able to write a sink function that is invoked whenever the watermark changes. Out of scope would be event-time timers (since sinks aren't keyed).

For context, imagine that a stream storage system had the ability to persist watermarks in addition to ordinary elements, e.g. to serve as source watermarks in a downstream processor. Ideally one could compose a multi-stage, event-driven application, with watermarks flowing end-to-end without the need for a heuristics-based watermark at each stage.

The specific proposal would be a new method on `SinkFunction` and/or on `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a default implementation that does nothing.

Thoughts?

Thanks!
> > Eron Wright
> > StreamNative

--
Eron Wright Cloud Engineering Lead

p: +1 425 922 8617 <18163542939>

streamnative.io | Meet with me <https://calendly.com/eronwright/regular-1-hour>

<https://github.com/streamnative>
<https://www.linkedin.com/company/streamnative/>
<https://twitter.com/streamnativeio/>
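To make the quoted May 17 proposal concrete, here is a minimal, self-contained Java sketch. It deliberately does not use Flink's classes: `Watermark`, `WatermarkAwareSinkWriter`, `StorageWriter`, `LegacyWriter` and `SinkDriver` are hypothetical stand-ins, and `SinkDriver` only mimics, under our own assumptions, how a sink operator might call the hook. It illustrates the two properties the proposal asks for: a `writeWatermark` callback invoked whenever the watermark changes (read here as "advances"), and a default implementation that does nothing, so existing writers are unaffected.

```java
import java.util.ArrayList;
import java.util.List;

// Entry point declared first so the file can run in single-file source mode.
class WatermarkSinkSketch {
    public static void main(String[] args) {
        StorageWriter storage = new StorageWriter();
        SinkDriver<String> driver = new SinkDriver<>(storage);
        driver.processElement("a");
        driver.processWatermark(new Watermark(10L));
        driver.processWatermark(new Watermark(10L)); // unchanged, so not forwarded
        driver.processElement("b");
        driver.processWatermark(new Watermark(20L));
        System.out.println(storage.persisted);
        // prints [element:a, watermark:10, element:b, watermark:20]

        LegacyWriter legacy = new LegacyWriter();
        SinkDriver<String> legacyDriver = new SinkDriver<>(legacy);
        legacyDriver.processElement("a");
        legacyDriver.processWatermark(new Watermark(10L)); // hits the no-op default
        System.out.println(legacy.persisted);
        // prints [element:a]
    }
}

/** Hypothetical stand-in for a watermark: an event-time timestamp in milliseconds. */
final class Watermark {
    private final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    long getTimestamp() {
        return timestamp;
    }
}

/** Hypothetical writer interface: the proposed hook ships with a no-op default. */
interface WatermarkAwareSinkWriter<T> {
    void write(T element);

    default void writeWatermark(Watermark watermark) {
        // Does nothing unless a writer overrides it.
    }
}

/** Writer for a hypothetical watermark-aware store: persists elements and watermarks. */
final class StorageWriter implements WatermarkAwareSinkWriter<String> {
    final List<String> persisted = new ArrayList<>();

    @Override
    public void write(String element) {
        persisted.add("element:" + element);
    }

    @Override
    public void writeWatermark(Watermark watermark) {
        persisted.add("watermark:" + watermark.getTimestamp());
    }
}

/** Pre-existing writer that never overrides the hook, so it ignores watermarks. */
final class LegacyWriter implements WatermarkAwareSinkWriter<String> {
    final List<String> persisted = new ArrayList<>();

    @Override
    public void write(String element) {
        persisted.add("element:" + element);
    }
}

/** Hypothetical driver: forwards a watermark to the writer only when it advances. */
final class SinkDriver<T> {
    private final WatermarkAwareSinkWriter<T> writer;
    private long currentWatermark = Long.MIN_VALUE;

    SinkDriver(WatermarkAwareSinkWriter<T> writer) {
        this.writer = writer;
    }

    void processElement(T element) {
        writer.write(element);
    }

    void processWatermark(Watermark watermark) {
        if (watermark.getTimestamp() > currentWatermark) {
            currentWatermark = watermark.getTimestamp();
            writer.writeWatermark(watermark);
        }
    }
}
```

The default method is what keeps such a change backward compatible: `LegacyWriter` compiles unmodified and simply drops watermarks. Forwarding only on an advance is one possible reading of "invoked whenever the watermark changes"; checkpointing and idleness are left out of this sketch.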