Thanks, the narrowed FLIP-167 is fine for now. I'll re-activate the vote process. Thanks!
On Tue, Jun 8, 2021 at 3:01 AM Till Rohrmann <trohrm...@apache.org> wrote:

Hi everyone,

I do agree that Flink's definition of idleness is not fully thought through yet. Consequently, I would feel a bit uneasy making it part of Flink's API right now. Instead, defining the proper semantics first and then exposing it sounds like a good way forward. Hence, +1 for option number 1, which will also allow FLIP-167 to make progress.

Concerning subtasks with no partitions assigned, would it make sense to terminate these tasks at some point? That way, the stream would be closed and there would be no need to maintain a stream status. Of course, this also requires that Flink can at some point start new sources when new partitions appear.

Cheers,
Till

On Tue, Jun 8, 2021 at 9:26 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

Hi Eron,

FLIP-167 is narrow, but we recently discovered some problems with the current idleness semantics, as Arvid explained. We are planning to present a new proposal to redefine them. Probably as a part of it, we would need to rename them. Given that, I think it doesn't make sense to expose idleness to the sinks before we rename and define it properly. In other words:

> 2. When the sink operator is idled, tell the sink function.

We shouldn't expose stream status as a part of the public API until it's properly defined.

I would propose one of two things:
1. Proceed with FLIP-167 without exposing idleness in the sinks YET. Exposing idleness could be part of the next/future FLIP that would define idleness in the first place.
2. Block FLIP-167 until idleness is fixed.

I would vote for option number 1.

Piotrek

On Mon, Jun 7, 2021 at 18:08 Eron Wright <ewri...@streamnative.io.invalid> wrote:

Piotr, Dawid, and Arvid, we've had an expansive discussion, but ultimately the proposal is narrow.
It is:
1. When a watermark arrives at the sink operator, tell the sink function.
2. When the sink operator is idled, tell the sink function.

With these enhancements, we will significantly improve correctness in multi-stage flows and facilitate an exciting project in the Pulsar community. Would you please lend your support to FLIP-167 so that we can land this enhancement for 1.14? My deepest thanks!

-Eron

On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise <ar...@apache.org> wrote:

Hi Eron,

You either have very specific use cases in mind or have a misconception about idleness in Flink with the new sources. The basic idea is that you have watermark generators only at the sources, and the user supplies them. As a source author, you have no option to limit that. Here is a bit of background:

We observed that many users reading from Kafka were confused by the lack of visible progress in their Flink applications caused by an idle partition, so we subsequently introduced idleness. Idleness was always considered a means to achieve progress at the risk of losing a bit of correctness. So, especially in the case you describe, with a Pulsar partition that is empty but indefinitely active, the user needs to be able to use idleness so that downstream window operators make progress.

I hope I have clarified that following "I wouldn't recommend using withIdleness() with source-based watermarks." would pretty much break the intended use case.
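Eron's two-item proposal can be pictured as a sink operator that forwards each kind of stream element to a matching callback on the user's sink function. The sketch below is a toy model, not Flink code: `RecordingSinkFunction`, `write_watermark`, `mark_idle`, `IdleMarker`, and `run_sink_operator` are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple, Union


@dataclass(frozen=True)
class Record:
    value: str


@dataclass(frozen=True)
class Watermark:
    timestamp: int


@dataclass(frozen=True)
class IdleMarker:
    """Stands in for the sink operator switching to idle (hypothetical)."""


StreamElement = Union[Record, Watermark, IdleMarker]


class RecordingSinkFunction:
    """Hypothetical user sink function with one callback per proposal item."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, object]] = []

    def invoke(self, value: str) -> None:
        self.events.append(("record", value))

    def write_watermark(self, timestamp: int) -> None:
        # Item 1: the sink function is told about each watermark.
        self.events.append(("watermark", timestamp))

    def mark_idle(self) -> None:
        # Item 2: the sink function is told that the sink operator went idle.
        self.events.append(("idle", None))


def run_sink_operator(elements: Iterable[StreamElement], sink: RecordingSinkFunction) -> None:
    """Forward every element the operator receives to the matching callback."""
    for element in elements:
        if isinstance(element, Record):
            sink.invoke(element.value)
        elif isinstance(element, Watermark):
            sink.write_watermark(element.timestamp)
        elif isinstance(element, IdleMarker):
            sink.mark_idle()
        else:
            raise TypeError(f"unexpected element: {element!r}")
```

Removing `mark_idle` from this toy model leaves only item 1, which corresponds to the narrowed scope described in Piotr's option 1.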
---

Nevertheless, from the discussion with you and some offline discussion with Piotr and Dawid, we actually found quite a few drawbacks in the current definition of idleness:
- We currently only use idleness to exclude the respective upstream tasks from participating in watermark generation (as you eloquently put it further up in the thread).
- However, the definition is bound to records. So while a partition is idle, no records should be produced.
- That leads to quite a few edge cases where operators emit records while they are actually idling: think of timers, asyncIO operators, window operators based on timeouts, etc.
- The solution would be to switch the operator to active while emitting and return it to idle afterwards (but when?). However, this has some unintended side effects depending on when you switch back.

We are currently thinking that we should rephrase the definition along the lines of what you described:
- A channel that is active provides watermarks.
- An idle channel does not provide any watermarks but can deliver records.
- Then we are no longer talking about idle partitions but about explicit and implicit watermark generation, and we should probably rename the concepts.
- This would probably mean that we also need an explicit markActive in source/sink to express that the respective entity now needs to wait for explicit watermarks.

I'll open a proper discussion thread tomorrow.

Note that we probably shouldn't rush this FLIP until we have clarified the semantics of idleness. We could also cut the scope of the FLIP to exclude idleness and go ahead without it (there should be enough binding votes already).
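Arvid's proposed redefinition (active channels provide watermarks, idle channels provide none but may still carry records, and an explicit markActive makes the consumer wait for explicit watermarks again) could be modeled roughly as follows. This is a hedged sketch: `WatermarkCombiner`, `ChannelState`, and the rule that a reactivated channel forgets its old watermark are illustrative assumptions, not Flink's implementation.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass
class ChannelState:
    active: bool = True              # active = expected to provide explicit watermarks
    watermark: Optional[int] = None  # latest explicit watermark while active


class WatermarkCombiner:
    """Hypothetical combiner for the proposed semantics:
    - records may arrive on active and idle channels alike;
    - only active channels hold back the combined watermark;
    - a channel rejoins via mark_active() and is then waited on until it
      sends an explicit watermark."""

    def __init__(self, channel_ids: Iterable[str]) -> None:
        self.channels: Dict[str, ChannelState] = {c: ChannelState() for c in channel_ids}
        self.emitted: Optional[int] = None  # output watermark, never decreases

    def on_record(self, channel: str, value: str) -> str:
        # Records do not change a channel's status in this model.
        return f"{channel}:{value}"

    def on_watermark(self, channel: str, ts: int) -> None:
        state = self.channels[channel]
        if not state.active:
            raise ValueError(f"channel {channel} is idle; call mark_active() first")
        state.watermark = ts if state.watermark is None else max(state.watermark, ts)

    def mark_idle(self, channel: str) -> None:
        self.channels[channel].active = False

    def mark_active(self, channel: str) -> None:
        # Explicit re-activation: forget the old watermark and wait for a new one.
        self.channels[channel] = ChannelState(active=True, watermark=None)

    def combined_watermark(self) -> Optional[int]:
        active_wms = [s.watermark for s in self.channels.values() if s.active]
        if active_wms and all(w is not None for w in active_wms):
            candidate = min(active_wms)
            if self.emitted is None or candidate > self.emitted:
                self.emitted = candidate
        return self.emitted
```

In this model an idle channel can still deliver records, but it cannot hold back the output watermark until it is explicitly reactivated.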
On Sat, Jun 5, 2021 at 12:09 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:

I understand your scenario, but I disagree with its assumptions:

"However, the partition of A is empty and thus A is temporarily idle." - you're assuming that the behavior of the source is to mark itself idle if data isn't available, but that's clearly source-specific and not behavior we expect to have in the Pulsar source. A partition may be empty indefinitely while still being active. Imagine that the producer is defending a lease: "I'm here, there's no data, please don't advance the clock".

"we bind idleness to wall clock time" - you're characterizing a specific strategy (WatermarkStrategy.withIdleness()), not the inherent behavior of the pipeline. I wouldn't recommend using withIdleness() with source-based watermarks.

I do agree that dynamism in partition assignment can wreak havoc on watermark correctness. We have some ideas on the Pulsar side about that too. I would ask that we focus on the Flink framework and pipeline behavior. By offering a more powerful framework, we encourage stream storage systems to "rise to the occasion": treat event time in a first-class way, optimize for correctness, etc. In this case, FLIP-167 is setting the stage for evolution in Pulsar.

Thanks again, Arvid, for the great discussion.

On Fri, Jun 4, 2021 at 2:06 PM Arvid Heise <ar...@apache.org> wrote:

At least one big motivation is having (temporarily) empty partitions.
Let me give you an example of why, imho, idleness is only approximate in this case: assume you have source subtasks A, B, C that correspond to 3 source partitions and a downstream keyed window operator W.

W would usually trigger on min_watermark(A, B, C). However, the partition of A is empty and thus A is temporarily idle. So W triggers on min_watermark(B, C). When A becomes active again, its watermark is implicitly min_watermark(B, C)!

Let's further assume that the source is fed by another, upstream pipeline. This pipeline experiences technical difficulties for X minutes and cannot produce into the partition of A, hence the idleness. When the upstream pipeline resumes, it fills A with some records that are before min_watermark(B, C). Any watermark generated from these records is discarded, as the watermark is monotonic. Therefore, these records will be considered late by W and discarded.

Without idleness, we would simply have blocked W until the upstream pipeline fully recovered, and we would not have had any late records. The same holds for any reprocessing where the data of partition A is continuous.

If you look deeper, the issue is that we bind idleness to wall clock time (e.g. advance the watermark after X seconds without data). Then we assume the watermark of the idle partition to be in sync with the slowest partition. However, in the case of hiccups, this assumption does not hold at all.
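The A/B/C scenario can be checked with a few lines of arithmetic. The sketch below is illustrative only: the watermark values 30, 100, 120 and the backlog timestamps are made up, and treating a record as late when its timestamp is at or below W's watermark is a simplification of how a window operator actually decides lateness.

```python
from typing import Dict, List, Set


def combined_watermark(watermarks: Dict[str, int], idle: Set[str]) -> int:
    """min_watermark over the inputs that are not currently idle."""
    active = [wm for source, wm in watermarks.items() if source not in idle]
    if not active:
        raise ValueError("all inputs are idle")
    return min(active)


def late_records(timestamps: List[int], watermark: int) -> List[int]:
    """Simplification: a record is late if its timestamp is <= W's watermark."""
    return [ts for ts in timestamps if ts <= watermark]


# Hypothetical values: A stalled at 30 while the upstream pipeline was down;
# B and C kept advancing.
watermarks = {"A": 30, "B": 100, "C": 120}
with_idleness = combined_watermark(watermarks, idle={"A"})     # W follows min(B, C)
without_idleness = combined_watermark(watermarks, idle=set())  # W waits for A

# Records the recovered upstream pipeline writes into A's partition afterwards.
backlog_for_a = [40, 60, 90]
late_with_idleness = late_records(backlog_for_a, with_idleness)
late_without_idleness = late_records(backlog_for_a, without_idleness)
```

With idleness, W has already moved to 100, and because the output watermark never moves backwards, A's rejoining cannot pull it back to 30, so all three backlog records are late. Without idleness, W is still at 30 and none of them is.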
I don't see any fix for that (easy or not), and imho it's inherent to the design of idleness. We lack information (why is no data coming?) and use a heuristic to compensate.

In the case of partition assignment where one subtask has no partition, we are probably somewhat safe. We know why no data is coming (no partition), and as long as we do not have dynamic partition assignment, there will never be a switch to active without a restart (for the foreseeable future).

On Fri, Jun 4, 2021 at 10:34 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Yes, I'm talking about an implementation of idleness that is unrelated to processing time. The clear example is partition assignment to subtasks, which probably motivated Flink's idleness functionality in the first place.

On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise <ar...@apache.org> wrote:

Hi Eron,

Are you referring to an implementation of idleness that does not rely on a wall clock but on some clock baked into the partition information of the source system? If so, you are right that it invalidates my points. Do you have an example of where this is used?

With a wall clock, you always run into the issues that I describe, since you are effectively mixing event time and processing time...
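The mixing of event time and processing time that Arvid describes shows up in even a minimal wall-clock idleness detector. The sketch below is only in the spirit of withIdleness(): `ProcessingTimeIdleness` and `idle_decisions` are hypothetical names, and the five-minute timeout and arrival times are made up. The same three records produce different idleness decisions depending only on how fast they arrive.

```python
from typing import List


class ProcessingTimeIdleness:
    """Hypothetical wall-clock idleness detector: declare the input idle once no
    record has arrived for timeout_ms of processing time."""

    def __init__(self, timeout_ms: int, now_ms: int) -> None:
        self.timeout_ms = timeout_ms
        self.last_record_ms = now_ms
        self.idle = False

    def on_record(self, now_ms: int) -> None:
        self.last_record_ms = now_ms
        self.idle = False  # any record makes the input active again

    def check(self, now_ms: int) -> bool:
        if now_ms - self.last_record_ms >= self.timeout_ms:
            self.idle = True
        return self.idle


def idle_decisions(arrivals_ms: List[int], timeout_ms: int) -> List[bool]:
    """For each arrival after the first, report whether the input had been
    declared idle just before that record showed up."""
    detector = ProcessingTimeIdleness(timeout_ms, now_ms=arrivals_ms[0])
    decisions = []
    for now_ms in arrivals_ms[1:]:
        decisions.append(detector.check(now_ms))
        detector.on_record(now_ms)
    return decisions


FIVE_MINUTES_MS = 300_000
live = idle_decisions([0, 360_000, 370_000], FIVE_MINUTES_MS)  # 6-minute gap in live traffic
replayed = idle_decisions([0, 10, 20], FIVE_MINUTES_MS)        # same records, reprocessed quickly
```

Only the arrival times differ between the two runs. The live run declares the input idle before the second record arrives, and the fast replay never does, which is why live processing and reprocessing can disagree about which records are late.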
On Fri, Jun 4, 2021 at 6:28 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Dawid, I think you're mischaracterizing the idleness signal as inherently a heuristic, but Flink does not impose that. A source-based watermark (and the corresponding idleness signal) may well be entirely data-driven and entirely deterministic. Basically, you're underselling what the pipeline is capable of, based on painful experiences with the generic, heuristics-based watermark assigner. Please don't let those experiences overshadow what's possible with source-based watermarking.

The idleness signal does have a strict definition: it indicates whether the stream is actively participating in advancing the event time clock. The status of all participants is considered when aggregating watermarks. A source subtask generally makes the determination based on data, e.g. whether a topic is assigned to that subtask.

We have here a modest proposal to add callbacks to the sink function for information that the sink operator already receives. The practical result is improved correctness when used with streaming systems that have first-class support for event time.
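The data-driven idleness Eron refers to (and the "subtask has no partition" case Arvid calls somewhat safe) depends only on the partition assignment, not on a clock. A minimal sketch, assuming a simple round-robin assignment. Real connectors use their own assignment logic, and `assign_partitions` and `idle_subtasks` are hypothetical helpers.

```python
from typing import Dict, List


def assign_partitions(partitions: List[str], parallelism: int) -> Dict[int, List[str]]:
    """Round-robin assignment of partitions to source subtasks (an assumption)."""
    assignment: Dict[int, List[str]] = {subtask: [] for subtask in range(parallelism)}
    for i, partition in enumerate(partitions):
        assignment[i % parallelism].append(partition)
    return assignment


def idle_subtasks(assignment: Dict[int, List[str]]) -> List[int]:
    """A subtask with no partitions can never produce data, so declaring it idle
    follows deterministically from the assignment, not from a wall clock."""
    return sorted(subtask for subtask, parts in assignment.items() if not parts)


# Three partitions read with parallelism 5: subtasks 3 and 4 get nothing.
assignment = assign_partitions(["p0", "p1", "p2"], parallelism=5)
```

Given the same partitions and parallelism, this decision comes out identical in live processing and in reprocessing, which is the property the wall-clock heuristic lacks.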
The specific changes may be previewed here:
https://github.com/apache/flink/pull/15950
https://github.com/streamnative/flink/pull/2

Thank you all for the robust discussion. Do I have your support to enhance FLIP-167 with idleness callbacks and proceed to a vote?

Eron

On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise <ar...@apache.org> wrote:

While everything I wrote before is still valid, on further reflection I think the conclusion is not necessarily correct:
- If the user wants pipelines A and B to behave as if A+B were executed jointly in the same pipeline without the intermediate Pulsar topic, having the idleness in that topic is the only way to guarantee consistency.
- We could support the following in the respective sources: if the user wants to use a different definition of idleness in B, they can simply provide a new idleness definition. At that point, we should discard the idleness in the intermediate topic while reading.

If we agree on the latter approach, I think having the idleness in the topic is of great use, because it's a piece of information that cannot be inferred, as others have stated.
Consequently, we would be > > able > > > to > > > > > > > support > > > > > > > > > all > > > > > > > > > > use cases and can give the user the freedom to express > his > > > > > intent. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise < > > ar...@apache.org > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > I think the core issue in this discussion is that we > kind > > > of > > > > > > assume > > > > > > > > > that > > > > > > > > > > > idleness is something universally well-defined. But > it's > > > not. > > > > > > It's > > > > > > > a > > > > > > > > > > > heuristic to advance data processing in event time > where > > we > > > > > would > > > > > > > > lack > > > > > > > > > > data > > > > > > > > > > > to do so otherwise. > > > > > > > > > > > Keep in mind that idleness has no real definition in > > terms > > > of > > > > > > event > > > > > > > > > time > > > > > > > > > > > and leads to severe unexpected results: If you > reprocess > > a > > > > data > > > > > > > > stream > > > > > > > > > > with > > > > > > > > > > > temporarily idle partitions, these partitions would not > > be > > > > > deemed > > > > > > > > idle > > > > > > > > > on > > > > > > > > > > > reprocessing and there is a realistic chance that > records > > > > that > > > > > > were > > > > > > > > > > deemed > > > > > > > > > > > late in the live processing case are now perfectly fine > > > > records > > > > > > in > > > > > > > > the > > > > > > > > > > > reprocessing case. (I can expand on that if that was > too > > > > short) > > > > > > > > > > > > > > > > > > > > > > With that in mind, why would a downstream process even > > try > > > to > > > > > > > > calculate > > > > > > > > > > > the same idleness state as the upstream process? I > don't > > > see > > > > a > > > > > > > point; > > > > > > > > > we > > > > > > > > > > > would just further any imprecision in the calculation. 
Let's take a concrete example. Assume that we have an upstream pipeline A and a downstream pipeline B. A has plenty of resources and is processing data live. Some partitions are idle, and that is propagated to the sinks. Now B is heavily backpressured and consumes very slowly. B doesn't see any idleness directly. B can calculate exact watermarks and use all records for its calculation. Reprocessing would yield the same result for B. If we now forward idleness, we can easily find cases where we would advance the watermark prematurely while there is data directly available to calculate the exact watermark.

For me, idleness is just a pipeline-specific heuristic and should be viewed as such.
Best,

Arvid

On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi,

> Imagine you're starting to consume from the result channel in a situation where you have:
> record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE, record2, record1, record0
> Switching to the encoded StreamStatus.IDLE is unnecessary, and might cause record3 and record4 to be late depending on how the watermark progressed in other partitions.

Yes, I understand this point. But it can also be the other way around. There might be a large gap between record2 and record3, and users might prefer not to, or might not be able to, duplicate the idleness detection logic. The downstream system might lack some kind of information (only available in the top-level/ingesting system) needed to set the idle status correctly.
Piotrek

On Fri, Jun 4, 2021 at 12:30 Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Same as Eron, I don't follow this point. Any streaming sink can be used as this kind of transient channel. Streaming sinks, like Kafka, are also used to connect one streaming system with another one, also for immediate consumption.

Sure it can, but imo that is rarely the primary reason you want to offload the channels to an external persistent system. Again, in my understanding StreamStatus is something transient, e.g. part of our external system went offline. I think those kinds of events should not be persisted.

> Both watermarks and idleness status can be some inherent property of the underlying data stream.
if > an > > > > > > > > > > >> upstream/ingesting > > > > > > > > > > >> > system knows that this particular stream/partition > of > > a > > > > > stream > > > > > > > is > > > > > > > > > > going > > > > > > > > > > >> > idle (for example for a couple of hours), why does > > this > > > > > > > > information > > > > > > > > > > >> have to > > > > > > > > > > >> > be re-created in the downstream system using some > > > > heuristic? > > > > > > It > > > > > > > > > could > > > > > > > > > > be > > > > > > > > > > >> > explicitly encoded. > > > > > > > > > > >> > > > > > > > > > > > >> > Because it's most certainly not true in the > > downstream. > > > > The > > > > > > > > idleness > > > > > > > > > > >> works > > > > > > > > > > >> > usually according to a heuristic: "We have not seen > > > > records > > > > > > for > > > > > > > 5 > > > > > > > > > > >> minutes, > > > > > > > > > > >> > so there is a fair chance we won't see records for > the > > > > next > > > > > 5 > > > > > > > > > minutes, > > > > > > > > > > >> so > > > > > > > > > > >> > let's not wait for watermarks for now." That > heuristic > > > > most > > > > > > > > > certainly > > > > > > > > > > >> won't > > > > > > > > > > >> > hold for a downstream persistent storage. > > > > > > > > > > >> > > > > > > > > > > > >> > Imagine you're starting consuming from the result > > > channel > > > > > in a > > > > > > > > > > situation > > > > > > > > > > >> > were you have: > > > > > > > > > > >> > > > > > > > > > > > >> > record4, record3, StreamStatus.ACTIVE, > > StreamStatus.IDLE > > > > > > > record2, > > > > > > > > > > >> record1, > > > > > > > > > > >> > record0 > > > > > > > > > > >> > > > > > > > > > > > >> > Switching to the encoded StreamStatus.IDLE is > > > unnecessary, > > > > > and > > > > > > > > might > > > > > > > > > > >> cause > > > > > > > > > > >> > the record3 and record4 to be late depending on how > > the > > > > > > > watermark > > > > > > > > > > >> > progressed in other partitions. 
I understand Eron's use case, which is not about storing the StreamStatus but about performing an immediate aggregation, or, said differently, changing the partitioning/granularity of records and watermarks externally to Flink. The partitioning produced by Flink is actually never persisted in that case. In this case, I agree that exposing the StreamStatus makes sense. I am still concerned it will lead to storing the StreamStatus, which can lead to many subtle problems.

On 04/06/2021 11:53, Piotr Nowojski wrote:

Hi,

Thanks for picking up this discussion. For the record, I also think we shouldn't expose latency markers.

About the stream status:

> Persisting the StreamStatus

I don't agree with the view that sinks are "storing" the data/idleness status. This nomenclature only makes sense if we are talking about streaming jobs producing batch data.
> In my understanding a StreamStatus makes sense only when talking about immediately consumed transient channels such as between operators within a single job.

Same as Eron, I don't follow this point. Any streaming sink can be used as this kind of transient channel. Streaming sinks, like Kafka, are also used to connect one streaming system with another one, also for immediate consumption.

You could say the same thing about watermarks (note that they are usually generated in Flink based on the incoming events), and I would disagree with it in the same way. Both watermarks and idleness status can be some inherent property of the underlying data stream. If an upstream/ingesting system knows that this particular stream/partition of a stream is going idle (for example for a couple of hours), why does this information have to be re-created in the downstream system using some heuristic? It could be explicitly encoded.
If you want to pass watermarks explicitly to the next downstream streaming system because you do not want to recreate them from the events using duplicated logic, why wouldn't you want to do the same thing with idleness?

Also keep in mind that I would expect users to decide on their own whether they want to persist the watermarks/stream status. This shouldn't be obligatory.

For me there is one good reason not to expose stream status YET: if we are sure that we do not need it just yet, we don't want to expand the Public/PublicEvolving API, as this always increases the maintenance cost.
Best,
Piotrek

On Fri, Jun 4, 2021 at 10:57 Eron Wright <ewri...@streamnative.io.invalid> wrote:

I believe that the correctness of watermarks and stream status markers is determined entirely by the source (ignoring the generic assigner). Such stream elements are known not to overtake records and aren't transient from a pipeline perspective. I do agree that recoveries may be lossy if some operator state is transient (e.g. valve state).

Consider that status markers already affect the flow of watermarks (e.g. suppression), and thus affect operator behavior. It seems to me that exposing the idleness state is no different from exposing a watermark.

The high-level story is that there is a need for the Flink job to be transparent, or neutral, with respect to the event time clock. I believe this is possible if time flows with high fidelity from source to sink.
Of course, one always has the choice of whether to use source-based watermarks; as you mentioned, requirements vary.

Regarding the Pulsar specifics, we're working on a community proposal that I'm eager to share. To answer your question, the broker aggregates watermarks from multiple producers writing to a single topic. Each sink subtask is a producer. The broker considers each producer's assertions (watermarks, idleness) to be independent inputs, much like the watermark valve does.

On your concern about idleness causing false late events, I understand your point but don't think it applies if the keyspace assignments are stable.

I hope this explains it to your satisfaction.
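Eron's description of the broker (each sink subtask is a producer whose watermark and idleness assertions are independent inputs, as with the watermark valve) can be modeled as a per-topic minimum over non-idle producers. This is a hypothetical sketch and not Pulsar's design or API. `TopicWatermarkAggregator`, and the rule that a new watermark implicitly marks its producer active again, are assumptions for illustration.

```python
from typing import Dict, Optional


class TopicWatermarkAggregator:
    """Hypothetical broker-side view of one topic: each producer (one per Flink
    sink subtask) asserts watermarks or idleness independently, and the topic
    watermark is the minimum over producers that are not idle."""

    def __init__(self) -> None:
        self.watermarks: Dict[str, int] = {}
        self.idle: Dict[str, bool] = {}

    def on_watermark(self, producer: str, ts: int) -> None:
        # Per-producer watermarks only move forward.
        self.watermarks[producer] = max(ts, self.watermarks.get(producer, ts))
        self.idle[producer] = False  # assumption: a watermark implies the producer is active

    def on_idle(self, producer: str) -> None:
        self.idle[producer] = True

    def topic_watermark(self) -> Optional[int]:
        active = [wm for p, wm in self.watermarks.items() if not self.idle.get(p, False)]
        return min(active) if active else None


agg = TopicWatermarkAggregator()
agg.on_watermark("subtask-0", 100)
agg.on_watermark("subtask-1", 80)
before_idle = agg.topic_watermark()  # held back by subtask-1
agg.on_idle("subtask-1")
after_idle = agg.topic_watermark()   # subtask-1 no longer participates
```

Unlike Flink's operator-level output watermark, this toy topic watermark is not forced to be monotonic. For example, if the idle producer returns with an older watermark, the topic watermark can drop. That is the kind of false-late-event risk Dawid raises, which Eron argues does not arise when keyspace assignments are stable.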
- Eron

On Fri, Jun 4, 2021, 12:07 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Eron,

I might be missing some background on Pulsar partitioning, but something seems off to me. What is the chunk/batch/partition that Pulsar brokers will additionally combine watermarks for? Isn't it the case that only a single Flink sub-task would write to such a chunk and thus will already produce an aggregated watermark via the writeWatermark method?

Personally, I am really skeptical about exposing the StreamStatus in any Producer API. In my understanding, the StreamStatus is a transient setting of a consumer of data. StreamStatus is a mechanism for making a tradeoff between correctness (how many late elements behind the watermark we have) and making progress. IMO one has to be extra cautious when it comes to persistent systems.
Again, I might be missing the exact use case you are trying to solve here, but I can imagine multiple jobs reading from such a stream, which might have different correctness requirements. Just quickly throwing out an idea off the top of my head: you might want entirely correct results, which can be delayed for minutes, and a separate task that produces quick insights within seconds. Another thing to consider is that by the time the downstream job starts consuming, the upstream one might have produced records to the previously idle chunk. Persisting the StreamStatus in such a scenario would add unnecessary false late events.

In my understanding, a StreamStatus makes sense only when talking about immediately consumed transient channels, such as between operators within a single job.

Best,

Dawid

On 03/06/2021 23:31, Eron Wright wrote:

I think the rationale for end-to-end idleness (i.e.
between pipelines) is the same as the rationale for idleness between operators within a pipeline. On the 'main issue' you mentioned, we entrust the source with adapting to Flink's notion of idleness (e.g. in the Pulsar source, it means that no topics/partitions are assigned to a given sub-task); a similar adaptation would occur in the sink. In other words, I think it is reasonable that a sink for a watermark-aware storage system needs the idleness signal.

Let me explain how I would use it in Pulsar's sink. Each sub-task is a Pulsar producer, and is writing watermarks to a configured topic via the Producer API. The Pulsar broker aggregates the watermarks that are written by each producer into a global minimum (similar to StatusWatermarkValve).

The broker keeps track of which producers are actively producing watermarks, and a producer may mark itself as idle to tell the broker not to wait for watermarks from it, e.g. when a producer is going offline. I had intended to mark the producer as idle when the sub-task is closing, but now I see that it would be insufficient; the producer should also be idled if the sub-task is idled. Otherwise, the broker would wait indefinitely for the idled sub-task to produce a watermark.

Arvid, I think your original instincts were correct about idleness propagation, and I hope I've demonstrated a practical use case.
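
The broker behavior described above (a global minimum over producers, where idle producers are not waited for) can be modelled in a few lines of Java. This is a minimal sketch under stated assumptions: `WatermarkAggregator` and its methods are hypothetical names, not Pulsar's or Flink's API, and watermarks are plain `long` timestamps. It illustrates both sides of the debate: a sub-task that neither reports nor marks itself idle stalls the aggregate, and a producer that resumes after idling can have its records classified as late.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical model of a broker that merges per-producer watermarks.
 * Illustrative only: not Pulsar's or Flink's actual implementation.
 */
final class WatermarkAggregator {
    // Last watermark reported by each producer (Long.MIN_VALUE until one arrives).
    private final Map<String, Long> watermarks = new HashMap<>();
    // Whether each producer has declared itself idle.
    private final Map<String, Boolean> idle = new HashMap<>();
    // Highest global watermark returned so far; the output never moves backwards.
    private long emitted = Long.MIN_VALUE;

    /** Registers a producer; until it reports or idles, it holds the aggregate back. */
    void register(String producer) {
        watermarks.putIfAbsent(producer, Long.MIN_VALUE);
        idle.putIfAbsent(producer, false);
    }

    /** Records a watermark; reporting one also makes the producer active again. */
    void onWatermark(String producer, long watermark) {
        register(producer);
        watermarks.merge(producer, watermark, Long::max); // per-producer watermarks only advance
        idle.put(producer, false);
    }

    /** The producer asks the aggregator not to wait for its watermarks. */
    void markIdle(String producer) {
        register(producer);
        idle.put(producer, true);
    }

    /** Minimum over active producers, clamped so it never regresses. */
    long globalWatermark() {
        OptionalLong min = watermarks.entrySet().stream()
                .filter(e -> !idle.get(e.getKey()))
                .mapToLong(e -> e.getValue())
                .min();
        // If every producer is idle, keep the last value instead of advancing.
        if (min.isPresent()) {
            emitted = Math.max(emitted, min.getAsLong());
        }
        return emitted;
    }

    /** A record is late if its timestamp is at or behind the global watermark. */
    boolean isLate(long timestamp) {
        return timestamp <= globalWatermark();
    }
}
```

For example, with producer `p1` at 100 and `p2` registered but silent, the aggregate stays at `Long.MIN_VALUE`; once `p2` marks itself idle, it advances to 100. If `p2` then resumes at 50, the output stays at 100 and a record stamped 75 is classified as late, which is the kind of false late event raised earlier in the thread.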

On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise <ar...@apache.org> wrote:

When I was rethinking the idleness issue, I came to the conclusion that it should be inferred again at the source of the respective downstream pipeline.

The main issue with propagating idleness is that you would force the same definition across all downstream pipelines, which may not be what the user intended.
On the other hand, I don't immediately see a technical reason why the downstream source wouldn't be able to infer that.

On Thu, Jun 3, 2021 at 9:14 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Thanks Piotr for bringing this up.
I reflected on this and I agree we should expose idleness; otherwise a multi-stage flow could stall.

Regarding the latency markers, I don't see an immediate need for propagating them, because they serve to estimate latency within a pipeline, not across pipelines. One would probably also need to enhance the source interface to do e2e latency. It seems we agree this aspect is out of scope.

I took a look at the code to get a sense of how to accomplish this. The gist is a new `markIdle` method on the `StreamOperator` interface, which is called when the stream status maintainer (the `OperatorChain`) transitions to the idle state. Then, a new `markIdle` method on the `SinkFunction` and `SinkWriter`, which is called by the respective operators. Note that StreamStatus is an internal class.
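
The shape of this proposal can be sketched in plain Java. Everything here is hypothetical: `IdleAwareWriter`, `SinkOperatorSketch` and `onStatusChange` are illustrative stand-ins for `SinkWriter`/`SinkFunction`, the sink operators, and the `OperatorChain` status transition, and watermarks are modelled as `long` timestamps. Declaring the new hooks as default no-op methods is one way, assumed here, to keep existing sink implementations source-compatible.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for a sink writer interface (not Flink's SinkWriter).
 * Watermarks are modelled as plain long timestamps.
 */
interface IdleAwareWriter<T> {
    void write(T element);

    /** Default no-op, so writers that predate the hook still compile unchanged. */
    default void writeWatermark(long watermark) {}

    /** Hypothetical idleness hook in the spirit of the draft's markIdle. */
    default void markIdle() {}
}

/** Hypothetical sink operator that forwards records, watermarks and idleness. */
final class SinkOperatorSketch<T> {
    private final IdleAwareWriter<T> writer;
    private boolean idle = false;

    SinkOperatorSketch(IdleAwareWriter<T> writer) {
        this.writer = writer;
    }

    void processElement(T element) {
        writer.write(element);
    }

    void processWatermark(long watermark) {
        writer.writeWatermark(watermark);
    }

    /** Called by the (hypothetical) status maintainer; forwards only active-to-idle transitions. */
    void onStatusChange(boolean nowIdle) {
        if (nowIdle && !idle) {
            writer.markIdle();
        }
        idle = nowIdle;
    }
}

/** Example writer that logs what it receives, e.g. to mirror a producer's state. */
final class LoggingWriter implements IdleAwareWriter<String> {
    final List<String> log = new ArrayList<>();

    @Override public void write(String element) { log.add("record:" + element); }
    @Override public void writeWatermark(long watermark) { log.add("watermark:" + watermark); }
    @Override public void markIdle() { log.add("idle"); }
}

/** A writer written before the new hooks existed: it only implements write(). */
final class LegacyWriter implements IdleAwareWriter<String> {
    int count;

    @Override public void write(String element) { count++; }
}
```

Forwarding only the active-to-idle transition means a sink such as the Pulsar producer would tell the broker once that it is idle, rather than on every status update. `LegacyWriter` shows that, with default methods, existing implementations can simply ignore the new hooks.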

Here's a draft PR (based on the existing PR of FLINK-22700) to highlight this new aspect: https://github.com/streamnative/flink/pull/2/files

Please let me know if you'd like me to proceed to update the FLIP with these details.

Thanks again,
Eron

On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi,

Sorry for chipping in late in the discussion, but I would second this point from Arvid:

4. Potentially, StreamStatus and LatencyMarker would also need to be encoded.

It seems like this point was raised but not followed up on? Or did I miss it? Especially the StreamStatus part.
For me, it sounds like exposing watermarks without letting the sink know that the stream can be idle is an incomplete feature and can be very problematic/confusing for potential users.

Best,
Piotrek

On Mon, May 31, 2021 at 08:34 Arvid Heise <ar...@apache.org> wrote:

AFAIK everyone can start a [VOTE] thread [1]. For example, here a non-committer started a successful thread [2].
If you start it, I can already cast a binding vote, and we just need 2 more for the FLIP to be accepted.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Deprecating-Mesos-support-td50142.html

On Fri, May 28, 2021 at 8:17 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Arvid,
Thanks for the feedback. I investigated the japicmp configuration, and I see that SinkWriter is marked Experimental (not Public or PublicEvolving). I think this means that SinkWriter need not be excluded. As you mentioned, SinkFunction is already excluded. I've updated the FLIP with an explanation.

I believe all issues are resolved.
May we proceed to a vote now? And are you able to drive the vote process?

Thanks,
Eron

On Fri, May 28, 2021 at 4:40 AM Arvid Heise <ar...@apache.org> wrote:

Hi Eron,

1. Fair point. It still feels odd to have writeWatermark in the SinkFunction (it's supposed to be functional, as you mentioned), but I agree that invokeWatermark is not better. So unless someone has a better idea, I'm fine with it.
2.+3. I tried to come up with scenarios for a longer time.
In general, it seems as if the new SinkWriter interface encourages more injection (see the processing time service in InitContext), such that the need for the context is really just context information of that particular record, and I don't see any use beyond timestamp and watermark. For SinkFunction, I'd not over-engineer, as it's going to be deprecated soonish. So +1 to leave it out.
4. Okay, so I double-checked: from an execution perspective, it works. However, japicmp would definitely complain. I propose to add it to the compatibility section like this. We need to add an exception to SinkWriter then.
(SinkFunction is already on the exception list.)
5.+6. Awesome, I was also sure but wanted to double-check.

Best,

Arvid

On Wed, May 26, 2021 at 7:29 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Arvid,

1. I assume that the method name `invoke` stems from considering the SinkFunction to be a functional interface, but is otherwise meaningless. Keeping it as `writeWatermark` does keep it symmetric with SinkWriter. My vote is to leave it. You decide.

2+3. I too considered adding a `WatermarkContext`, but it would merely be a placeholder. I don't anticipate any context info in future.
As we see with invoke, it is possible to add a context later in a backwards-compatible way. My vote is to not introduce a context. You decide.

4. No anticipated compatibility issues.

5. Short answer: it works as expected. The new methods are invoked whenever the underlying operator receives a watermark. I do believe that batch and ingestion time applications receive watermarks. It seems the programming model is more unified in that respect since 1.12 (FLIP-134).

6. The failure behavior is the same as for elements.

Thanks,
Eron

On Tue, May 25, 2021 at 12:42 PM Arvid Heise <ar...@apache.org> wrote:

Hi Eron,

I think the FLIP is crisp and mostly good to go.
Some smaller things/questions:

1. SinkFunction#writeWatermark could be named SinkFunction#invokeWatermark or invokeOnWatermark to keep it symmetric.
2. We could add the context parameter to both. For SinkWriter#Context, we currently do not gain much. SinkFunction#Context also exposes processing time, which may or may not be handy and is currently mostly used for StreamingFileSink bucket policies. We may add that processing time flag also to SinkWriter#Context in the future.
3. Alternatively, we could also add a different context parameter just to keep the API stable while allowing additional information to be passed in the future.
4. Would we run into any compatibility issue if we use a Flink 1.13 source in Flink 1.14 (with this FLIP) or vice versa?
5. What happens with sinks that use the new methods in applications that do not have watermarks (batch mode, processing time)? Does this also work sufficiently with ingestion time?
6. How do exactly-once sinks deal with written watermarks in case of failure? I guess it's the same as normal records (either rollback of the transaction or deduplication on resumption).

Best,

Arvid

On Tue, May 25, 2021 at 6:44 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Does anyone have further comment on FLIP-167?
https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

Thanks,
Eron

On Thu, May 20, 2021 at 5:02 PM Eron Wright <ewri...@streamnative.io> wrote:

Filed FLIP-167: Watermarks for Sink API:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

I'd like to call a vote next week; is that reasonable?

On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com> wrote:

Hi Arvid and Eron,

Thanks for the discussion. I read through Eron's pull request and I think this can benefit the Pravega Flink connector as well.

Here is some background.
Pravega has had the watermark concept in the event stream for two years, and here is a blog introduction [1] to Pravega watermarks.
The Pravega Flink connector also gained this watermark integration last year, as we wanted to propagate the Flink watermark to Pravega in the SinkFunction. At that time we just used the existing Flink API: we keep the last watermark in memory and check whether the watermark changes for each event [2], which is not efficient.
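
The cost difference between the per-event check and a dedicated watermark callback can be modelled in a few lines of Java. Both classes are hypothetical illustrations, not the Pravega connector's code or a Flink API. The current watermark is passed in as a plain `long`, standing in for whatever value the sink reads from its context on each record.

```java
/**
 * Hypothetical model of the per-event check: on every record the sink compares
 * the current watermark with the last one it forwarded. Not the connector's code.
 */
final class PollingWatermarkForwarder {
    private long lastForwarded = Long.MIN_VALUE;
    int checks;    // comparisons performed (one per record)
    int forwarded; // watermark updates actually propagated

    void onRecord(long currentWatermark) {
        checks++;
        if (currentWatermark > lastForwarded) {
            lastForwarded = currentWatermark;
            forwarded++;
        }
    }
}

/** Callback model: the sink is invoked only when a new watermark arrives. */
final class CallbackWatermarkForwarder {
    long lastForwarded = Long.MIN_VALUE;
    int forwarded;

    void writeWatermark(long watermark) {
        lastForwarded = watermark;
        forwarded++;
    }
}
```

With 1,000 records and a watermark that changes every 100 records, the polling variant performs 1,000 comparisons to forward 10 updates, while the callback variant is invoked only 10 times.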
With such a new interface, we can also manage the watermark propagation much more easily.

[1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
[2] https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465

-----Original Message-----
From: Arvid Heise <ar...@apache.org>
Sent: Wednesday, May 19, 2021 16:06
To: dev
Subject: Re: [DISCUSS] Watermark propagation with Sink API

Hi Eron,

Thanks for pushing that topic. I can now see that the benefit is even bigger than I initially thought.
So it's worthwhile to include it anyway.

I also briefly thought about exposing watermarks to all UDFs, but here I really struggle to see specific use cases. Could you maybe take a few minutes to think about it as well? I could only see someone misusing Async IO as a sink where a real sink would be more appropriate. In general, if there is no clear use case, we shouldn't add the functionality, as it just increases maintenance for no value.

If we stick to the plan, I think your PR is already in good shape. We need to create a FLIP for it, though, since it changes Public interfaces [1]. I was initially not convinced that we should also change the old SinkFunction interface, but seeing how small the change is, I wouldn't mind at all increasing consistency. Only once we have written and approved the FLIP (which should be minimal and fast) should we actually look at the PR ;).

The only thing I would improve is the name of the function.
processWatermark sounds as if the sink implementer really needs to implement it (as you would need to do on a custom operator). I would make the names symmetric to the record writing/invoking methods (e.g. writeWatermark and invokeWatermark).

As a follow-up PR, we should then migrate KafkaShuffle to the new API. But that's something I can do.
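To make the naming suggestion concrete, here is a sketch of symmetric watermark hooks with no-op defaults. `RecordWriter`, `InvokingSink`, and `Watermark` are simplified hypothetical stand-ins, not Flink's `SinkWriter`, `SinkFunction`, or `Watermark` types (which, among other things, declare checked exceptions); only the shape of the API is illustrated.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical watermark value; not Flink's Watermark class. */
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

/** Writer-style sink: the watermark hook mirrors write(). */
interface RecordWriter<T> {
    void write(T element);

    /** No-op by default, so existing writers compile and behave as before. */
    default void writeWatermark(Watermark watermark) {}
}

/** Function-style sink: the watermark hook mirrors invoke(). */
interface InvokingSink<T> {
    void invoke(T value);

    /** No-op by default, for the same reason. */
    default void invokeWatermark(Watermark watermark) {}
}

/** A writer that ignores watermarks and therefore overrides nothing new. */
class PlainWriter implements RecordWriter<String> {
    final List<String> out = new ArrayList<>();

    @Override
    public void write(String element) {
        out.add(element);
    }
}

/** A writer that opts in by overriding the hook. */
class WatermarkAwareWriter implements RecordWriter<String> {
    final List<String> out = new ArrayList<>();

    @Override
    public void write(String element) {
        out.add(element);
    }

    @Override
    public void writeWatermark(Watermark watermark) {
        out.add("WM@" + watermark.timestamp);
    }
}
```

The default method is what keeps the change small: sinks that don't care about watermarks, like `PlainWriter`, need no changes, while a sink that opts in overrides one method whose name pairs with the record method it already implements.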

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

On Wed, May 19, 2021 at 3:34 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:

Update: opened an issue and a PR.
https://issues.apache.org/jira/browse/FLINK-22700
https://github.com/apache/flink/pull/15950

On Tue, May 18, 2021 at 10:03 AM Eron Wright <ewri...@streamnative.io> wrote:

Thanks Arvid and David for sharing your ideas on this subject. I'm glad to hear that you're seeing use cases for watermark propagation via an enhanced sink interface.

As you've guessed, my interest is in Pulsar, and I am exploring some options for brokering watermarks across stream processing pipelines. I think Arvid is speaking to a high-fidelity solution where the difference between intra- and inter-pipeline flow is eliminated. My goal is more limited; I want to write the watermark that arrives at the sink to Pulsar.
Simply imagine that Pulsar has native support for watermarking in its producer/consumer API, and we'll leave the details to another forum.

David, I like your invariant. I see lateness as stemming from the problem domain and from system dynamics (e.g. scheduling, batching, lag). When one depends on order-of-observation to generate watermarks, the app may become unduly sensitive to dynamics which bear on order-of-observation.
My goal is to factor out the system dynamics from lateness determination.

Arvid, to be most valuable (at least for my purposes), the enhancement is needed on SinkFunction. This will allow us to easily evolve the existing Pulsar connector.

As a next step, I will open a PR to advance the conversation.

Eron

On Tue, May 18, 2021 at 5:06 AM David Morávek <david.mora...@gmail.com> wrote:

Hi Eron,

Thanks for starting this discussion. I've been thinking about this recently, as we've run into "watermark related" issues when chaining multiple pipelines together.
My two cents to the discussion:

The way I like to think about the problem is that there should be an invariant that holds for any stream processing pipeline: "a NON_LATE element entering the system should never become LATE".

Unfortunately, this is exactly what happens in downstream pipelines, because the upstream one can:
- break ordering (especially with higher parallelism)
- emit elements that are ahead of the output watermark

There is not enough information to reconstruct the upstream watermark in later stages (it's always just an estimate based on the previous pipeline's output).
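A toy model of this failure mode, under stated assumptions: suppose the upstream job had two partitions and its watermark was held back by the slower one, so an element with timestamp 5 was not late upstream. The downstream job cannot see that watermark and re-derives one heuristically (here the common bounded-out-of-orderness rule, "largest timestamp seen minus a fixed bound", chosen only for illustration). Whether the element is then classified as LATE depends purely on the order in which the upstream happened to emit it. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Downstream stage that has no access to the upstream watermark and estimates its own as
 * (largest timestamp seen so far - bound). It reports which elements fall at or behind
 * that estimate, i.e. which elements it would treat as LATE.
 */
class HeuristicWatermarkStage {
    private final long bound;
    private long maxSeen = Long.MIN_VALUE;

    HeuristicWatermarkStage(long bound) {
        this.bound = bound;
    }

    /** Returns the timestamps classified as LATE, in arrival order. */
    List<Long> process(long[] arrivalOrder) {
        List<Long> late = new ArrayList<>();
        for (long ts : arrivalOrder) {
            // Estimate before this element is accounted for (no estimate before the first element).
            long watermark = (maxSeen == Long.MIN_VALUE) ? Long.MIN_VALUE : maxSeen - bound;
            if (ts <= watermark) {
                late.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }
}

class InvariantDemo {
    // Assumed scenario: the slow upstream partition held the upstream watermark back,
    // so t=5 was NON_LATE upstream; only the emission order differs between the two runs.
    static List<Long> lateWhenReordered() {
        return new HeuristicWatermarkStage(10).process(new long[] {100, 101, 102, 5});
    }

    static List<Long> lateWhenInOrder() {
        return new HeuristicWatermarkStage(10).process(new long[] {5, 100, 101, 102});
    }
}
```

With a bound of 10, the element with timestamp 5 is classified as late when it arrives after 100, 101, and 102, and on time when it arrives first, which is exactly the violation of the invariant described above.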

It would be great if we could have a general abstraction that is reusable for various sources / sinks (not just Kafka / Pulsar, though this would probably cover most of the use cases) and systems.

Is there any other use case than sharing watermarks between pipelines that you're trying to solve?

Arvid:
> 1. Watermarks are closely coupled to the used system (=Flink). I have a hard time imagining that it's useful to use a different stream processor downstream.
> So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job, similar to KafkaStream's #through.

I'd slightly disagree here. For example, we're "materializing" change-logs produced by a Flink pipeline into a serving layer (random access db / in-memory view / ...), and we need to know whether the responses we serve meet the "freshness" requirements (e.g. you may want to respond differently when the watermark is lagging way too far behind processing time). Also, not every stream processor in the pipeline needs to be Flink. It can just as well be a simple element-wise transformation that reads from Kafka and writes back into a separate topic (that's what we do, for example, with ML models that have special hardware requirements).

Best,
D.
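The freshness check David mentions could, for instance, compare the watermark that the materialized view is known to be complete up to against the serving layer's wall clock. A hedged sketch: `FreshnessPolicy`, the threshold, and the two response modes are illustrative assumptions, not part of any existing API.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Decides how to answer a query based on how far the view's watermark lags wall-clock time.
 * The threshold and the response modes are illustrative assumptions.
 */
final class FreshnessPolicy {
    enum Mode { SERVE, SERVE_WITH_STALENESS_WARNING }

    private final Duration maxLag;

    FreshnessPolicy(Duration maxLag) {
        this.maxLag = maxLag;
    }

    /**
     * @param watermark event time up to which the materialized view is known to be complete
     * @param now       current processing (wall-clock) time
     */
    Mode decide(Instant watermark, Instant now) {
        Duration lag = Duration.between(watermark, now);
        return lag.compareTo(maxLag) <= 0 ? Mode.SERVE : Mode.SERVE_WITH_STALENESS_WARNING;
    }
}
```

The check only works if the watermark actually travels with the change-log into the serving layer, which is the reason a sink-side watermark hook matters for this use case.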

On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:

Hi Eron,

I think this is a useful addition for storage systems that act as pass-through for Flink to reduce recovery time. It is only useful if you combine it with regional fail-over, as only a small part of the pipeline is restarted.

A couple of thoughts on the implications:
1. Watermarks are closely coupled to the used system (=Flink). I have a hard time imagining that it's useful to use a different stream processor downstream. So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job, similar to KafkaStream's #through.
2. The schema of the respective intermediate stream/topic would need to be managed by Flink to encode both records and watermarks. This reduces the usability quite a bit and needs to be carefully crafted.
3. It's not clear to me if constructs like SchemaRegistry can be properly supported (and also if they should be supported) in terms of schema evolution.
4. Potentially, StreamStatus and LatencyMarker would also need to be encoded.
5. It's important to have some way to transport backpressure from the downstream to the upstream. Otherwise you would have the same issue as KafkaStreams, where two separate pipelines can drift so far apart that you experience data loss if the data retention period is smaller than the drift.
6. It's clear that you trade a huge chunk of throughput for lower overall latency in case of failure. So it's an interesting feature for use cases with SLAs.
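Point 2 in the list above implies that every message in the intermediate topic has to say whether it carries a record or a watermark. A minimal way to do that, sketched here purely as an illustration (the one-byte tag, the field layout, and the `Envelope` name are assumptions, not an existing Flink format), is a tagged envelope:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Tagged envelope so that a single topic can carry both records and watermarks. */
final class Envelope {
    static final byte RECORD = 0;
    static final byte WATERMARK = 1;

    final byte kind;
    final long timestamp;  // event time for records, watermark value for watermarks
    final String payload;  // null for watermarks

    private Envelope(byte kind, long timestamp, String payload) {
        this.kind = kind;
        this.timestamp = timestamp;
        this.payload = payload;
    }

    static Envelope ofRecord(long eventTime, String payload) {
        return new Envelope(RECORD, eventTime, payload);
    }

    static Envelope ofWatermark(long watermark) {
        return new Envelope(WATERMARK, watermark, null);
    }

    /** Layout: [kind: 1 byte][timestamp: 8 bytes][UTF-8 payload, records only]. */
    byte[] encode() {
        byte[] body = payload == null ? new byte[0] : payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + 8 + body.length)
                .put(kind)
                .putLong(timestamp)
                .put(body)
                .array();
    }

    static Envelope decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte kind = buf.get();
        long timestamp = buf.getLong();
        if (kind == WATERMARK) {
            return ofWatermark(timestamp);
        }
        if (kind != RECORD) {
            throw new IllegalArgumentException("unknown envelope kind: " + kind);
        }
        byte[] body = new byte[buf.remaining()];
        buf.get(body);
        return ofRecord(timestamp, new String(body, StandardCharsets.UTF_8));
    }
}
```

Because records are wrapped in this custom layout, consumers that expect the plain record schema can no longer read the topic directly, which is the usability cost point 2 mentions, and it is one reason the schema-registry question in point 3 is not straightforward.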

Since we are phasing out SinkFunction, I'd prefer to only support SinkWriter. Having a no-op default sounds good to me.

We have an experimental feature for Kafka [1], which pretty much reflects your idea. There, we have an ugly workaround to be able to process the watermark by using a custom StreamSink task. We could also try to create a FLIP that abstracts the actual system away, and then we could use the approach for both Pulsar and Kafka.

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103

On Mon, May 17, 2021 at 10:44 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:

I would like to propose an enhancement to the Sink API: the ability to receive upstream watermarks. I'm aware that the sink context provides the current watermark for a given record.
I'd like to be able to write a sink function that is invoked whenever the watermark changes. Out of scope would be event-time timers (since sinks aren't keyed).

For context, imagine that a stream storage system had the ability to persist watermarks in addition to ordinary elements, e.g. to serve as source watermarks in a downstream processor.
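A toy model of this idea: an upstream sink appends both records and watermarks to a log, and a downstream reader takes its watermark from the persisted entries instead of estimating one. `WatermarkLog`, `LogSink`, and `LogReader` are hypothetical names standing in for a storage system such as Pulsar; the sketch assumes a single upstream writer and that the log preserves append order. With several upstream writers, the reader would additionally have to combine per-writer watermarks (for example, by taking their minimum), which is omitted here. An element counts as late if its timestamp is at or below the current watermark, following Flink's convention that a watermark t means no more elements with timestamp <= t are expected.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for a stream storage system that can persist watermarks. */
final class WatermarkLog {
    static final class Entry {
        final boolean isWatermark;
        final long timestamp;
        final String value; // null for watermark entries

        Entry(boolean isWatermark, long timestamp, String value) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    final List<Entry> entries = new ArrayList<>();
}

/** Upstream sink: appends records and, via a hypothetical watermark hook, watermarks. */
final class LogSink {
    private final WatermarkLog log;

    LogSink(WatermarkLog log) {
        this.log = log;
    }

    void write(String value, long eventTime) {
        log.entries.add(new WatermarkLog.Entry(false, eventTime, value));
    }

    /** Assumed to be called whenever the upstream watermark advances. */
    void writeWatermark(long watermark) {
        log.entries.add(new WatermarkLog.Entry(true, watermark, null));
    }
}

/** Downstream source: adopts persisted watermarks instead of a heuristic estimate. */
final class LogReader {
    long currentWatermark = Long.MIN_VALUE;
    final List<String> onTime = new ArrayList<>();
    final List<String> late = new ArrayList<>();

    void readAll(WatermarkLog log) {
        for (WatermarkLog.Entry e : log.entries) {
            if (e.isWatermark) {
                currentWatermark = Math.max(currentWatermark, e.timestamp);
            } else if (e.timestamp <= currentWatermark) {
                late.add(e.value);
            } else {
                onTime.add(e.value);
            }
        }
    }
}

final class PersistedWatermarkDemo {
    static LogReader run() {
        WatermarkLog log = new WatermarkLog();
        LogSink sink = new LogSink(log);
        sink.write("a", 100);
        sink.write("b", 105);
        sink.writeWatermark(110);
        sink.write("c", 108); // behind the persisted watermark: already late upstream
        sink.write("d", 120);
        LogReader reader = new LogReader();
        reader.readAll(log);
        return reader;
    }
}
```

Here the element with timestamp 108 follows the persisted watermark 110, so it was already late upstream, and the reader reaches the same verdict without any heuristic.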

Ideally one could compose a multi-stage, event-driven application, with watermarks flowing end-to-end without need for a heuristics-based watermark at each stage.

The specific proposal would be a new method on `SinkFunction` and/or on `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a default implementation that does nothing.

Thoughts?

Thanks!
Eron Wright
StreamNative

--

Eron Wright   Cloud Engineering Lead

p: +1 425 922 8617
streamnative.io | Meet with me <https://calendly.com/eronwright/regular-1-hour>

<https://github.com/streamnative> <https://www.linkedin.com/company/streamnative/> <https://twitter.com/streamnativeio/>