Hi, Steffen

Thank you for your detailed explanation.

>>>But whether a sink is overloaded not only depends on the queue size. It
also depends on the number of in-flight async requests

1. How about chaining two AsyncIOs? One would control the number of
buffered elements; the other would control the number of in-flight async
requests. I think this could work and would not require exposing the
MailboxExecutor.
2. By the way, I have a small question. Why not simply reduce the queue
size to limit the in-flight requests? In your example, would a queue size
such as 150 be enough? In fact, there is already buffering in many places
in the topology, such as the buffers in the network stack.
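
To make the difference between the two limits concrete, here is a minimal
sketch in plain Java. It is not taken from the prototype and uses no Flink
classes: `OverloadGate`, its method names, and the limit of 2 in-flight
requests are hypothetical and only for illustration (150 is the queue size
mentioned above).

```java
/** Hypothetical helper, not part of Flink: tracks the two limits discussed above. */
final class OverloadGate {
    private final int maxBufferedElements;
    private final int maxInFlightRequests;
    private int bufferedElements;
    private int inFlightRequests;

    OverloadGate(int maxBufferedElements, int maxInFlightRequests) {
        if (maxBufferedElements <= 0 || maxInFlightRequests <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
        this.maxBufferedElements = maxBufferedElements;
        this.maxInFlightRequests = maxInFlightRequests;
    }

    /** The writer may take another element only if BOTH limits have headroom. */
    boolean canAccept() {
        return bufferedElements < maxBufferedElements
                && inFlightRequests < maxInFlightRequests;
    }

    /** Buffers one element; the caller is expected to check canAccept() first. */
    void write() {
        if (!canAccept()) {
            throw new IllegalStateException("overloaded: caller should wait");
        }
        bufferedElements++;
    }

    /** Turns the whole buffer into one async request; returns false if nothing was sent. */
    boolean flush() {
        if (bufferedElements == 0 || inFlightRequests >= maxInFlightRequests) {
            return false;
        }
        bufferedElements = 0;
        inFlightRequests++;
        return true;
    }

    /** Called when the destination has acknowledged one request. */
    void onRequestCompleted() {
        if (inFlightRequests == 0) {
            throw new IllegalStateException("no request in flight");
        }
        inFlightRequests--;
    }

    public static void main(String[] args) {
        OverloadGate gate = new OverloadGate(150, 2); // 2 is an assumed limit
        for (int i = 0; i < 150; i++) {
            gate.write();
        }
        System.out.println(gate.canAccept()); // false: the buffer is full
        gate.flush();
        gate.write();
        gate.flush();
        System.out.println(gate.canAccept()); // false: buffer empty, 2 requests in flight
        gate.onRequestCompleted();
        System.out.println(gate.canAccept()); // true: both limits have headroom again
    }
}
```

In this sketch the gate can report overload while the buffer is empty,
because flushing moves elements out of the buffer and into in-flight
requests. Whether a smaller queue alone would bound this in the real sink
depends on how flushing is triggered there, which the sketch does not model.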

>>> I’m not sure whether I’m understanding who you are referring to by
user.
By user, I mean the sink developer.


Best,
Guowei


On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen <shau...@amazon.de.invalid>
wrote:

> Hey,
>
> We are using the `MailboxExecutor` to block calls to `write` in case the
> sink is somehow overloaded. Overloaded basically means that the sink cannot
> persist messages quickly enough into the respective destination.
>
> But whether a sink is overloaded not only depends on the queue size. It
> also depends on the number of in-flight async requests that must not grow
> too large either [1, 2]. We also need to support use cases where the
> destination can only ingest x messages per second or a total throughput of
> y per second. We are also planning to support timeouts so that data is
> persisted into the destination at least every n seconds by means of the
> `ProcessingTimeService`. The batching configuration will be part of the
> constructor, which has only been indicated in the current prototype but is
> not implemented yet [3].
>
> I’m not sure whether I’m understanding who you are referring to by user.
> People who are using a concrete sink, e.g., to send messages into a Kinesis
> stream, will not be exposed to the `MailboxExecutor`. They just use the
> sink and pass in the batching configuration from above [4]. The
> `MailboxExecutor` and `ProcessingTimeService` are only relevant for sink
> authors who want to create support for a new destination. I would expect
> that only a few experts will add support for new destinations, and that
> they are capable of understanding and using the advanced constructs
> properly.
>
> Hope that helps to clarify our thinking.
>
> Cheers, Steffen
>
>
> [1]
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
> [2]
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
> [3]
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
> [4]
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
>
>
>
> From: Arvid Heise <ar...@apache.org>
> Date: Tuesday, 20. July 2021 at 23:03
> To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
>
> Hi Guowei,
>
> 1. Your idea is quite similar to FLIP-171 [1]. The question is whether we
> implement FLIP-171 based on public interfaces (which would require exposing
> MailboxExecutor as described here in FLIP-177) or whether it's better to
> implement it internally and hide it.
> The first option is an abstract base class; your second option would be an
> abstract interface that has a matching implementation internally (similarly
> to AsyncIO).
> There is an example for option 1 in [2]; I think the idea was to
> additionally specify the batch size and batch timeout in the ctor.
> @Hausmann, Steffen knows more.
>
> 2. I guess your question is whether the current AsyncIO is already
> sufficient if exactly-once is not needed. The answer is currently no,
> because AsyncIO is not doing any batching. The user could do batching
> before that, but that's quite a bit of code. However, we should really
> think about whether AsyncIO should also support batching.
> I would also say that the scope of AsyncIO and AsyncSink is quite
> different: the first one is for application developers and the second one
> is for connector developers and would be deemed an implementation detail by
> the application developer. Of course, advanced users may fall in both
> categories, so the distinction does not always hold.
>
> Nevertheless, there is some overlap between both approaches and it's
> important to consider whether the benefit warrants the added complexity. It
> would be interesting to hear how other devs see that.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> [2]
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>
> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
> Hi, Arvid
> Thank you, Arvid, for improving the Sink through this FLIP. I have two small
> questions.
>
> 1. What do you think of us directly providing an interface as follows? In
> this way, there may be no need to expose the Mailbox to the user. We can
> implement an `AsyncSinkWriterOperator` to control the length of the queue.
> If it is too long, do not call SinkWriter::write.
> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
>     // Please ignore the names Tuple2 and XXXFuture for now.
>     int getQueueLimit();
> }
>
> 2. Another question is: If users don't care about exactly once and the
> unification of stream and batch, how about letting users use
> `AsyncFunction` directly? I don’t have an answer either. I want to hear
> your suggestions.
>
> Best,
> Guowei
>
>
> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Dear devs,
> >
> > today I'd like to start the discussion on the Sink API. I have drafted a
> > FLIP [1] with an accompanying PR [2].
> >
> > This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in
> > one. In this discussion, we should decide on the scope and cut out too
> > invasive steps if we can't reach an agreement.
> >
> > Step 1 is to add a few more pieces of information to context objects.
> > That's non-breaking and needed for the async communication pattern in
> > FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I
> > think that this should entail the least discussion.
> >
> > Step 2 is to also offer the same context information to committers. Here we
> > can offer some compatibility methods to not break existing sinks. The main
> > use case would be some async exactly-once sink but I'm not sure if we would
> > use async communication patterns at all here (or simply wait for all async
> > requests to finish in a sync way). It may also help with async cleanup
> > tasks though.
> >
> > While drafting Step 2, I noticed the big entanglement of the current API.
> > To figure out if there is a committer during the stream graph creation, we
> > actually need to create a committer which can have unforeseen consequences.
> > Thus, I spiked if we can disentangle the interface and have separate
> > interfaces for the different use cases. The resulting step 3 would be a
> > completely breaking change and thus is probably controversial. However, I'd
> > also see the disentanglement as a way to prepare to make Sinks more
> > expressive (write and commit coordinator) without completely overloading
> > the main interface.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > [2] https://github.com/apache/flink/pull/16399
> > [3]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> >
>
