Hi,

Regarding the question whether to expose the MailboxExecutor or not:

1. We have plans to expose it in the ProcessFunction (in short, we want to make the StreamOperator API private/internal only and move all of its extra functionality, in one way or another, to the ProcessFunction). I don't remember and I'm not sure whether *Dawid* had a different idea about this (do not expose the Mailbox but wrap it somehow?).

2. If we provide a thin wrapper around MailboxExecutor, I'm not sure how helpful it will be for keeping backward compatibility in the future. `MailboxExecutor` is already a very generic interface that doesn't expose much about the current threading model. Note that the previous threading model (multi-threaded with a checkpoint lock) should be easy to implement using the `MailboxExecutor` interface (use a direct executor that acquires the checkpoint lock).
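To illustrate that last point, here is a minimal sketch of such a "direct" executor (purely illustrative; the class name and the lock handle are made up, this is not code from the runtime):

```java
import java.util.concurrent.Executor;

// Illustrative only: emulates the old threading model by running every
// submitted action immediately in the calling thread while holding the
// checkpoint lock, the way operators used to synchronize with checkpoints.
public class CheckpointLockDirectExecutor implements Executor {

    private final Object checkpointLock; // hypothetical handle to the old lock object

    public CheckpointLockDirectExecutor(Object checkpointLock) {
        this.checkpointLock = checkpointLock;
    }

    @Override
    public void execute(Runnable action) {
        synchronized (checkpointLock) {
            action.run();
        }
    }
}
```

So even if the threading model were to change again, an executor-style contract like `MailboxExecutor` could still be satisfied.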
Having said that, I haven't spent too much time thinking about whether it's better to enrich AsyncIO or to provide the AsyncSink. If we can provide the same functionality just as efficiently using the existing/enhanced AsyncIO API, that may be a good idea if it indeed reduces our maintenance costs.

Piotrek

On Fri, Jul 23, 2021 at 12:55 Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Arvid
>
> >>> The main question here is what do you think is the harm of exposing Mailbox? Is it the complexity or the maintenance overhead?
>
> I think that exposing the internal threading model might be risky. In case the threading model changes, it will affect the user's API and bring the burden of internal modification. (Of course, you may have more say in how the Mailbox model will develop in the future.) Therefore, I think that if an alternative solution can be found, these possible risks will be avoided: for example, through AsyncIO.
>
> >>> AsyncIO has no user state, so we would be quite limited in implementing at-least-once sinks, especially when it comes to retries. Chaining two AsyncIO would make it even harder to reason about the built-in state. We would also give up any chance to implement exactly-once async sinks (even though I'm not sure if it's possible at all).
>
> 1. Why would we need to use the state when retrying (maybe I am missing something)? If a batch of asynchronous requests fails, I think it is enough to retry directly in the callback, or to extend AsyncIO to give it the ability to retry (XXXFuture.fail(Exception)); in addition, in general, the client has its own retry mechanism; at least the Kinesis producer says so in its documentation. Of course I am not opposed to retrying, I just want to find a more obvious example to support the need to do so.
>
> 2. I don't think using AsyncIO will prevent exactly-once in the future. Both solutions need to be rewritten unless exactly-once is required from the beginning.
>
> >>> Users will have a hard time to discover SinkUtil.sinkTo compared to the expected stream.sinkTo. We have seen that on other occasions already (e.g., reinterpretAsKeyedStream).
>
> In fact, I think this is the most important issue. We lack the ability to support a sub-topology at the API layer, which is very inconvenient. For example stream.sinkTo(AsyncSinkTopologyBuilder), what do you think?
> ```java
> AsyncSinkTopologyBuilder {
>     void build(inputStream) {
>         input.flatmap().async()…
>     }
> }
> ```
> In general, I want to know what principles will guide when to expose more implementations to users, and when to combine with existing UDFs.
>
> Best,
> Guowei
>
>
> On Thu, Jul 22, 2021 at 10:35 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi Guowei,
> >
> > I think the whole discussion here started as a means to avoid exposing MailboxExecutor to the user. Your preferred way would be to improve AsyncIO to support batching or to implement AsyncSink as batching+AsyncIO. Here are some thoughts.
> >
> > 1) We should take a step back and note that we actually already expose MailboxExecutor on the Operator API, in particular to implement AsyncIO. (Now, we could say that we actually want to hide the Operator API in the future and could make MailboxExecutor internal again.)
> >
> > 2) Exposing MailboxExecutor in general is a means for users to access the task thread in a cooperative way. That would allow certain communication patterns beyond simply async requests.
> > For example, the user could re-enqueue retries or even effectively block input processing to induce backpressure by enqueuing a waiting mail. So the main question is whether we want to empower sink devs to access the task thread or not. Note that, indirectly, sink devs already have that option through timers. So in theory, they could also enqueue a MIN_TIME timer and use that to implement any kind of async processing.
> >
> > The main question here is what do you think is the harm of exposing Mailbox? Is it the complexity or the maintenance overhead?
> >
> > 3) Simulating AsyncSink through AsyncIO is possible but has some downsides.
> > a) AsyncIO has no user state, so we would be quite limited in implementing at-least-once sinks, especially when it comes to retries. Chaining two AsyncIO would make it even harder to reason about the built-in state. We would also give up any chance to implement exactly-once async sinks (even though I'm not sure if it's possible at all).
> > b) Users will have a hard time to discover SinkUtil.sinkTo compared to the expected stream.sinkTo. We have seen that on other occasions already (e.g., reinterpretAsKeyedStream).
> > c) AsyncIO is optimized to feed the result back into the main task thread. That's completely unneeded for sinks.
> > d) You probably know it better than me, but imho the batch behavior of a proper sink would be much better than an AsyncIO simulation (I have not tracked the latest discussion in FLIP-147).
> >
> > Note that if we absolutely don't want to expose MailboxExecutor, we can also try to get it through some casts to internal interfaces. So the Sink.InitContext could have a Sink.InitContextInternal subinterface that includes the mailbox executor. In AsyncSink, we could cast to the internal interface and would be the only ones that can access the MailboxExecutor legitimately (of course, connector devs could do the same cast, but then they use an internal class and need to expect breaking changes).
> >
> > For your question:
> >
> >> 2. By the way, I have a little question. Why not directly reduce the queue size to control the in-flight queries? For example, according to your example, is a queue size such as 150 enough? In fact, there are caches in many places in the entire topology, such as buffers on the network stack.
> >
> > It's a bit different. Let's take Kinesis as an example. The sink collects 500 elements and puts them into a single request (500 is the maximum number of records per batch request). Now it sends the request out. At the same time, the next 500 elements are collected. So the in-flight query size refers to the number of parallel requests (500 elements each). If we first batch 500*numberOfInFlightRequests, and then send out all numberOfInFlightRequests at the same time, we get worse latency.
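> > To make that concrete, here is a rough sketch (not the prototype code; the class and helper names are made up) in which the batch size and the in-flight limit are two independent knobs:
> > ```java
> > import java.util.ArrayList;
> > import java.util.List;
> > import java.util.concurrent.atomic.AtomicInteger;
> >
> > class BatchingSketch<RecordT> {
> >     private static final int MAX_BATCH_SIZE = 500;        // records per request
> >     private static final int MAX_IN_FLIGHT_REQUESTS = 4;  // parallel requests
> >
> >     private final List<RecordT> buffer = new ArrayList<>();
> >     private final AtomicInteger inFlightRequests = new AtomicInteger();
> >
> >     void write(RecordT record) throws InterruptedException {
> >         buffer.add(record);
> >         if (buffer.size() >= MAX_BATCH_SIZE) {
> >             // A full batch is sent immediately; we only wait if too many
> >             // batches are already outstanding (this is where a real writer
> >             // would yield to the mailbox instead of sleeping).
> >             while (inFlightRequests.get() >= MAX_IN_FLIGHT_REQUESTS) {
> >                 Thread.sleep(1);
> >             }
> >             inFlightRequests.incrementAndGet();
> >             sendBatchAsync(new ArrayList<>(buffer), inFlightRequests::decrementAndGet);
> >             buffer.clear();
> >         }
> >     }
> >
> >     // Stand-in for the destination's async batch call (e.g., Kinesis PutRecords).
> >     void sendBatchAsync(List<RecordT> batch, Runnable onComplete) { onComplete.run(); }
> > }
> > ```
> > That way each 500-record request goes out as soon as it is full, instead of waiting until all in-flight slots could be filled at once.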
> > On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <guowei....@gmail.com> wrote:
> >
> >> Hi, Steffen
> >>
> >> Thank you for your detailed explanation.
> >>
> >> >>> But whether a sink is overloaded not only depends on the queue size. It also depends on the number of in-flight async requests
> >>
> >> 1. How about chaining two AsyncIOs? One is for controlling the number of buffered elements; the other is for controlling the in-flight async requests. I think this way might do it and would not need to expose the MailboxExecutor.
> >> 2. By the way, I have a little question. Why not directly reduce the queue size to control the in-flight queries? For example, according to your example, is a queue size such as 150 enough? In fact, there are caches in many places in the entire topology, such as buffers on the network stack.
> >>
> >> >>> I'm not sure whether I'm understanding who you are referring to by user.
> >> Personally I mean the sink developer.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen <shau...@amazon.de.invalid> wrote:
> >>
> >>> Hey,
> >>>
> >>> We are using the `MailboxExecutor` to block calls to `write` in case the sink is somehow overloaded. Overloaded basically means that the sink cannot persist messages quickly enough into the respective destination.
> >>>
> >>> But whether a sink is overloaded not only depends on the queue size. It also depends on the number of in-flight async requests, which must not grow too large either [1, 2]. We also need to support use cases where the destination can only ingest x messages per second or a total throughput of y per second. We are also planning to support timeouts so that data is persisted into the destination at least every n seconds by means of the `ProcessingTimeService`. The batching configuration will be part of the constructor, which has only been indicated in the current prototype but is not yet implemented [3].
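> >>> As a rough illustration of the timeout idea (the names are made up and this is not the code from [3]): a processing-time timer fires every n seconds, flushes whatever is buffered, and then re-arms itself.
> >>> ```java
> >>> import java.util.ArrayList;
> >>> import java.util.List;
> >>>
> >>> class PeriodicFlushSketch<RecordT> {
> >>>     // Stand-in for a processing-time timer facility such as Flink's
> >>>     // ProcessingTimeService; only the idea matters here.
> >>>     interface Timers {
> >>>         long currentTimeMillis();
> >>>         void registerTimer(long timestamp, Runnable callback);
> >>>     }
> >>>
> >>>     private final Timers timers;
> >>>     private final long flushIntervalMillis;
> >>>     private final List<RecordT> buffer = new ArrayList<>();
> >>>
> >>>     PeriodicFlushSketch(Timers timers, long flushIntervalMillis) {
> >>>         this.timers = timers;
> >>>         this.flushIntervalMillis = flushIntervalMillis;
> >>>         scheduleNextFlush();
> >>>     }
> >>>
> >>>     private void scheduleNextFlush() {
> >>>         timers.registerTimer(timers.currentTimeMillis() + flushIntervalMillis, () -> {
> >>>             flush();               // persist whatever has been collected so far
> >>>             scheduleNextFlush();   // re-arm the timer for the next interval
> >>>         });
> >>>     }
> >>>
> >>>     private void flush() {
> >>>         // send the buffered records as one async request, then clear the buffer
> >>>         buffer.clear();
> >>>     }
> >>> }
> >>> ```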
> >>> I'm not sure whether I'm understanding who you are referring to by user. People who are using a concrete sink, e.g., to send messages into a Kinesis stream, will not be exposed to the `MailboxExecutor`. They are just using the sink and passing in the batching configuration from above [4]. The `MailboxExecutor` and `ProcessingTimeService` are only relevant for sink authors who want to create support for a new destination. I would expect that there are only a few experts adding support for new destinations, and they are capable of understanding and using these advanced constructs properly.
> >>>
> >>> Hope that helps to clarify our thinking.
> >>>
> >>> Cheers, Steffen
> >>>
> >>> [1] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
> >>> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
> >>> [3] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
> >>> [4] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
> >>>
> >>> From: Arvid Heise <ar...@apache.org>
> >>> Date: Tuesday, 20. July 2021 at 23:03
> >>> To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
> >>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
> >>>
> >>> Hi Guowei,
> >>>
> >>> 1. Your idea is quite similar to FLIP-171 [1]. The question is whether we implement FLIP-171 based on public interfaces (which would require exposing MailboxExecutor as described here in FLIP-177) or whether it's better to implement it internally and hide it. The first option is an abstract base class; your second option would be an abstract interface that has a matching implementation internally (similarly to AsyncIO). There is an example for option 1 in [2]; I think the idea was to additionally specify the batch size and batch timeout in the ctor. @Hausmann, Steffen <shau...@amazon.de> knows more.
> >>>
> >>> 2. I guess your question is whether the current AsyncIO isn't already sufficient if exactly-once is not needed? The answer is currently no, because AsyncIO is not doing any batching. The user could do batching before that, but that's quite a bit of code. However, we should really think about whether AsyncIO should also support batching. I would also say that the scope of AsyncIO and AsyncSink is quite different: the first one is for application developers and the second one is for connector developers and would be deemed an implementation detail by the application developer. Of course, advanced users may fall into both categories, so the distinction does not always hold.
> >>>
> >>> Nevertheless, there is some overlap between both approaches and it's important to think about whether the added complexity warrants the benefit. It would be interesting to hear how other devs see that.
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> >>> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
> >>>
> >>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
> >>> Hi, Arvid
> >>> Thank you, Arvid, for improving the Sink API through this FLIP. I have two little questions.
> >>>
> >>> 1. What do you think of us directly providing an interface as follows? In this way, there may be no need to expose the Mailbox to the user. We can implement an `AsyncSinkWriterOperator` to control the length of the queue. If it is too long, do not call SinkWriter::write.
> >>> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
> >>>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
> >>>     // please ignore the names Tuple2 and XXXFuture for now
> >>>     int getQueueLimit();
> >>> }
> >>>
> >>> 2. Another question is: if users don't care about exactly-once and the unification of stream and batch, how about letting users use `AsyncFunction` directly? I don't have an answer either. I want to hear your suggestions.
> >>>
> >>> Best,
> >>> Guowei
> >>>
> >>>
> >>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
> >>>
> >>> > Dear devs,
> >>> >
> >>> > Today I'd like to start the discussion on the Sink API. I have drafted a FLIP [1] with an accompanying PR [2].
> >>> >
> >>> > This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in one.
> >>> > In this discussion, we should decide on the scope and cut out the steps that are too invasive if we can't reach an agreement.
> >>> >
> >>> > Step 1 is to add a few more pieces of information to context objects. That's non-breaking and needed for the async communication pattern in FLIP-171 [3]. While we need to add a new public API (MailboxExecutor), I think that this should entail the least discussion.
> >>> >
> >>> > Step 2 is to also offer the same context information to committers. Here we can offer some compatibility methods to not break existing sinks. The main use case would be some async exactly-once sink, but I'm not sure if we would use async communication patterns at all here (or simply wait for all async requests to finish in a sync way). It may also help with async cleanup tasks, though.
> >>> >
> >>> > While drafting Step 2, I noticed the big entanglement of the current API. To figure out if there is a committer during the stream graph creation, we actually need to create a committer, which can have unforeseen consequences. Thus, I spiked whether we can disentangle the interface and have separate interfaces for the different use cases. The resulting Step 3 would be a completely breaking change and is thus probably controversial. However, I'd also see the disentanglement as a way to prepare to make Sinks more expressive (write and commit coordinator) without completely overloading the main interface.
> >>> >
> >>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> >>> > [2] https://github.com/apache/flink/pull/16399
> >>> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink