Hi Arvid,

In fact, I think composition is the more flexible approach, especially in terms of reuse. Once you make a change, you can still reuse the existing logic. For example, if you have AsyncIOA+AsyncIOB, you can easily reuse the earlier logic to build AsyncIOA+AsyncIOC. As for compatibility, I don't think there would be a big problem. For example, the old AsyncIOA+AsyncIOB can always be converted to AsyncIOA+AsyncIOB+AsyncIOC. When AsyncIOB does not have any state, simply pass it through.
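
A minimal sketch of the composition argument, under loud assumptions: `AsyncStage`, `then` and `passThrough` are hypothetical names built on plain `CompletableFuture`, not Flink's AsyncIO API, and the three stages are made-up placeholders. It only illustrates that A can be reused in A+B and A+C, and that an old two-stage chain can be expressed as a three-stage chain by appending a pass-through stage (one possible reading of the compatibility point).

```java
import java.util.concurrent.CompletableFuture;

final class StageComposition {

    /** Hypothetical stand-in for one async stage; this is NOT Flink's AsyncFunction. */
    interface AsyncStage<I, O> {
        CompletableFuture<O> apply(I input);

        /** Chains another stage behind this one. */
        default <R> AsyncStage<I, R> then(AsyncStage<O, R> next) {
            return input -> apply(input).thenCompose(next::apply);
        }

        /** A stage without state or logic: forwards its input unchanged. */
        static <T> AsyncStage<T, T> passThrough() {
            return CompletableFuture::completedFuture;
        }
    }

    // Stage A (placeholder): normalizes the record.
    static final AsyncStage<String, String> A =
            s -> CompletableFuture.supplyAsync(s::trim);
    // Stage B (placeholder): computes the record length.
    static final AsyncStage<String, Integer> B =
            s -> CompletableFuture.supplyAsync(s::length);
    // Stage C (placeholder): upper-cases the record.
    static final AsyncStage<String, String> C =
            s -> CompletableFuture.supplyAsync(s::toUpperCase);

    public static void main(String[] args) {
        // A is written once and reused in both chains.
        AsyncStage<String, Integer> aThenB = A.then(B);
        AsyncStage<String, String> aThenC = A.then(C);
        // The old two-stage chain expressed as a three-stage chain
        // whose extra stage simply passes records through.
        AsyncStage<String, Integer> threeStages =
                A.then(B).then(AsyncStage.<Integer>passThrough());

        System.out.println(aThenB.apply("  flink ").join());
        System.out.println(aThenC.apply("  flink ").join());
        System.out.println(threeStages.apply("  flink ").join());
    }
}
```

The sketch deliberately ignores ordering, state, and checkpointing, which are exactly the points under discussion in this thread.
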
Hi Steffen,

You are right, there are indeed such examples. However, as I also mentioned in my previous email, we could extend the AsyncIO capability, e.g. with XXXFuture.fail(Exception), so that AsyncIO can retry (put the failed element back into the internal queue), which would also solve the problem you mentioned. BTW, if we implement `AsyncSink` separately, there may also be ordered/unordered issues, which are already resolved by `AsyncIO`.

In general, I think AsyncIO and AsyncSink are very similar. There might be some duplicated work.

Best,
Guowei

On Fri, Jul 30, 2021 at 8:16 PM Hausmann, Steffen <shau...@amazon.de.invalid> wrote:

> Hey Guowei,
>
> there is one additional aspect I want to highlight that is relevant for the types of destinations we had in mind when designing the AsyncSink.
>
> I'll again use Kinesis as an example, but the same argument applies to other destinations. We are using the PutRecords API to persist up to 500 events with a single API call to reduce the overhead compared to using individual calls per event. But not all of the 500 events may be persisted successfully, e.g., a single event fails to be persisted due to server-side throttling. With the MailboxExecutor-based implementation, we can just add this event back to the internal queue. The event will then be retried with the next batch of 500 events.
>
> In my understanding, that's not possible with the AsyncIO-based approach. During a retry, we can only retry the failed events of the original batch of events, which means we would need to send a single event with a separate PutRecords call. Depending on how often that happens, this could add up.
>
> Does that make sense?
>
> Cheers, Steffen
>
>
> On 30.07.21, 05:51, "Guowei Ma" <guowei....@gmail.com> wrote:
>
> CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.
>
> Hi, Arvid & Piotr
> Sorry for the late reply.
> 1. Thank you all very much for your patience and explanation. Recently, I have also studied the related code of 'MailBox', which may not be as serious as I thought; after all, it is very similar to Java's `Executor`.
> 2. Whether to use AsyncIO or MailBox to implement the Kinesis connector is more up to the contributor to decide (after all, it has already been decided to expose `Mailbox` :-) ). It’s just that I personally prefer to combine some simple functions to complete a more advanced function.
> Best,
> Guowei
>
>
> On Sat, Jul 24, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Just to reiterate on Piotr's point: MailboxExecutor is pretty much an Executor [1] with named lambdas; except for the name MailboxExecutor, nothing is hinting at a specific threading model.
> >
> > Currently, we expose it on StreamOperator API. Afaik the idea is to make the StreamOperator internal and beef up ProcessFunction but for several use cases (e.g., AsyncIO), we actually need to expose the executor anyways.
> >
> > We could rename MailboxExecutor to avoid exposing the name of the threading model. For example, we could rename it to TaskThreadExecutor (but that's pretty much the same), to CooperativeExecutor (again implies Mailbox), to o.a.f.Executor, to DeferredExecutor... Ideas are welcome.
> >
> > We could also simply use Java's Executor interface, however, when working with that interface, I found that the missing context of async executed lambdas made debugging much much harder. So that's why I designed MailboxExecutor to force the user to give some debug string to each invocation. In the sink context, we could, however, use an adaptor from MailboxExecutor to Java's Executor and simply bind the sink name to the invocations.
> >
> > [1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
> >
> > On Fri, Jul 23, 2021 at 5:36 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >
> > > Hi,
> > >
> > > Regarding the question whether to expose the MailboxExecutor or not:
> > > 1. We have plans on exposing it in the ProcessFunction (in short we want to make StreamOperator API private/internal only, and move all of its extra functionality in one way or another to the ProcessFunction). I don't remember and I'm not sure if *Dawid* had a different idea about this (do not expose Mailbox but wrap it somehow?)
> > > 2. If we provide a thin wrapper around MailboxExecutor, I'm not sure how helpful it will be for keeping backward compatibility in the future. `MailboxExecutor` is already a very generic interface that doesn't expose much about the current threading model. Note that the previous threading model (multi-threaded with checkpoint lock) should be easy to implement using the `MailboxExecutor` interface (use a direct executor that acquires the checkpoint lock).
> > >
> > > Having said that, I haven't spent too much time thinking about whether it's better to enrich AsyncIO or provide the AsyncSink. If we can just as efficiently provide the same functionality using the existing/enhanced AsyncIO API, that may be a good idea if it indeed reduces our maintenance costs.
> > >
> > > Piotrek
> > >
> > > On Fri, Jul 23, 2021 at 12:55 Guowei Ma <guowei....@gmail.com> wrote:
> > >
> > > > Hi, Arvid
> > > >
> > > > >>>The main question here is what do you think is the harm of exposing Mailbox? Is it the complexity or the maintenance overhead?
> > > >
> > > > I think that exposing the internal threading model might be risky.
> > > > In case the threading model changes, it will affect the user's API and bring the burden of internal modification. (Of course, you may have more say in how the MailBox model will develop in the future.) Therefore, I think that if an alternative solution can be found, these possible risks will be avoided: for example, through AsyncIO.
> > > >
> > > > >>>>AsyncIO has no user state, so we would be quite limited in implementing at-least-once sinks especially when it comes to retries. Chaining two AsyncIO would make it even harder to reason about the built-in state. We would also give up any chance to implement exactly once async sinks (even though I'm not sure if it's possible at all).
> > > >
> > > > 1. Why would we need to use the state when retrying (maybe I miss something)? If a batch of asynchronous requests fails, I think it is enough to retry directly in the callback. Or extend AsyncIO to give it the ability to retry (XXXFuture.fail(Exception)); in addition, in general, the client has its own retry mechanism, at least the Kinesis producer says so in its documentation. Of course I am not opposed to retrying, I just want to find a more obvious example to support the need to do so.
> > > >
> > > > 2. I don't think using AsyncIO will prevent exactly once in the future. Both solutions need to be rewritten unless Exactly Once is required from the beginning.
> > > >
> > > > >>>Users will have a hard time to discover SinkUtil.sinkTo compared to the expected stream.sinkTo. We have seen that on other occasions already (e.g., reinterpretAsKeyedStream).
> > > > In fact, I think this is the most important issue. We lack the function of supporting sub-topology at the API layer, which is very inconvenient.
> > > > For example stream.sinkTo(AsyncSinkTopologyBuilder), what do you think?
> > > > ```java
> > > > // Pseudocode for the proposed sub-topology builder; not compilable as written.
> > > > AsyncSinkTopologyBuilder {
> > > >     void build(inputStream) {
> > > >         inputStream.flatmap().async()…
> > > >     }
> > > > }
> > > > ```
> > > > In general, I want to know what principles will guide when to expose more implementations to users, and when to combine with existing UDFs.
> > > >
> > > > Best,
> > > > Guowei
> > > >
> > > >
> > > > On Thu, Jul 22, 2021 at 10:35 PM Arvid Heise <ar...@apache.org> wrote:
> > > >
> > > > > Hi Guowei,
> > > > >
> > > > > I think the whole discussion here started as a means to avoid exposing MailboxExecutor to the user. Your preferred way would be to improve AsyncIO to support batching or implement AsyncSink as batching+AsyncIO. Here are some thoughts.
> > > > >
> > > > > 1) We should take a step back and note that we actually already expose MailboxExecutor on the Operator API in particular to implement AsyncIO. (Now, we could say that we actually want to hide Operator API in the future and could make MailboxExecutor internal again)
> > > > >
> > > > > 2) Exposing MailboxExecutor in general is a means for users to access the task thread in a cooperative way. That would allow certain communication patterns beyond simply async requests. For example, the user could re-enqueue retries or even effectively block input processing to induce backpressure by enqueuing a waiting mail. So the main question is if we want to empower sink devs to access the task thread or not. Note that indirectly, sink devs have that option already through timers. So in theory, they could also enqueue a MIN_TIME timer and use that to implement any kind of async processing.
> > > > >
> > > > > The main question here is what do you think is the harm of exposing Mailbox? Is it the complexity or the maintenance overhead?
> > > > >
> > > > > 3) Simulating AsyncSink through AsyncIO is possible but has some downsides.
> > > > > a) AsyncIO has no user state, so we would be quite limited in implementing at-least-once sinks especially when it comes to retries. Chaining two AsyncIO would make it even harder to reason about the built-in state. We would also give up any chance to implement exactly once async sinks (even though I'm not sure if it's possible at all).
> > > > > b) Users will have a hard time to discover SinkUtil.sinkTo compared to the expected stream.sinkTo. We have seen that on other occasions already (e.g., reinterpretAsKeyedStream).
> > > > > c) AsyncIO is optimized to feed back the result into the main task thread. That's completely unneeded for sinks.
> > > > > d) You probably know it better than me, but imho the batch behavior of a proper sink would be much better than an AsyncIO simulation (I have not tracked the latest discussion in FLIP-147).
> > > > >
> > > > > Note that if we absolutely don't want to expose MailboxExecutor, we can also try to get it through some casts to internal interfaces. So the Sink.InitContext could have a Sink.InitContextInternal subinterface that includes the mailbox executor. In AsyncSink, we could cast to the internal interface and are the only ones that can access the MailboxExecutor legitimately (of course, connector devs could do the same cast but then use an internal class and need to expect breaking changes).
> > > > >
> > > > > For your question:
> > > > >
> > > > >> 2. By the way, I have a little question.
> > > > >> Why not directly reduce the queue size to control the in-flight query, for example, according to your example,
> > > > >> Is a queue size such as 150 enough? In fact, there are caches in many places in the entire topology, such as buffers on the network stack.
> > > > >>
> > > > > It's a bit different. Let's take Kinesis as an example. The sink collects 500 elements and puts them in a single request (500 is the max number of records per batch request). Now it sends the request out. At the same time, the next 500 elements are collected. So the in-flight query size refers to the number of parallel requests (500 elements each).
> > > > > If we first batch 500*numberOfInFlightRequests, and then send out all numberOfInFlightRequests at the same time, we get worse latency.
> > > > >
> > > > > On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <guowei....@gmail.com> wrote:
> > > > >
> > > > >> Hi, Steffen
> > > > >>
> > > > >> Thank you for your detailed explanation.
> > > > >>
> > > > >> >>>But whether a sink is overloaded not only depends on the queue size. It also depends on the number of in-flight async requests
> > > > >>
> > > > >> 1. How about chaining two AsyncIOs? One is for controlling the size of the buffer elements; the other is for controlling the in-flight async requests. I think this way might do it and would not need to expose the MailboxExecutor.
> > > > >> 2. By the way, I have a little question. Why not directly reduce the queue size to control the in-flight query, for example, according to your example,
> > > > >> Is a queue size such as 150 enough? In fact, there are caches in many places in the entire topology, such as buffers on the network stack.
> > > > >>
> > > > >> >>> I’m not sure whether I’m understanding who you are referring to by user.
> > > > >> Personally I mean the sink developer.
> > > > >>
> > > > >>
> > > > >> Best,
> > > > >> Guowei
> > > > >>
> > > > >>
> > > > >> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen <shau...@amazon.de.invalid> wrote:
> > > > >>
> > > > >>> Hey,
> > > > >>>
> > > > >>> We are using the `MailboxExecutor` to block calls to `write` in case the sink is somehow overloaded. Overloaded basically means that the sink cannot persist messages quickly enough into the respective destination.
> > > > >>>
> > > > >>> But whether a sink is overloaded not only depends on the queue size. It also depends on the number of in-flight async requests that must not grow too large either [1, 2]. We also need to support use cases where the destination can only ingest x messages per second or a total throughput of y per second. We are also planning to support time outs so that data is persisted into the destination at least every n seconds by means of the `ProcessingTimeService`. The batching configuration will be part of the constructor, which has only been indicated in the current prototype but is not implemented, yet [3].
> > > > >>>
> > > > >>> I’m not sure whether I’m understanding who you are referring to by user. People who are using a concrete sink, e.g., to send messages into a Kinesis stream, will not be exposed to the `MailboxExecutor`. They are just using the sink and pass in the batching configuration from above [4].
> > > > >>> The `MailboxExecutor` and `ProcessingTimeService` are only relevant for sink authors who want to create support for a new destination. I would expect that there are only a few experts adding support for new destinations, who are capable of understanding and using the advanced constructs properly.
> > > > >>>
> > > > >>> Hope that helps to clarify our thinking.
> > > > >>>
> > > > >>> Cheers, Steffen
> > > > >>>
> > > > >>>
> > > > >>> [1] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
> > > > >>> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
> > > > >>> [3] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
> > > > >>> [4] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> From: Arvid Heise <ar...@apache.org>
> > > > >>> Date: Tuesday, 20. July 2021 at 23:03
> > > > >>> To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
> > > > >>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
> > > > >>>
> > > > >>>
> > > > >>> Hi Guowei,
> > > > >>>
> > > > >>> 1. your idea is quite similar to FLIP-171 [1]. The question is if we implement FLIP-171 based on public interfaces (which would require exposing MailboxExecutor as described here in FLIP-177) or if it's better to implement it internally and hide it.
> > > > >>> The first option is an abstract base class; your second option would be an abstract interface that has matching implementation internally (similarly to AsyncIO).
> > > > >>> There is an example for option 1 in [2]; I think the idea was to additionally specify the batch size and batch timeout in the ctor.
> > > > >>> @Hausmann, Steffen <shau...@amazon.de> knows more.
> > > > >>>
> > > > >>> 2. I guess your question is if current AsyncIO is not sufficient already if exactly-once is not needed? The answer is currently no, because AsyncIO is not doing any batching. The user could do batching before that but that's quite a bit of code. However, we should really think if AsyncIO should also support batching.
> > > > >>> I would also say that the scope of AsyncIO and AsyncSink is quite different: the first one is for application developers and the second one is for connector developers and would be deemed an implementation detail by the application developer. Of course, advanced users may fall in both categories, so the distinction does not always hold.
> > > > >>>
> > > > >>> Nevertheless, there is some overlap between both approaches and it's important to think if the added complexity warrants the benefit.
> > > > >>> It would be interesting to hear how other devs see that.
> > > > >>>
> > > > >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > > >>> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
> > > > >>>
> > > > >>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
> > > > >>> Hi, Arvid
> > > > >>> Thank you Arvid for perfecting Sink through this FLIP. I have two little questions
> > > > >>>
> > > > >>> 1. What do you think of us directly providing an interface as follows? In this way, there may be no need to expose the Mailbox to the user. We can implement an `AsyncSinkWriterOperator` to control the length of the queue. If it is too long, do not call SinkWriter::write.
> > > > >>> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
> > > > >>>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> { // please ignore the name of Tuple2 and XXXFuture at first.
> > > > >>>     int getQueueLimit();
> > > > >>> }
> > > > >>>
> > > > >>> 2. Another question is: If users don't care about exactly once and the unification of stream and batch, how about letting users use `AsyncFunction` directly? I don’t have an answer either. I want to hear your suggestions.
> > > > >>>
> > > > >>> Best,
> > > > >>> Guowei
> > > > >>>
> > > > >>>
> > > > >>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
> > > > >>>
> > > > >>> > Dear devs,
> > > > >>> >
> > > > >>> > today I'd like to start the discussion on the Sink API. I have drafted a FLIP [1] with an accompanying PR [2].
> > > > >>> >
> > > > >>> > This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in one. In this discussion, we should decide on the scope and cut out too invasive steps if we can't reach an agreement.
> > > > >>> >
> > > > >>> > Step 1 is to add a few more pieces of information to context objects. That's non-breaking and needed for the async communication pattern in FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I think that this should entail the least discussions.
> > > > >>> >
> > > > >>> > Step 2 is to also offer the same context information to committers. Here we can offer some compatibility methods to not break existing sinks. The main use case would be some async exactly-once sink but I'm not sure if we would use async communication patterns at all here (or simply wait for all async requests to finish in a sync way). It may also help with async cleanup tasks though.
> > > > >>> >
> > > > >>> > While drafting Step 2, I noticed the big entanglement of the current API. To figure out if there is a committer during the stream graph creation, we actually need to create a committer which can have unforeseen consequences.
> > > > >>> > Thus, I spiked if we can disentangle the interface and have separate interfaces for the different use cases. The resulting step 3 would be a completely breaking change and thus is probably controversial. However, I'd also see the disentanglement as a way to prepare to make Sinks more expressive (write and commit coordinator) without completely overloading the main interface.
> > > > >>> >
> > > > >>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > > > >>> > [2] https://github.com/apache/flink/pull/16399
> > > > >>> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > > >>> >
> > > > >>>
> > > > >>>
> > > > >>> Amazon Web Services EMEA SARL
> > > > >>> 38 avenue John F. Kennedy, L-1855 Luxembourg
> > > > >>> Registered office: L-1855 Luxembourg
> > > > >>> Registered in the Luxembourg Trade Register under R.C.S. B186284
> > > > >>>
> > > > >>> Amazon Web Services EMEA SARL, German branch
> > > > >>> Marcel-Breuer-Str. 12, D-80807 Muenchen
> > > > >>> Registered office of the branch: Muenchen
> > > > >>> Registered in the commercial register of the Muenchen local court under HRB 242240, VAT ID DE317013094