Hi, Steffen

Thank you for your detailed explanation.

>>> But whether a sink is overloaded not only depends on the queue size. It also depends on the number of in-flight async requests

1. How about chaining two AsyncIOs? One controls the number of buffered elements; the other controls the number of in-flight async requests. I think this might work and would not require exposing the MailboxExecutor.
2. By the way, I have a small question. Why not directly reduce the queue size to limit the in-flight requests? For example, based on your example, would a queue size such as 150 be enough? In fact, there is already buffering in many places in the entire topology, such as the buffers in the network stack.

>>> I'm not sure whether I'm understanding who you are referring to by "user".

Personally, I mean the sink developer.

Best,
Guowei

On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen <shau...@amazon.de.invalid> wrote:

> Hey,
>
> We are using the `MailboxExecutor` to block calls to `write` in case the sink is somehow overloaded. Overloaded basically means that the sink cannot persist messages quickly enough into the respective destination.
>
> But whether a sink is overloaded not only depends on the queue size. It also depends on the number of in-flight async requests, which must not grow too large either [1, 2]. We also need to support use cases where the destination can only ingest x messages per second or a total throughput of y per second. We are also planning to support timeouts by means of the `ProcessingTimeService`, so that data is persisted into the destination at least every n seconds. The batching configuration will be part of the constructor, which has only been indicated in the current prototype but is not yet implemented [3].
>
> I'm not sure whether I'm understanding who you are referring to by "user". People who are using a concrete sink, e.g., to send messages into a Kinesis stream, will not be exposed to the `MailboxExecutor`.
> They just use the sink and pass in the batching configuration from above [4]. The `MailboxExecutor` and `ProcessingTimeService` are only relevant for sink authors who want to add support for a new destination. I would expect that only a few experts add support for new destinations, and that they are capable of understanding and using these advanced constructs properly.
>
> Hope that helps to clarify our thinking.
>
> Cheers, Steffen
>
> [1] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
> [3] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
> [4] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
>
>
> From: Arvid Heise <ar...@apache.org>
> Date: Tuesday, 20. July 2021 at 23:03
> To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
>
> Hi Guowei,
>
> 1. Your idea is quite similar to FLIP-171 [1]. The question is whether we implement FLIP-171 based on public interfaces (which would require exposing MailboxExecutor as described here in FLIP-177) or whether it's better to implement it internally and hide it.
> The first option is an abstract base class; your second option would be an abstract interface with a matching internal implementation (similar to AsyncIO).
> There is an example of option 1 in [2]; I think the idea was to additionally specify the batch size and batch timeout in the ctor. @Hausmann, Steffen <shau...@amazon.de> knows more.
>
> 2. I guess your question is whether the current AsyncIO is already sufficient if exactly-once is not needed? The answer is currently no, because AsyncIO does not do any batching. The user could do the batching before AsyncIO, but that's quite a bit of code. However, we should really think about whether AsyncIO should also support batching.
> I would also say that the scopes of AsyncIO and AsyncSink are quite different: the first is for application developers, while the second is for connector developers and would be deemed an implementation detail by the application developer. Of course, advanced users may fall into both categories, so the distinction does not always hold.
>
> Nevertheless, there is some overlap between the two approaches, and it's important to consider whether the added complexity is worth the benefit. It would be interesting to hear how other devs see that.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>
> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
> Hi, Arvid
> Thank you, Arvid, for improving the Sink API through this FLIP. I have two small questions:
>
> 1. What do you think of us directly providing an interface as follows? This way, there may be no need to expose the Mailbox to the user. We can implement an `AsyncSinkWriterOperator` to control the length of the queue.
> If it is too long, do not call SinkWriter::write.
>
> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
>     // Please ignore the names Tuple2 and XXXFuture for now.
>     int getQueueLimit();
> }
>
> 2. Another question: if users don't care about exactly-once and the unification of stream and batch, how about letting them use `AsyncFunction` directly? I don't have an answer either. I'd like to hear your suggestions.
>
> Best,
> Guowei
>
>
> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Dear devs,
> >
> > Today I'd like to start the discussion on the Sink API. I have drafted a FLIP [1] with an accompanying PR [2].
> >
> > This FLIP is a bit special, as it's actually a few smaller Amend-FLIPs in one. In this discussion, we should decide on the scope and cut out overly invasive steps if we can't reach an agreement.
> >
> > Step 1 is to add a few more pieces of information to context objects. That's non-breaking and needed for the async communication pattern in FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I think this step should require the least discussion.
> >
> > Step 2 is to also offer the same context information to committers. Here we can offer some compatibility methods so as not to break existing sinks. The main use case would be an async exactly-once sink, but I'm not sure whether we would use async communication patterns here at all (or simply wait synchronously for all async requests to finish). It may also help with async cleanup tasks, though.
> >
> > While drafting Step 2, I noticed how entangled the current API is. To figure out whether there is a committer during stream graph creation, we actually need to create a committer, which can have unforeseen consequences.
> > Thus, I did a spike to see whether we can disentangle the interface into separate interfaces for the different use cases. The resulting Step 3 would be a completely breaking change and is thus probably controversial. However, I'd also see the disentanglement as a way to prepare for making Sinks more expressive (write and commit coordinator) without completely overloading the main interface.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > [2] https://github.com/apache/flink/pull/16399
> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
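Steffen's point that `write` should block when the sink is overloaded, meaning either too many buffered elements or too many in-flight async requests, can be illustrated outside Flink with a small sketch. This is hypothetical and not the FLIP-171 `AsyncSinkWriter`: the names `BoundedAsyncBatcher`, `maxBatchSize`, `maxInFlightRequests`, `sendBatch` and `flushAndWait` are invented for illustration. A plain `java.util.concurrent.Semaphore` blocks the calling thread in place of the prototype's `MailboxExecutor`. Rate limits (x messages per second) and the time-based flush via `ProcessingTimeService` are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sketch, not the FLIP-171 AsyncSinkWriter: elements are buffered,
// sent in batches, and write() blocks while too many batches are in flight.
class BoundedAsyncBatcher<T> {
    private final int maxBatchSize;          // flush once this many elements are buffered
    private final int maxInFlightRequests;   // upper bound on concurrent async requests
    private final Semaphore inFlightPermits; // one permit per allowed in-flight request
    private final Function<List<T>, CompletableFuture<Void>> sendBatch; // stand-in for the destination client
    private final List<T> buffer = new ArrayList<>();

    BoundedAsyncBatcher(int maxBatchSize, int maxInFlightRequests,
                        Function<List<T>, CompletableFuture<Void>> sendBatch) {
        if (maxBatchSize < 1 || maxInFlightRequests < 1) {
            throw new IllegalArgumentException("limits must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
        this.inFlightPermits = new Semaphore(maxInFlightRequests);
        this.sendBatch = sendBatch;
    }

    // Buffers one element and flushes when the batch is full. The flush blocks
    // the caller while maxInFlightRequests requests are still pending.
    void write(T element) throws InterruptedException {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Sends everything currently buffered as a single async request.
    void flush() throws InterruptedException {
        if (buffer.isEmpty()) {
            return;
        }
        inFlightPermits.acquire(); // backpressure: wait until a pending request finishes
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        CompletableFuture<Void> request;
        try {
            request = sendBatch.apply(batch);
        } catch (RuntimeException e) {
            inFlightPermits.release(); // do not leak the permit if sending fails synchronously
            throw e;
        }
        // Free the permit once the request finishes. Failures are not retried here;
        // a real sink would have to re-queue the batch or fail.
        request.whenComplete((ignored, error) -> inFlightPermits.release());
    }

    // Flushes the buffer and blocks until no request is in flight.
    void flushAndWait() throws InterruptedException {
        flush();
        inFlightPermits.acquire(maxInFlightRequests); // succeeds only when all permits are back
        inFlightPermits.release(maxInFlightRequests);
    }
}
```

With `maxBatchSize = 3` and `maxInFlightRequests = 2`, writing ten elements and then calling `flushAndWait()` sends batches of 3, 3, 3 and 1. If both earlier requests are still pending, the flush triggered by the ninth element waits for one of them to complete. This is only an analogy to the prototype's behavior. The real writer relies on the `MailboxExecutor` rather than parking the thread on a semaphore, and that mechanism is not modeled here.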