Adding the InterruptedException to the write method would make it explicit that the write call can block but must react to interruptions (e.g. when Flink wants to cancel the operation). I think this makes the contract a bit clearer.
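For illustration, the contract being discussed could look roughly like this. This is a hypothetical sketch, not the actual FLIP-143 `SinkWriter` interface; the interface and class names here are made up, and a bounded queue stands in for whatever in-flight buffer a real implementation would use:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the discussed contract; names are illustrative
// and do not match the real FLIP-143 SinkWriter.
class WriterContractSketch {

    interface AsyncWriter<InputT> {
        // May block while the internal buffer is full, and must react
        // promptly to interruption (e.g. when Flink cancels the task).
        void write(InputT element) throws InterruptedException;
    }

    // A trivial writer backed by a bounded queue: write() blocks when the
    // queue is full and surfaces the interrupt to the caller.
    static final class QueueWriter implements AsyncWriter<String> {
        final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);

        @Override
        public void write(String element) throws InterruptedException {
            buffer.put(element); // blocks when full; throws InterruptedException
        }
    }

    // Small demo that keeps the checked exception out of callers' way.
    static int demo() {
        QueueWriter writer = new QueueWriter();
        try {
            writer.write("a");
            writer.write("b");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return writer.buffer.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```

The point of the explicit `throws InterruptedException` is that callers cannot overlook that `write` may park the task thread.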
I think starting simple and then extending the API as we see the need is a good idea.

Cheers,
Till

On Tue, Jun 22, 2021 at 11:20 AM Hausmann, Steffen <shau...@amazon.de> wrote:

> Hey,
>
> Agreed on starting with a blocking `write`. I've adapted the FLIP accordingly.
>
> For now I've chosen to add the `InterruptedException` to the `write` method signature, as I'm not fully understanding the implications of swallowing the exception. Depending on the details of the code that is calling the write method, it may cause event loss. But this seems more of an implementation detail that we can revisit once we are actually implementing the sink.
>
> Unless there are additional comments, does it make sense to start the voting process in the next day or two?
>
> Cheers,
> Steffen
>
> On 21.06.21, 14:51, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>
> CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.
>
> Hi,
>
> Thanks Steffen for the explanations. I think it makes sense to me.
>
> Re Arvid/Steffen:
>
> - Keep in mind that even if we choose to provide a non-blocking API using the `isAvailable()`/`getAvailableFuture()` method, we would still need to support blocking inside the sinks. For example, at the very least, emitting many records at once (`flatMap`) or firing timers are scenarios where output availability would be ignored at the moment by the runtime. Also, I would imagine writing very large (like 1GB) records would be blocking on something as well.
> - Secondly, exposing availability at the API level might not be that easy/trivial. The availability pattern as defined in the `AvailabilityProvider` class is quite complicated and not that easy to implement by a user.
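The availability pattern under discussion can be sketched roughly as follows. This is a simplified illustration, not Flink's actual `AvailabilityProvider` (which carries additional machinery such as shared constants and helper classes, part of why it is non-trivial for users to implement); the class and method names are assumptions for the sketch, and thread-safety is deliberately simplified:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the isAvailable()/getAvailableFuture() pattern.
// Not Flink's real AvailabilityProvider; races between the counter update
// and the future swap are glossed over here.
class AvailabilitySketch {
    private final int maxInFlight;
    private final AtomicInteger inFlight = new AtomicInteger();
    private volatile CompletableFuture<Void> available =
            CompletableFuture.completedFuture(null);

    AvailabilitySketch(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    CompletableFuture<Void> getAvailableFuture() {
        return available;
    }

    boolean isAvailable() {
        return available.isDone();
    }

    // Called when a new async request is issued; may close availability.
    void requestStarted() {
        if (inFlight.incrementAndGet() >= maxInFlight) {
            available = new CompletableFuture<>(); // signal backpressure
        }
    }

    // Called when an async request completes; may re-open availability.
    void requestFinished() {
        if (inFlight.decrementAndGet() < maxInFlight) {
            available.complete(null);
        }
    }

    public static void main(String[] args) {
        AvailabilitySketch s = new AvailabilitySketch(1);
        System.out.println(s.isAvailable()); // true
        s.requestStarted();
        System.out.println(s.isAvailable()); // false: WIP limit reached
        s.requestFinished();
        System.out.println(s.isAvailable()); // true again
    }
}
```

Even in this toy form, the implementer has to get the ordering of counter updates and future swaps right, which illustrates Piotr's point about the pattern being easy to get wrong.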
> Both of those, combined with the lack of a clear motivation for adding `AvailabilityProvider` to the sinks/operators/functions, is why I would vote for just starting with blocking `write` calls. This can always be extended in the future with availability if needed/motivated properly.
>
> That would be aligned with either Arvid's option 1 or 2. I don't know what the best practices with `InterruptedException` are, but I'm always afraid of it, so I would feel personally safer with option 2.
>
> I'm not sure what problem option 3 is helping to solve? Adding `wakeUp()` would sound strange to me.
>
> Best,
> Piotrek
>
> On Mon, 21 Jun 2021 at 12:15, Arvid Heise <ar...@apache.org> wrote:
>
> > Hi Piotr,
> >
> > to pick up this discussion thread again:
> > - This FLIP is about providing some base implementation for FLIP-143 sinks that makes adding new implementations easier, similar to the SourceReaderBase.
> > - The whole availability topic will most likely be a separate FLIP. The basic issue just popped up here because we currently have no way to signal backpressure in sinks except by blocking `write`. This feels quite natural in sinks with sync communication but quite unnatural in async sinks.
> >
> > Now we have a couple of options. In all cases, we would have some WIP limit on the number of records/requests being able to be processed in parallel asynchronously (similar to asyncIO).
> > 1. We use some blocking queue in `write`; then we need to handle interruptions. In the easiest case, we extend `write` to throw the `InterruptedException`, which is a small API change.
> > 2. We use a blocking queue, but handle interrupts and swallow/translate them. No API change.
> > Both solutions block the task thread, so any RPC message / unaligned checkpoint would be processed only after the backpressure is temporarily lifted. That's similar to the discussions that you linked.
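Option 2 (block on a bounded queue but translate the interrupt instead of changing the `write` signature) could be sketched like this. The class name, queue capacity, and exception choice are assumptions for illustration only:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of Arvid's option 2: write() blocks on a bounded queue while the
// WIP limit is reached, but the interrupt is caught and translated so the
// write() signature stays unchanged. Hypothetical helper, not Flink code.
class TranslatingWriterSketch {
    private final BlockingQueue<String> pending = new ArrayBlockingQueue<>(16);

    // No InterruptedException in the signature; the interrupt flag is
    // restored and an unchecked exception signals cancellation upward.
    public void write(String element) {
        try {
            pending.put(element); // blocks while the queue (WIP limit) is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible
            throw new RuntimeException("Writer interrupted while blocked", e);
        }
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TranslatingWriterSketch writer = new TranslatingWriterSketch();
        writer.write("record-1");
        writer.write("record-2");
        System.out.println(writer.pendingCount()); // prints 2
    }
}
```

Restoring the interrupt flag before rethrowing is the part that makes cancellation "a tad harder" to get right with this option: swallowing the flag entirely could leave the runtime unaware that cancellation was requested.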
> > Cancellation may also be a tad harder on 2.
> > 3. We could also add some `wakeUp` to the `SinkWriter`, similar to `SplitFetcher` [1]. Basically, you use a normal queue with a completable future on which you block. Wakeup would be a clean way to complete it, next to the natural completion through finished requests.
> > 4. We add availability to the sink. However, this API change also requires that we allow operators to be available, so it may be a bigger change with undesired side-effects. On the other hand, we could also use the same mechanism for asyncIO.
> >
> > For users of FLIP-171, none of the options are exposed. So we could also start with a simple solution (add `InterruptedException`) and later try to add availability. Options 1+2 would also not require an additional FLIP; we could add it as part of this FLIP.
> >
> > Best,
> >
> > Arvid
> >
> > [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L258-L258
> >
> > On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen <shau...@amazon.de.invalid> wrote:
> >
> >> Hey Piotrek,
> >>
> >> Thanks for your comments on the FLIP. I'll address your second question first, as I think it's more central to this FLIP. Just looking at the AWS ecosystem, there are several sinks with overlapping functionality. I've chosen AWS sinks here because I'm most familiar with those, but a similar argument applies more generically for destinations that support async ingest.
> >>
> >> There is, for instance, a sink for Amazon Kinesis Data Streams that is part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2], a sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4]. All these sinks have implemented their own mechanisms for batching, persisting, and retrying events. And I'm not sure if all of them properly
And I'm not sure if all of them properly > participate > >> in checkpointing. [3] even seems to closely mirror [1] as it > contains > >> references to the Kinesis Producer Library, which is unrelated to > Amazon > >> DynamoDB. > >> > >> These sinks predate FLIP-143. But as batching, persisting, and > retrying > >> capabilities do not seem to be part of FLIP-143, I'd argue that we > would > >> end up with similar duplication, even if these sinks were rewritten > today > >> based on FLIP-143. And that's the idea of FLIP-171: abstract away > these > >> commonly required capabilities so that it becomes easy to create > support > >> for a wide range of destination without having to think about > batching, > >> retries, checkpointing, etc. I've included an example in the FLIP > [5] that > >> shows that it only takes a couple of lines of code to implement a > sink with > >> exactly-once semantics. To be fair, the example is lacking robust > failure > >> handling and some more advanced capabilities of [1], but I think it > still > >> supports this point. > >> > >> Regarding your point on the isAvailable pattern. We need some way > for the > >> sink to propagate backpressure and we would also like to support > time based > >> buffering hints. There are two options I currently see and would > need > >> additional input on which one is the better or more desirable one. > The > >> first option is to use the non-blocking isAvailable pattern. > Internally, > >> the sink persists buffered events in the snapshot state which > avoids having > >> to flush buffered record on a checkpoint. This seems to align well > with the > >> non-blocking isAvailable pattern. The second option is to make > calls to > >> `write` blocking and leverage an internal thread to trigger flushes > based > >> on time based buffering hints. 
We've discussed these options with > Arvid and > >> suggested to assumed that the `isAvailable` pattern will become > available > >> for sinks through and additional FLIP. > >> > >> I think it is an important discussion to have. My understanding of > the > >> implications for Flink in general are very naïve, so I'd be happy > to get > >> further guidance. However, I don't want to make this discussion > part of > >> FLIP-171. For FLIP-171 we'll use whatever is available. > >> > >> Does that make sense? > >> > >> Cheers, Steffen > >> > >> > >> [1] > >> > https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis > >> [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors > >> [3] https://github.com/klarna-incubator/flink-connector-dynamodb > >> [4] https://github.com/awslabs/amazon-timestream-tools/ > >> [5] > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams > >> > >> > >> On 09.06.21, 19:44, "Piotr Nowojski" <pnowoj...@apache.org> wrote: > >> > >> CAUTION: This email originated from outside of the > organization. Do > >> not click links or open attachments unless you can confirm the > sender and > >> know the content is safe. > >> > >> > >> > >> Hi Steffen, > >> > >> Thanks for writing down the proposal. Back when the new Sink > API was > >> being > >> discussed, I was proposing to add our usual > `CompletableFuture<Void> > >> isAvailable()` pattern to make sinks non-blocking. You can see > the > >> discussion starting here [1], and continuing for a couple of > more > >> posts > >> until here [2]. Back then, the outcome was that it would give > very > >> little > >> benefit, at the expense of making the API more complicated. > Could you > >> maybe > >> relate your proposal to that discussion from last year? 
> >> I see that your proposal is going much further than just adding the availability method; could you also motivate this a bit further? Could you maybe reference/show some sinks that:
> >> 1. are already implemented using FLIP-143
> >> 2. have some code duplication...
> >> 3. ...where this duplication would be solved by FLIP-171
> >>
> >> Best,
> >> Piotrek
> >>
> >> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
> >> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
> >>
> >> On Wed, 9 Jun 2021 at 09:49, Hausmann, Steffen <shau...@amazon.de.invalid> wrote:
> >>
> >> > Hi there,
> >> >
> >> > We would like to start a discussion thread on "FLIP-171: Async Sink" [1], where we propose to create a common abstraction for destinations that support async requests. This abstraction will make it easier to add destinations to Flink by implementing a lightweight shim, while it avoids maintaining dozens of independent sinks.
> >> >
> >> > Looking forward to your feedback.
> >> >
> >> > Cheers,
> >> > Steffen
> >> >
> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> >> >
> >> > Amazon Web Services EMEA SARL
> >> > 38 avenue John F.
> >> > Kennedy, L-1855 Luxembourg
> >> > Sitz der Gesellschaft: L-1855 Luxemburg
> >> > eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
> >> >
> >> > Amazon Web Services EMEA SARL, Niederlassung Deutschland
> >> > Marcel-Breuer-Str. 12, D-80807 Muenchen
> >> > Sitz der Zweigniederlassung: Muenchen
> >> > eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240, USt-ID DE317013094