Hi Arvid & Piotr,

Sorry for the late reply.

1. Thank you all very much for your patience and explanations. I have recently studied the Mailbox-related code as well, and it may not be as problematic as I thought; after all, it is very similar to Java's `Executor`.
2. Whether to use AsyncIO or the Mailbox to implement the Kinesis connector is really up to the contributor to decide (after all, the decision to expose the `Mailbox` has already been made :-) ). It is just that I personally prefer to compose simple building blocks into a more advanced function.

Best,
Guowei
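A minimal sketch of the adaptor Arvid mentions in the quoted mail below: wrapping a `MailboxExecutor` as a plain `java.util.concurrent.Executor` with the sink name bound to every mail. The class name and description format are made up here, and the `MailboxExecutor` package differs between Flink versions:

```java
import java.util.concurrent.Executor;

import org.apache.flink.api.common.operators.MailboxExecutor; // package varies by Flink version

/** Illustrative adaptor: exposes a MailboxExecutor as a plain java.util.concurrent.Executor. */
public class NamedMailboxExecutorAdapter implements Executor {

    private final MailboxExecutor mailboxExecutor;
    private final String sinkName; // bound once, reused as the debug description of every mail

    public NamedMailboxExecutorAdapter(MailboxExecutor mailboxExecutor, String sinkName) {
        this.mailboxExecutor = mailboxExecutor;
        this.sinkName = sinkName;
    }

    @Override
    public void execute(Runnable command) {
        // MailboxExecutor requires a description for each enqueued mail; the adaptor supplies
        // the sink name so asynchronously executed lambdas stay identifiable when debugging.
        mailboxExecutor.execute(command::run, "mail from sink %s", sinkName);
    }
}
```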
On Sat, Jul 24, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:

> Just to reiterate Piotr's point: MailboxExecutor is pretty much an Executor [1] with named lambdas; except for the name, nothing in MailboxExecutor hints at a specific threading model.
>
> Currently, we expose it on the StreamOperator API. Afaik the idea is to make the StreamOperator internal and beef up ProcessFunction, but for several use cases (e.g., AsyncIO) we actually need to expose the executor anyway.
>
> We could rename MailboxExecutor to avoid exposing the name of the threading model. For example, we could rename it to TaskThreadExecutor (but that's pretty much the same), to CooperativeExecutor (again implies Mailbox), to o.a.f.Executor, to DeferredExecutor... Ideas are welcome.
>
> We could also simply use Java's Executor interface; however, when working with that interface, I found that the missing context of asynchronously executed lambdas made debugging much, much harder. That's why I designed MailboxExecutor to force the user to give a debug string for each invocation. In the sink context, we could, however, use an adaptor from MailboxExecutor to Java's Executor and simply bind the sink name to the invocations.
>
> [1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
>
> On Fri, Jul 23, 2021 at 5:36 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> > Hi,
> >
> > Regarding the question whether to expose the MailboxExecutor or not:
> > 1. We have plans to expose it in the ProcessFunction (in short, we want to make the StreamOperator API private/internal only and move all of its extra functionality, in one way or another, to the ProcessFunction). I don't remember and I'm not sure whether *Dawid* had a different idea about this (do not expose the Mailbox but wrap it somehow?).
> > 2. If we provide a thin wrapper around MailboxExecutor, I'm not sure how helpful it will be for keeping backward compatibility in the future. `MailboxExecutor` is already a very generic interface that doesn't expose much about the current threading model. Note that the previous threading model (multi-threaded with a checkpoint lock) should be easy to implement using the `MailboxExecutor` interface (use a direct executor that acquires the checkpoint lock).
> >
> > Having said that, I haven't spent too much time thinking about whether it's better to enrich AsyncIO or to provide the AsyncSink. If we can just as efficiently provide the same functionality using the existing/enhanced AsyncIO API, that may be a good idea if it indeed reduces our maintenance costs.
> >
> > Piotrek
> >
> > On Fri, Jul 23, 2021 at 12:55 Guowei Ma <guowei....@gmail.com> wrote:
> >
> > > Hi, Arvid
> > >
> > > >>> The main question here is what do you think is the harm of exposing Mailbox? Is it the complexity or the maintenance overhead?
> > >
> > > I think that exposing the internal threading model might be risky. In case the threading model changes, it will affect the user-facing API and bring the burden of internal modification. (Of course, you may have more say in how the Mailbox model will develop in the future.) Therefore, I think that if an alternative solution can be found, these possible risks can be avoided: for example, through AsyncIO.
> > >
> > > >>>> AsyncIO has no user state, so we would be quite limited in implementing at-least-once sinks, especially when it comes to retries.
> > > Chaining two AsyncIOs would make it even harder to reason about the built-in state. We would also give up any chance to implement exactly-once async sinks (even though I'm not sure if that is possible at all).
> > >
> > > 1. Why would we need to use state when retrying (maybe I am missing something)? If a batch of asynchronous requests fails, I think it is enough to retry directly in the callback, or to extend AsyncIO to give it the ability to retry (XXXFuture.fail(Exception)). In addition, the client generally has its own retry mechanism; at least the Kinesis producer says so in its documentation. Of course I am not opposed to retrying, I just want to find a more obvious example to support the need to do so.
> > >
> > > 2. I don't think using AsyncIO will prevent exactly-once in the future. Both solutions would need to be rewritten unless exactly-once is required from the beginning.
> > >
> > > >>> Users will have a hard time to discover SinkUtil.sinkTo compared to the expected stream.sinkTo. We have seen that on other occasions already (e.g., reinterpretAsKeyedStream).
> > >
> > > In fact, I think this is the most important issue. We lack support for sub-topologies at the API layer, which is very inconvenient. For example, stream.sinkTo(AsyncSinkTopologyBuilder); what do you think?
> > > ```java
> > > class AsyncSinkTopologyBuilder {
> > >     void build(DataStream<InputT> input) {
> > >         input.flatMap(…).async()…
> > >     }
> > > }
> > > ```
> > > In general, I want to know what principles will guide when to expose more implementations to users, and when to combine existing UDFs.
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > On Thu, Jul 22, 2021 at 10:35 PM Arvid Heise <ar...@apache.org> wrote:
> > >
> > > > Hi Guowei,
> > > >
> > > > I think the whole discussion here started as a means to avoid exposing MailboxExecutor to the user. Your preferred way would be to improve AsyncIO to support batching, or to implement AsyncSink as batching + AsyncIO. Here are some thoughts.
> > > >
> > > > 1) We should take a step back and note that we actually already expose MailboxExecutor on the Operator API, in particular to implement AsyncIO. (Now, we could say that we actually want to hide the Operator API in the future and could make MailboxExecutor internal again.)
> > > >
> > > > 2) Exposing MailboxExecutor in general is a means for users to access the task thread in a cooperative way. That would allow certain communication patterns beyond simple async requests. For example, the user could re-enqueue retries or even effectively block input processing to induce backpressure by enqueuing a waiting mail. So the main question is whether we want to empower sink devs to access the task thread or not. Note that, indirectly, sink devs already have that option through timers. So in theory, they could also enqueue a MIN_TIME timer and use that to implement any kind of async processing.
> > > >
> > > > The main question here is what do you think is the harm of exposing Mailbox? Is it the complexity or the maintenance overhead?
> > > >
> > > > 3) Simulating AsyncSink through AsyncIO is possible but has some downsides.
> > > > a) AsyncIO has no user state, so we would be quite limited in implementing at-least-once sinks, especially when it comes to retries.
> > > > Chaining two AsyncIOs would make it even harder to reason about the built-in state. We would also give up any chance to implement exactly-once async sinks (even though I'm not sure if that is possible at all).
> > > > b) Users will have a hard time discovering SinkUtil.sinkTo compared to the expected stream.sinkTo. We have seen that on other occasions already (e.g., reinterpretAsKeyedStream).
> > > > c) AsyncIO is optimized to feed the result back into the main task thread. That's completely unneeded for sinks.
> > > > d) You probably know it better than me, but imho the batch behavior of a proper sink would be much better than an AsyncIO simulation (I have not tracked the latest discussion in FLIP-147).
> > > >
> > > > Note that if we absolutely don't want to expose MailboxExecutor, we can also try to get it through some casts to internal interfaces. So Sink.InitContext could have a Sink.InitContextInternal subinterface that includes the mailbox executor. In AsyncSink, we could cast to the internal interface and would be the only ones that can access the MailboxExecutor legitimately (of course, connector devs could do the same cast, but then they use an internal class and need to expect breaking changes).
> > > >
> > > > For your question:
> > > >
> > > >> 2. By the way, I have a little question. Why not directly reduce the queue size to control the in-flight queries? For example, according to your example, is a queue size such as 150 enough? In fact, there are caches in many places in the entire topology, such as buffers on the network stack.
> > > >
> > > > It's a bit different. Let's take Kinesis as an example. The sink collects 500 elements and puts them into a single request (500 is the maximum number of records per batch request). Now it sends the request out. At the same time, the next 500 elements are collected. So the in-flight query size refers to the number of parallel requests (500 elements each). If we first batched 500 * numberOfInFlightRequests elements and then sent out all numberOfInFlightRequests at the same time, we would get worse latency.
> > > >
> > > > On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <guowei....@gmail.com> wrote:
> > > >
> > > >> Hi, Steffen
> > > >>
> > > >> Thank you for your detailed explanation.
> > > >>
> > > >> >>> But whether a sink is overloaded not only depends on the queue size. It also depends on the number of in-flight async requests.
> > > >>
> > > >> 1. How about chaining two AsyncIOs? One controls the number of buffered elements; the other controls the in-flight async requests. I think this might do it and would not need to expose the MailboxExecutor.
> > > >> 2. By the way, I have a little question. Why not directly reduce the queue size to control the in-flight queries? For example, according to your example, is a queue size such as 150 enough? In fact, there are caches in many places in the entire topology, such as buffers on the network stack.
> > > >>
> > > >> >>> I'm not sure whether I'm understanding who you are referring to by user.
> > > >> Personally, I mean the sink developer.
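A minimal, self-contained sketch of the point discussed above: the capacity argument of Flink's AsyncDataStream already bounds the number of in-flight async requests, which is the "queue size" knob referred to in question 2, assuming batches are assembled by an upstream operator. WriteBatchFunction and putRecordsAsync are made-up names standing in for the destination client:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncIoSinkSketch {

    /** Hypothetical async function that writes one pre-assembled batch per request. */
    public static class WriteBatchFunction extends RichAsyncFunction<List<String>, Void> {

        @Override
        public void asyncInvoke(List<String> batch, ResultFuture<Void> resultFuture) {
            // putRecordsAsync stands in for the destination client's async call.
            CompletableFuture<Void> request = putRecordsAsync(batch);
            request.whenComplete((ignored, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error); // or retry in the callback
                } else {
                    resultFuture.complete(Collections.emptyList());
                }
            });
        }

        private CompletableFuture<Void> putRecordsAsync(List<String> batch) {
            return CompletableFuture.completedFuture(null); // placeholder for the real client
        }
    }

    public static DataStream<Void> apply(DataStream<List<String>> batches) {
        // capacity = 2 caps the number of parallel in-flight requests; the batching
        // into List<String> would happen in a preceding operator.
        return AsyncDataStream.unorderedWait(
                batches, new WriteBatchFunction(), 30, TimeUnit.SECONDS, 2);
    }
}
```

What the capacity knob alone cannot express, as noted in the quoted mails, are per-second rate limits or timeout-based flushing, which is where the MailboxExecutor and ProcessingTimeService come into play.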
> > > >>
> > > >> Best,
> > > >> Guowei
> > > >>
> > > >>
> > > >> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen <shau...@amazon.de.invalid> wrote:
> > > >>
> > > >>> Hey,
> > > >>>
> > > >>> We are using the `MailboxExecutor` to block calls to `write` in case the sink is somehow overloaded. Overloaded basically means that the sink cannot persist messages quickly enough into the respective destination.
> > > >>>
> > > >>> But whether a sink is overloaded not only depends on the queue size. It also depends on the number of in-flight async requests, which must not grow too large either [1, 2]. We also need to support use cases where the destination can only ingest x messages per second or a total throughput of y per second. We are also planning to support timeouts so that data is persisted into the destination at least every n seconds by means of the `ProcessingTimeService`. The batching configuration will be part of the constructor, which has only been indicated in the current prototype but is not implemented yet [3].
> > > >>>
> > > >>> I'm not sure whether I'm understanding who you are referring to by user. People who are using a concrete sink, e.g., to send messages into a Kinesis stream, will not be exposed to the `MailboxExecutor`. They just use the sink and pass in the batching configuration from above [4]. The `MailboxExecutor` and `ProcessingTimeService` are only relevant for sink authors who want to create support for a new destination. I would expect that only a few experts will add support for new destinations, and that they are capable of understanding and using these advanced constructs properly.
> > > >>>
> > > >>> Hope that helps to clarify our thinking.
> > > >>>
> > > >>> Cheers, Steffen
> > > >>>
> > > >>> [1] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
> > > >>> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
> > > >>> [3] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
> > > >>> [4] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
> > > >>>
> > > >>>
> > > >>> From: Arvid Heise <ar...@apache.org>
> > > >>> Date: Tuesday, 20. July 2021 at 23:03
> > > >>> To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
> > > >>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
> > > >>>
> > > >>> Hi Guowei,
> > > >>>
> > > >>> 1. Your idea is quite similar to FLIP-171 [1]. The question is whether we implement FLIP-171 based on public interfaces (which would require exposing MailboxExecutor as described here in FLIP-177) or whether it's better to implement it internally and hide it. The first option is an abstract base class; your second option would be an abstract interface that has a matching implementation internally (similar to AsyncIO). There is an example for option 1 in [2]; I think the idea was to additionally specify the batch size and batch timeout in the ctor. @Hausmann, Steffen <shau...@amazon.de> knows more.
> > > >>>
> > > >>> 2. I guess your question is whether the current AsyncIO is not already sufficient if exactly-once is not needed? The answer is currently no, because AsyncIO does not do any batching. The user could do the batching before that, but that's quite a bit of code. However, we should really think about whether AsyncIO should also support batching.
> > > >>> I would also say that the scope of AsyncIO and AsyncSink is quite different: the first one is for application developers, while the second one is for connector developers and would be deemed an implementation detail by the application developer. Of course, advanced users may fall into both categories, so the distinction does not always hold.
> > > >>>
> > > >>> Nevertheless, there is some overlap between both approaches, and it's important to think about whether the added complexity warrants the benefit. It would be interesting to hear how other devs see that.
> > > >>>
> > > >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > >>> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
> > > >>>
> > > >>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
> > > >>>
> > > >>> Hi, Arvid
> > > >>>
> > > >>> Thank you, Arvid, for improving the Sink API through this FLIP. I have two small questions.
> > > >>>
> > > >>> 1. What do you think of us directly providing an interface as follows? In this way, there may be no need to expose the Mailbox to the user. We can implement an `AsyncSinkWriterOperator` to control the length of the queue. If it is too long, do not call SinkWriter::write.
> > > >>>
> > > >>>     public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
> > > >>>             extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
> > > >>>         // please ignore the names Tuple2 and XXXFuture for now
> > > >>>         int getQueueLimit();
> > > >>>     }
> > > >>>
> > > >>> 2. Another question is: if users don't care about exactly-once and the unification of stream and batch, how about letting users use `AsyncFunction` directly? I don't have an answer either; I want to hear your suggestions.
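To make question 1 above concrete, a rough, self-contained sketch of the gating a hypothetical `AsyncSinkWriterOperator` could put around `SinkWriter::write`, blocking once `getQueueLimit()` requests are pending. All names below are assumptions, not existing Flink classes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Illustrative back-pressure gate for a hypothetical AsyncSinkWriterOperator. */
public class QueueLimitGate {

    private final Semaphore permits;

    public QueueLimitGate(int queueLimit) {
        // one permit per outstanding element/request, sized by getQueueLimit()
        this.permits = new Semaphore(queueLimit);
    }

    /** Called before SinkWriter::write; blocks the operator once the queue is full. */
    public void acquireSlot() throws InterruptedException {
        permits.acquire();
    }

    /** Frees the slot once the async request for the element completes. */
    public void releaseOn(CompletableFuture<?> requestFuture) {
        requestFuture.whenComplete((result, error) -> permits.release());
    }
}
```

In a real operator, any bookkeeping on completion (retries, collecting committables, updating state) would still have to run on the task thread, which is where the MailboxExecutor discussion picks up again.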
> > > >>>
> > > >>> Best,
> > > >>> Guowei
> > > >>>
> > > >>>
> > > >>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
> > > >>>
> > > >>> > Dear devs,
> > > >>> >
> > > >>> > today I'd like to start the discussion on the Sink API. I have drafted a FLIP [1] with an accompanying PR [2].
> > > >>> >
> > > >>> > This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in one. In this discussion, we should decide on the scope and cut out too invasive steps if we can't reach an agreement.
> > > >>> >
> > > >>> > Step 1 is to add a few more pieces of information to context objects. That's non-breaking and needed for the async communication pattern in FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I think that this should entail the least discussions.
> > > >>> >
> > > >>> > Step 2 is to also offer the same context information to committers. Here we can offer some compatibility methods to not break existing sinks. The main use case would be some async exactly-once sink, but I'm not sure if we would use async communication patterns at all here (or simply wait for all async requests to finish in a sync way). It may also help with async cleanup tasks though.
> > > >>> >
> > > >>> > While drafting Step 2, I noticed the big entanglement of the current API. To figure out if there is a committer during the stream graph creation, we actually need to create a committer, which can have unforeseen consequences. Thus, I spiked if we can disentangle the interface and have separate interfaces for the different use cases. The resulting Step 3 would be a completely breaking change and thus is probably controversial. However, I'd also see the disentanglement as a way to prepare to make Sinks more expressive (write and commit coordinator) without completely overloading the main interface.
> > > >>> >
> > > >>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > > >>> > [2] https://github.com/apache/flink/pull/16399
> > > >>> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > >>> >
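For Step 1, a rough sketch of what handing the extra pieces of information to the writer's context could look like. The interface and method names here are illustrative, not the final FLIP-177 API, and the package locations differ between Flink versions:

```java
import org.apache.flink.api.common.operators.MailboxExecutor;       // package varies by version
import org.apache.flink.api.common.operators.ProcessingTimeService; // package varies by version

/** Illustrative shape of the writer init context extended as proposed in Step 1. */
public interface AsyncWriterInitContext {

    /** Executor that runs callbacks cooperatively on the task (mailbox) thread. */
    MailboxExecutor getMailboxExecutor();

    /** Timer service, e.g., to flush incomplete batches after a configured timeout. */
    ProcessingTimeService getProcessingTimeService();
}
```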