Hi,

Thanks Steffen for the explanations. I think it makes sense to me.
Re Arvid/Steffen:

- Keep in mind that even if we choose to provide a non-blocking API using the `isAvailable()`/`getAvailableFuture()` method, we would still need to support blocking inside the sinks. At the very least, emitting many records at once (`flatMap`) or firing timers are scenarios where output availability would currently be ignored by the runtime. I would also imagine that writing very large (like 1 GB) records would block on something as well.
- Secondly, exposing availability at the API level might not be that easy/trivial. The availability pattern as defined in the `AvailabilityProvider` class is quite complicated and not that easy for a user to implement.

Given both of those points, combined with the lack of a clear motivation for adding `AvailabilityProvider` to the sinks/operators/functions, I would vote for just starting with blocking `write` calls. This can always be extended with availability in the future if needed and properly motivated. That would be aligned with either Arvid's option 1 or 2. I don't know what the best practices around `InterruptedException` are, but I'm always afraid of it, so I would personally feel safer with option 2.

I'm not sure what problem option 3 is meant to solve? Adding `wakeUp()` sounds strange to me.

Best,
Piotrek

On Mon, 21 Jun 2021 at 12:15, Arvid Heise <ar...@apache.org> wrote:

> Hi Piotr,
>
> to pick up this discussion thread again:
> - This FLIP is about providing some base implementation for FLIP-143 sinks
> that makes adding new implementations easier, similar to the
> SourceReaderBase.
> - The whole availability topic will most likely be a separate FLIP. The
> basic issue just popped up here because we currently have no way to signal
> backpressure in sinks except by blocking `write`. This feels quite natural
> in sinks with sync communication but quite unnatural in async sinks.
>
> Now we have a couple of options.
> In all cases, we would have some WIP limit on the number of
> records/requests that can be processed in parallel asynchronously
> (similar to asyncIO).
>
> 1. We use some blocking queue in `write`; then we need to handle
> interruptions. In the easiest case, we extend `write` to throw
> `InterruptedException`, which is a small API change.
> 2. We use a blocking queue, but handle interrupts and swallow/translate
> them. No API change.
>
> Both solutions block the task thread, so any RPC message / unaligned
> checkpoint would be processed only after the backpressure is temporarily
> lifted. That's similar to the discussions that you linked. Cancellation may
> also be a tad harder with 2.
>
> 3. We could also add some `wakeUp` to the `SinkWriter`, similar to
> `SplitFetcher` [1]. Basically, you use a normal queue with a completable
> future on which you block. Wakeup would be a clean way to complete it, next
> to the natural completion through finished requests.
> 4. We add availability to the sink. However, this API change also requires
> that we allow operators to be available, so it may be a bigger change with
> undesired side effects. On the other hand, we could also use the same
> mechanism for asyncIO.
>
> For users of FLIP-171, none of the options are exposed. So we could also
> start with a simple solution (add `InterruptedException`) and later try to
> add availability. Options 1 and 2 would also not require an additional FLIP;
> we could add them as part of this FLIP.
>
> Best,
>
> Arvid
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L258-L258
>
> On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen
> <shau...@amazon.de.invalid> wrote:
>
>> Hey Piotrek,
>>
>> Thanks for your comments on the FLIP. I'll address your second question
>> first, as I think it's more central to this FLIP.
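[Editor's note: for concreteness, a minimal hypothetical sketch of option 2 above, i.e. a blocking `write` over a bounded queue with interrupts translated rather than propagated. The class and method names are illustrative only, not the actual FLIP-143 `SinkWriter` API.]

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of option 2: write() blocks when the in-flight/WIP
// limit is reached, and an interrupt is translated into an unchecked
// exception while the thread's interrupt status is restored.
class BlockingWriteSketch<T> {
    private final BlockingQueue<T> pending;

    BlockingWriteSketch(int wipLimit) {
        this.pending = new ArrayBlockingQueue<>(wipLimit);
    }

    // Keeps the signature free of InterruptedException (no API change).
    void write(T record) {
        try {
            pending.put(record); // blocks while the WIP limit is exhausted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new RuntimeException("Interrupted while writing to the sink", e);
        }
    }

    // Called by the asynchronous part of the writer to drain work.
    T poll() {
        return pending.poll();
    }
}
```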
>> Just looking at the AWS ecosystem, there are several sinks with
>> overlapping functionality. I've chosen AWS sinks here because I'm most
>> familiar with those, but a similar argument applies more generically to
>> destinations that support async ingest.
>>
>> There is, for instance, a sink for Amazon Kinesis Data Streams that is
>> part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2], a
>> sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4]. All
>> these sinks have implemented their own mechanisms for batching,
>> persisting, and retrying events. And I'm not sure whether all of them
>> properly participate in checkpointing. [3] even seems to closely mirror
>> [1], as it contains references to the Kinesis Producer Library, which is
>> unrelated to Amazon DynamoDB.
>>
>> These sinks predate FLIP-143. But as batching, persisting, and retrying
>> capabilities do not seem to be part of FLIP-143, I'd argue that we would
>> end up with similar duplication even if these sinks were rewritten today
>> based on FLIP-143. And that's the idea of FLIP-171: abstract away these
>> commonly required capabilities so that it becomes easy to add support
>> for a wide range of destinations without having to think about batching,
>> retries, checkpointing, etc. I've included an example in the FLIP [5] that
>> shows that it only takes a couple of lines of code to implement a sink
>> with exactly-once semantics. To be fair, the example lacks robust failure
>> handling and some of the more advanced capabilities of [1], but I think
>> it still supports this point.
>>
>> Regarding your point on the isAvailable pattern: we need some way for the
>> sink to propagate backpressure, and we would also like to support
>> time-based buffering hints. There are two options I currently see, and I
>> would need additional input on which one is better or more desirable. The
>> first option is to use the non-blocking isAvailable pattern.
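[Editor's note: the non-blocking availability pattern mentioned here could, in a heavily simplified and hypothetical form, look like the following. The `InFlightLimiter` class and its method names are illustrative; they are not Flink's actual `AvailabilityProvider` API.]

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the availability pattern: callers get a future
// that is already complete while capacity is free and incomplete once the
// in-flight limit is reached; a finishing async request frees a slot and
// completes the pending future, signaling that backpressure is lifted.
class InFlightLimiter {
    private final int limit;
    private int inFlight = 0;
    private CompletableFuture<Void> availability =
            CompletableFuture.completedFuture(null);

    InFlightLimiter(int limit) {
        this.limit = limit;
    }

    // Rough analogue of getAvailableFuture(): complete while a slot is free.
    synchronized CompletableFuture<Void> getAvailableFuture() {
        if (inFlight < limit) {
            return CompletableFuture.completedFuture(null);
        }
        if (availability.isDone()) {
            availability = new CompletableFuture<>();
        }
        return availability;
    }

    // Called by write() once availability has been observed.
    synchronized void startRequest() {
        inFlight++;
    }

    // Called when an async request completes: free the slot and unblock.
    synchronized void requestFinished() {
        inFlight--;
        availability.complete(null);
    }
}
```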
>> Internally, the sink persists buffered events in the snapshot state,
>> which avoids having to flush buffered records on a checkpoint. This seems
>> to align well with the non-blocking isAvailable pattern. The second
>> option is to make calls to `write` blocking and leverage an internal
>> thread to trigger flushes based on time-based buffering hints. We've
>> discussed these options with Arvid and agreed to assume that the
>> `isAvailable` pattern will become available for sinks through an
>> additional FLIP.
>>
>> I think this is an important discussion to have. My understanding of the
>> implications for Flink in general is very naïve, so I'd be happy to get
>> further guidance. However, I don't want to make this discussion part of
>> FLIP-171. For FLIP-171, we'll use whatever is available.
>>
>> Does that make sense?
>>
>> Cheers, Steffen
>>
>> [1]
>> https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
>> [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
>> [3] https://github.com/klarna-incubator/flink-connector-dynamodb
>> [4] https://github.com/awslabs/amazon-timestream-tools/
>> [5]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
>>
>> On 09.06.21, 19:44, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>>
>> Hi Steffen,
>>
>> Thanks for writing down the proposal. Back when the new Sink API was
>> being discussed, I was proposing to add our usual `CompletableFuture<Void>
>> isAvailable()` pattern to make sinks non-blocking. You can see the
>> discussion starting here [1], and continuing for a couple more posts
>> until here [2].
>> Back then, the outcome was that it would give very little benefit, at the
>> expense of making the API more complicated. Could you maybe relate your
>> proposal to that discussion from last year?
>>
>> I see that your proposal goes much further than just adding the
>> availability method; could you also motivate this a bit further? Could
>> you maybe reference/show some sinks that:
>> 1. are already implemented using FLIP-143,
>> 2. have some code duplication...
>> 3. ...where this duplication would be solved by FLIP-171?
>>
>> Best,
>> Piotrek
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
>>
>> On Wed, 9 Jun 2021 at 09:49, Hausmann, Steffen <shau...@amazon.de.invalid>
>> wrote:
>>
>> > Hi there,
>> >
>> > We would like to start a discussion thread on "FLIP-171: Async Sink" [1],
>> > where we propose to create a common abstraction for destinations that
>> > support async requests. This abstraction will make it easier to add
>> > destinations to Flink by implementing a lightweight shim, while avoiding
>> > the maintenance of dozens of independent sinks.
>> >
>> > Looking forward to your feedback.
>> >
>> > Cheers, Steffen
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>> >
>> > Amazon Web Services EMEA SARL
>> > 38 avenue John F. Kennedy, L-1855 Luxembourg
>> > Registered office: L-1855 Luxembourg
>> > Registered in the Luxembourg Trade Register under R.C.S. B186284
>> >
>> > Amazon Web Services EMEA SARL, Niederlassung Deutschland
>> > Marcel-Breuer-Str. 12, D-80807 Muenchen
>> > Registered office of the branch: Muenchen
>> > Registered in the commercial register of the local court of Munich
>> > under HRB 242240, USt-ID DE317013094