Thanks for the update, Steffen. I'll try to take a look at it asap.

Cheers,
Till
On Fri, Aug 20, 2021 at 1:34 PM Hausmann, Steffen <shau...@amazon.de> wrote:

> Hi Till,
>
> I've updated the wiki page as per the discussion on FLIP-177. I hope it makes more sense now.
>
> Cheers, Steffen
>
> On 16.07.21, 18:28, "Till Rohrmann" <trohrm...@apache.org> wrote:
>
> CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.
>
> Sure, thanks for the pointers.
>
> Cheers,
> Till
>
> On Fri, Jul 16, 2021 at 6:19 PM Hausmann, Steffen <shau...@amazon.de.invalid> wrote:
>
> > Hi Till,
> >
> > You are right, I've left out some implementation details, which have actually changed a couple of times as part of the ongoing discussion. You can find our current prototype here [1] and a sample implementation of the KPL-free Kinesis sink here [2].
> >
> > I plan to update the FLIP. But I think it would make sense to wait until the implementation has stabilized enough before we update the FLIP to the final state.
> >
> > Does that make sense?
> >
> > Cheers, Steffen
> >
> > [1] https://github.com/sthm/flink/tree/flip-171-177/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink
> > [2] https://github.com/sthm/flink/blob/flip-171-177/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
> >
> > From: Till Rohrmann <trohrm...@apache.org>
> > Date: Friday, 16. July 2021 at 18:10
> > To: Piotr Nowojski <pnowoj...@apache.org>
> > Cc: Steffen Hausmann <shau...@amazon.de>, "dev@flink.apache.org" <dev@flink.apache.org>, Arvid Heise <ar...@apache.org>
> > Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
> >
> > Hi Steffen,
> >
> > I've taken another look at the FLIP and I stumbled across a couple of inconsistencies. I think it is mainly because of the missing code. For example, it is not fully clear to me based on the current FLIP how we ensure that there are no in-flight requests when AsyncSinkWriter.snapshotState is called. Also, the concrete implementation of the AsyncSinkCommitter could be helpful for understanding how the AsyncSinkWriter works in the end. Do you plan to update the FLIP accordingly?
> >
> > Cheers,
> > Till
> >
> > On Wed, Jun 30, 2021 at 8:36 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > Thanks for addressing this issue :)
> >
> > Best, Piotrek
> >
> > On Tue, 29 Jun 2021 at 17:58 Hausmann, Steffen <shau...@amazon.de> wrote:
> > Hey Piotr,
> >
> > I've just adapted the FLIP and changed the signature for the `submitRequestEntries` method:
> >
> > protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<?> requestResult);
> >
> > In addition, we are likely to use an AtomicLong to track the number of outstanding requests, as you have proposed in 2b). I've already indicated this in the FLIP, but it's not fully fleshed out. But as you have said, that seems to be an implementation detail and the important part is the change of the `submitRequestEntries` signature.
> >
> > Thanks for your feedback!
> >
> > Cheers, Steffen
> >
> > On 25.06.21, 17:05, "Hausmann, Steffen" <shau...@amazon.de.INVALID> wrote:
> >
> > Hi Piotr,
> >
> > I'm happy to take your guidance on this. I need to think through your proposals and I'll follow up on Monday with some more context so that we can close the discussion on these details. But for now, I'll close the vote.
> >
> > Thanks, Steffen
> >
> > From: Piotr Nowojski <pnowoj...@apache.org>
> > Date: Friday, 25. June 2021 at 14:48
> > To: Till Rohrmann <trohrm...@apache.org>
> > Cc: Steffen Hausmann <shau...@amazon.de>, "dev@flink.apache.org" <dev@flink.apache.org>, Arvid Heise <ar...@apache.org>
> > Subject: RE: [EXTERNAL] [DISCUSS] FLIP-171: Async Sink
> >
> > Hey,
> >
> > I've just synced with Arvid about a couple more remarks from my side and he shared my concerns.
> >
> > 1. I would very strongly recommend ditching `CompletableFuture<?>` from the `protected abstract CompletableFuture<?> submitRequestEntries(List<RequestEntryT> requestEntries);` in favor of something like the `org.apache.flink.streaming.api.functions.async.ResultFuture` interface. `CompletableFuture<?>` would partially make the threading model of the `AsyncSinkWriter` part of the public API and it would tie our hands. Regardless of how `CompletableFuture<?>` is used, it imposes performance overhead because of the synchronisation/volatile inside of it. On the other hand, something like:
> >
> > protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, ResultFuture<?> requestResult);
> >
> > would allow us to implement the threading model as we wish.
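For illustration, the callback-style contract under discussion could be sketched as below. This is only a sketch: `ResultFuture` here is a hypothetical minimal interface standing in for whatever the FLIP ends up using, and `NoopSinkWriter` is an invented example implementer, not Flink code.

```java
import java.util.List;

// Hypothetical minimal callback interface; the runtime could back it with a
// CompletableFuture, or with something cheaper, without changing the API.
interface ResultFuture<T> {
    void complete(List<T> failedEntries);
}

// Callback-style writer contract: completion handling stays under the
// runtime's control, so the threading model is not part of the public API.
abstract class AsyncSinkWriterSketch<RequestEntryT> {
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries,
            ResultFuture<RequestEntryT> requestResult);
}

// Invented example implementer: "submits" entries synchronously and
// reports that nothing needs to be retried.
class NoopSinkWriter extends AsyncSinkWriterSketch<String> {
    @Override
    protected void submitRequestEntries(
            List<String> requestEntries, ResultFuture<String> requestResult) {
        requestResult.complete(List.of());
    }
}
```

A `CompletableFuture`-returning variant would force every implementation to pay for the future's internal synchronization even when the runtime does not need it; the callback variant leaves that choice open.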
> > `ResultFuture` could be backed via `CompletableFuture<?>` underneath, but it could also be something more efficient. I will explain what I have in mind in a second.
> >
> > 2. It looks to me that the proposed `AsyncSinkWriter` internals are not very efficient and maybe the threading model hasn't been thought through? Especially the private fields:
> >
> > private final BlockingDeque<RequestEntryT> bufferedRequestEntries;
> > private BlockingDeque<CompletableFuture<?>> inFlightRequests;
> >
> > are a bit strange to me. Why do we need two separate thread-safe collections? Why do we need a `BlockingDeque` of `CompletableFuture<?>`s? If we are already using a fully synchronised collection, there should be no need for another layer of thread-safe `CompletableFuture<?>`.
> >
> > As I understand it, the threading model of the `AsyncSinkWriter` is very similar to that of the `AsyncWaitOperator`, with very similar requirements for inducing backpressure. How I would see it implemented is, for example:
> >
> > a) Having a single lock that would encompass the whole `AsyncSinkWriter#flush()` method. `flush()` would be called from the task thread (mailbox). To induce backpressure, `#flush()` would just call `lock.wait()`. `ResultFuture#complete(...)`, called from an async thread, would also synchronize on the same lock, mark some of the in-flight requests as completed and call `lock.notify()`.
> >
> > b) A more efficient solution. On the hot path we would have, for example, only `AtomicLong numberOfInFlightRequests`. The task thread would be bumping it, `ResultFuture#complete()` would be decreasing it. If the task thread, when bumping `numberOfInFlightRequests`, exceeds a threshold, it goes to sleep/waits on a lock or some `CompletableFuture`. If `ResultFuture#complete()`, when decreasing the count, goes below the threshold, it would wake up the task thread.
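Option b) could be sketched roughly as follows. The class name `InFlightLimiter` and the exact wakeup mechanics are illustrative, not part of the FLIP; the point is that the uncontended hot path only touches a single `AtomicLong`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of option b): backpressure via a single AtomicLong on the hot path.
class InFlightLimiter {
    private final long maxInFlight;
    private final AtomicLong inFlight = new AtomicLong();
    private final Object wakeupLock = new Object();

    InFlightLimiter(long maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    // Called by the task thread before handing a batch to the async client.
    // Blocks (backpressure) while too many requests are outstanding.
    void acquire() throws InterruptedException {
        if (inFlight.incrementAndGet() > maxInFlight) {
            synchronized (wakeupLock) {
                // Re-check under the lock to avoid lost wakeups.
                while (inFlight.get() > maxInFlight) {
                    wakeupLock.wait();
                }
            }
        }
    }

    // Called from the async completion callback (ResultFuture#complete).
    void release() {
        if (inFlight.decrementAndGet() <= maxInFlight) {
            synchronized (wakeupLock) {
                wakeupLock.notifyAll();
            }
        }
    }

    long inFlightCount() {
        return inFlight.get();
    }
}
```

When the limit is not reached, `acquire()` costs one atomic increment; the lock is only touched once the threshold is exceeded.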
> > Compared to option a), on the hot path option b) would have only the AtomicLong.increment overhead.
> >
> > c) We could use the mailbox, the same way as AsyncWaitOperator is doing. In this case `ResultFuture#complete()` would be enqueuing a mailbox action, which is thread-safe on its own.
> >
> > Either of those options would be more efficient and simpler (from the threading model perspective) than having two `BlockingQueues` and `CompletableFuture<?>`. Also, as you can see, neither of those solutions requires the overhead of `CompletableFuture<?> submitRequestEntries(List<RequestEntryT> requestEntries)`. Each one of those could use a more efficient and custom implementation of `ResultFuture.complete(...)`.
> >
> > Whether we use a), b) or c) should, I think, be an implementation detail. But to allow this to truly be an implementation detail, we would need to agree on 1. Nevertheless, I think the change I proposed in 1. is small enough that there is no need to cancel the current vote on the FLIP.
> >
> > WDYT?
> >
> > Piotrek
> >
> > On Tue, 22 Jun 2021 at 11:42 Till Rohrmann <trohrm...@apache.org> wrote:
> > Adding the InterruptedException to the write method would make it explicit that the write call can block but must react to interruptions (e.g. when Flink wants to cancel the operation). I think this makes the contract a bit clearer.
> >
> > I think starting simple and then extending the API as we see the need is a good idea.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jun 22, 2021 at 11:20 AM Hausmann, Steffen <shau...@amazon.de> wrote:
> > Hey,
> >
> > Agreed on starting with a blocking `write`. I've adapted the FLIP accordingly.
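The two interrupt-handling variants being weighed here (surface the `InterruptedException` vs. translate it) could look roughly like this. `BufferingWriter` is an invented illustration, not the actual FLIP-143 `SinkWriter` interface.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a `write` that blocks on a bounded buffer to induce backpressure.
class BufferingWriter<T> {
    private final BlockingQueue<T> buffer;

    BufferingWriter(int capacity) {
        buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Variant 1: surface the interrupt to the caller as a checked exception,
    // making it explicit that write() can block but must react to cancellation.
    void write(T element) throws InterruptedException {
        buffer.put(element); // blocks when the buffer is full
    }

    // Variant 2: translate the interrupt into an unchecked exception; the
    // interrupt flag is restored so cancellation is not silently lost.
    void writeUninterruptibly(T element) {
        try {
            buffer.put(element);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while buffering", e);
        }
    }

    int buffered() {
        return buffer.size();
    }
}
```

Variant 1 changes the method signature (a small API change); variant 2 keeps the signature but makes cancellation behavior a hidden implementation concern.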
> > For now I've chosen to add the `InterruptedException` to the `write` method signature, as I'm not fully understanding the implications of swallowing the exception. Depending on the details of the code that is calling the write method, it may cause event loss. But this seems more of an implementation detail that we can revisit once we are actually implementing the sink.
> >
> > Unless there are additional comments, does it make sense to start the voting process in the next day or two?
> >
> > Cheers, Steffen
> >
> > On 21.06.21, 14:51, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
> >
> > Hi,
> >
> > Thanks Steffen for the explanations. I think it makes sense to me.
> >
> > Re Arvid/Steffen:
> >
> > - Keep in mind that even if we choose to provide a non-blocking API using the `isAvailable()`/`getAvailableFuture()` method, we would still need to support blocking inside the sinks. For example, at the very least, emitting many records at once (`flatMap`) or firing timers are scenarios where output availability would be ignored at the moment by the runtime. Also I would imagine writing very large (like 1GB) records would be blocking on something as well.
> > - Secondly, exposing availability at the API level might not be that easy/trivial. The availability pattern as defined in the `AvailabilityProvider` class is quite complicated and not that easy to implement by a user.
> >
> > Both of those, combined with the lack of a clear motivation for adding `AvailabilityProvider` to the sinks/operators/functions, mean I would vote for just starting with blocking `write` calls. This can always be extended in the future with availability if needed/motivated properly.
> >
> > That would be aligned with either Arvid's option 1 or 2. I don't know what the best practices with `InterruptedException` are, but I'm always afraid of it, so I would personally feel safer with option 2.
> >
> > I'm not sure what problem option 3 is helping to solve? Adding `wakeUp()` would sound strange to me.
> >
> > Best,
> > Piotrek
> >
> > On Mon, 21 Jun 2021 at 12:15 Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi Piotr,
> > >
> > > to pick up this discussion thread again:
> > > - This FLIP is about providing some base implementation for FLIP-143 sinks that makes adding new implementations easier, similar to the SourceReaderBase.
> > > - The whole availability topic will most likely be a separate FLIP. The basic issue just popped up here because we currently have no way to signal backpressure in sinks except by blocking `write`. This feels quite natural in sinks with sync communication but quite unnatural in async sinks.
> > >
> > > Now we have a couple of options. In all cases, we would have some WIP limit on the number of records/requests being able to be processed in parallel asynchronously (similar to asyncIO).
> > > 1. We use some blocking queue in `write`, then we need to handle interruptions. In the easiest case, we extend `write` to throw the `InterruptedException`, which is a small API change.
> > > 2. We use a blocking queue, but handle interrupts and swallow/translate them. No API change.
> > > Both solutions block the task thread, so any RPC message / unaligned checkpoint would be processed only after the backpressure is temporarily lifted. That's similar to the discussions that you linked. Cancellation may also be a tad harder on 2.
> > > 3. We could also add some `wakeUp` to the `SinkWriter`, similar to `SplitFetcher` [1]. Basically, you use a normal queue with a completable future on which you block. Wakeup would be a clean way to complete it next to the natural completion through finished requests.
> > > 4. We add availability to the sink. However, this API change also requires that we allow operators to be available, so it may be a bigger change with undesired side-effects. On the other hand, we could also use the same mechanism for asyncIO.
> > >
> > > For users of FLIP-171, none of the options are exposed. So we could also start with a simple solution (add `InterruptedException`) and later try to add availability. Options 1+2 would also not require an additional FLIP; we could add it as part of this FLIP.
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > > [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L258-L258
> > >
> > > On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen <shau...@amazon.de.invalid> wrote:
> > >
> > >> Hey Piotrek,
> > >>
> > >> Thanks for your comments on the FLIP. I'll address your second question first, as I think it's more central to this FLIP. Just looking at the AWS ecosystem, there are several sinks with overlapping functionality. I've chosen AWS sinks here because I'm most familiar with those, but a similar argument applies more generically for destinations that support async ingest.
> > >>
> > >> There is, for instance, a sink for Amazon Kinesis Data Streams that is part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2], a sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4]. All these sinks have implemented their own mechanisms for batching, persisting, and retrying events. And I'm not sure if all of them properly participate in checkpointing. [3] even seems to closely mirror [1] as it contains references to the Kinesis Producer Library, which is unrelated to Amazon DynamoDB.
> > >>
> > >> These sinks predate FLIP-143. But as batching, persisting, and retrying capabilities do not seem to be part of FLIP-143, I'd argue that we would end up with similar duplication, even if these sinks were rewritten today based on FLIP-143. And that's the idea of FLIP-171: abstract away these commonly required capabilities so that it becomes easy to create support for a wide range of destinations without having to think about batching, retries, checkpointing, etc. I've included an example in the FLIP [5] that shows that it only takes a couple of lines of code to implement a sink with exactly-once semantics. To be fair, the example is lacking robust failure handling and some more advanced capabilities of [1], but I think it still supports this point.
> > >>
> > >> Regarding your point on the isAvailable pattern. We need some way for the sink to propagate backpressure and we would also like to support time-based buffering hints. There are two options I currently see and would need additional input on which one is the better or more desirable one. The first option is to use the non-blocking isAvailable pattern. Internally, the sink persists buffered events in the snapshot state, which avoids having to flush buffered records on a checkpoint. This seems to align well with the non-blocking isAvailable pattern. The second option is to make calls to `write` blocking and leverage an internal thread to trigger flushes based on time-based buffering hints. We've discussed these options with Arvid and suggested assuming that the `isAvailable` pattern will become available for sinks through an additional FLIP.
> > >>
> > >> I think it is an important discussion to have. My understanding of the implications for Flink in general is very naïve, so I'd be happy to get further guidance. However, I don't want to make this discussion part of FLIP-171. For FLIP-171 we'll use whatever is available.
> > >>
> > >> Does that make sense?
> > >>
> > >> Cheers, Steffen
> > >>
> > >> [1] https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
> > >> [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
> > >> [3] https://github.com/klarna-incubator/flink-connector-dynamodb
> > >> [4] https://github.com/awslabs/amazon-timestream-tools/
> > >> [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
> > >>
> > >> On 09.06.21, 19:44, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
> > >>
> > >> Hi Steffen,
> > >>
> > >> Thanks for writing down the proposal. Back when the new Sink API was being discussed, I was proposing to add our usual `CompletableFuture<Void> isAvailable()` pattern to make sinks non-blocking. You can see the discussion starting here [1], and continuing for a couple more posts until here [2]. Back then, the outcome was that it would give very little benefit, at the expense of making the API more complicated. Could you maybe relate your proposal to that discussion from last year?
> > >>
> > >> I see that your proposal is going much further than just adding the availability method, could you also motivate this a bit further? Could you maybe reference/show some sinks that:
> > >> 1. are already implemented using FLIP-143
> > >> 2. that have some code duplication...
> > >> 3. ...this duplication would be solved by FLIP-171
> > >>
> > >> Best,
> > >> Piotrek
> > >>
> > >> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
> > >> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
> > >>
> > >> On Wed, 9 Jun 2021 at 09:49 Hausmann, Steffen <shau...@amazon.de.invalid> wrote:
> > >>
> > >> > Hi there,
> > >> >
> > >> > We would like to start a discussion thread on "FLIP-171: Async Sink" [1], where we propose to create a common abstraction for destinations that support async requests. This abstraction will make it easier to add destinations to Flink by implementing a lightweight shim, while it avoids maintaining dozens of independent sinks.
> > >> >
> > >> > Looking forward to your feedback.
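The "lightweight shim" idea can be illustrated with a toy base class. `AsyncSinkWriterBase` and `InMemorySinkWriter` below are invented stand-ins that only mimic the buffering/batching split the FLIP proposes; the real base class would also handle retries, backpressure, and checkpointing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the proposed base class: it owns buffering and batch
// formation, so destination-specific code does not have to.
abstract class AsyncSinkWriterBase<RequestEntryT> {
    private final List<RequestEntryT> batch = new ArrayList<>();
    private final int maxBatchSize;

    AsyncSinkWriterBase(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    public void write(RequestEntryT element) {
        batch.add(element);
        if (batch.size() >= maxBatchSize) {
            flush();
        }
    }

    public void flush() {
        if (!batch.isEmpty()) {
            submitRequestEntries(new ArrayList<>(batch));
            batch.clear();
        }
    }

    // The only method a concrete sink has to provide.
    protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries);
}

// The "shim": a complete destination-specific sink in a few lines, here
// writing to an in-memory list instead of a real service client.
class InMemorySinkWriter extends AsyncSinkWriterBase<String> {
    final List<String> destination = new ArrayList<>();

    InMemorySinkWriter(int maxBatchSize) {
        super(maxBatchSize);
    }

    @Override
    protected void submitRequestEntries(List<String> requestEntries) {
        destination.addAll(requestEntries);
    }
}
```

The shim only says how to submit one batch; everything else lives in the shared base class, which is what avoids maintaining dozens of independent sinks.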
> > >> >
> > >> > Cheers, Steffen
> > >> >
> > >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > >> >
> > >> > Amazon Web Services EMEA SARL
> > >> > 38 avenue John F. Kennedy, L-1855 Luxembourg
> > >> > Registered office: L-1855 Luxembourg
> > >> > Registered in the Luxembourg Trade and Companies Register under R.C.S. B186284
> > >> >
> > >> > Amazon Web Services EMEA SARL, Niederlassung Deutschland
> > >> > Marcel-Breuer-Str. 12, D-80807 Muenchen
> > >> > Registered office of the branch: Muenchen
> > >> > Registered in the commercial register of the Munich Local Court under HRB 242240, USt-ID DE317013094