Hey Guowei,

There is one additional aspect I want to highlight that is relevant for the 
types of destinations we had in mind when designing the AsyncSink.

I'll again use Kinesis as an example, but the same argument applies to other 
destinations. We are using the PutRecords API to persist up to 500 events with 
a single API call to reduce the overhead compared to using individual calls per 
event. But not all of the 500 events may be persisted successfully, e.g., a 
single event may fail to be persisted due to server-side throttling. With the 
MailboxExecutor-based implementation, we can just add this event back to the 
internal queue. The event will then be retried with the next batch of 500 events.
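
To make the re-queueing idea concrete, here is a minimal sketch. It is a 
simplified, synchronous model and not the actual AsyncSinkWriter code: the 
class RequeueingBuffer, its methods, and the putRecords callback (which returns 
the events that were not persisted) are hypothetical names chosen for 
illustration. Putting failed events back at the head of the queue is also an 
assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch; not the AsyncSinkWriter and not the Kinesis SDK. */
class RequeueingBuffer {
    private final int maxBatchSize;
    private final Deque<String> queue = new ArrayDeque<>();

    RequeueingBuffer(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Buffers a new event at the tail of the internal queue. */
    void add(String event) {
        queue.addLast(event);
    }

    int size() {
        return queue.size();
    }

    /**
     * Sends one batch of at most maxBatchSize events. The putRecords callback
     * stands in for the destination call and returns the events that failed.
     * Returns the number of events contained in the batch that was sent.
     */
    int flush(Function<List<String>, List<String>> putRecords) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && !queue.isEmpty()) {
            batch.add(queue.pollFirst());
        }
        if (batch.isEmpty()) {
            return 0;
        }
        List<String> failed = putRecords.apply(batch);
        // Assumption: failed events go back to the head of the queue (in their
        // original order), so they are part of the next regular batch.
        for (int i = failed.size() - 1; i >= 0; i--) {
            queue.addFirst(failed.get(i));
        }
        return batch.size();
    }
}
```

The point is that a failed event does not trigger a request of its own. It 
simply becomes part of whatever batch is sent next.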

In my understanding, that's not possible with the AsyncIO-based approach. 
During a retry, we can only retry the failed events of the original batch of 
events, which means we would need to send a single event with a separate 
PutRecords call. Depending on how often that happens, the extra calls could add up.
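
As a rough illustration of how this adds up, the following sketch counts API 
calls under a deliberately simple model. Everything in it is an assumption made 
for the example: the class and method names are hypothetical, every call with 
fresh events loses a fixed number of them to throttling, and a retried event 
always succeeds on its second attempt.

```java
/** Hypothetical back-of-the-envelope model; not a measurement of any real sink. */
class RetryCallCount {

    /** Failed events are re-queued and sent as part of the next regular batch. */
    static int callsWithRequeue(int totalEvents, int batchSize, int failuresPerCall) {
        int fresh = totalEvents;
        int retries = 0;
        int calls = 0;
        while (fresh + retries > 0) {
            // Retries fill the batch first, the remainder is taken from fresh events.
            int retriesInBatch = Math.min(retries, batchSize);
            int freshInBatch = Math.min(fresh, batchSize - retriesInBatch);
            retries -= retriesInBatch;
            fresh -= freshInBatch;
            calls++;
            // Assumption: only fresh events can fail, retried events succeed.
            retries += Math.min(failuresPerCall, freshInBatch);
        }
        return calls;
    }

    /** Failed events of a batch are retried in a separate call of their own. */
    static int callsWithPerBatchRetry(int totalEvents, int batchSize, int failuresPerCall) {
        int fresh = totalEvents;
        int calls = 0;
        while (fresh > 0) {
            int batch = Math.min(fresh, batchSize);
            fresh -= batch;
            calls++;
            if (Math.min(failuresPerCall, batch) > 0) {
                calls++; // extra call that carries only the failed events
            }
        }
        return calls;
    }
}
```

With 5000 events, a batch size of 500, and one throttled event per call, this 
model gives 12 calls when failed events are re-queued and 20 calls when each 
batch retries its own failures separately.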

Does that make sense?

Cheers, Steffen


On 30.07.21, 05:51, "Guowei Ma" <guowei....@gmail.com> wrote:




    Hi, Arvid & Piotr
    Sorry for the late reply.
    1. Thank you all very much for your patience and explanation. Recently, I
    have also studied the related code of 'MailBox', which may not be as
    serious as I thought, after all, it is very similar to Java's `Executor`;
    2. Whether to use AsyncIO or MailBox to implement Kinesis connector is more
    up to the contributor to decide (after all, `Mailbox` has decided to be
    exposed :-) ). It’s just that I personally prefer to combine some simple
    functions to complete a more advanced function.
    Best,
    Guowei


    On Sat, Jul 24, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:

    > Just to reiterate on Piotr's point: MailboxExecutor is pretty much an
    > Executor [1] with named lambdas, except for the name MailboxExecutor
    > nothing is hinting at a specific threading model.
    >
    > Currently, we expose it on StreamOperator API. Afaik the idea is to make
    > the StreamOperator internal and beef up ProcessFunction but for several 
use
    > cases (e.g., AsyncIO), we actually need to expose the executor anyways.
    >
    > We could rename MailboxExecutor to avoid exposing the name of the 
threading
    > model. For example, we could rename it to TaskThreadExecutor (but that's
    > pretty much the same), to CooperativeExecutor (again implies Mailbox), to
    > o.a.f.Executor, to DeferredExecutor... Ideas are welcome.
    >
    > We could also simply use Java's Executor interface, however, when working
    > with that interface, I found that the missing context of async executed
    > lambdas made debugging much much harder. So that's why I designed
    > MailboxExecutor to force the user to give some debug string to each
    > invocation. In the sink context, we could, however, use an adaptor from
    > MailboxExecutor to Java's Executor and simply bind the sink name to the
    > invocations.
    >
    > [1]
    >
    > 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
    >
    > On Fri, Jul 23, 2021 at 5:36 PM Piotr Nowojski <pnowoj...@apache.org>
    > wrote:
    >
    > > Hi,
    > >
    > > Regarding the question whether to expose the MailboxExecutor or not:
    > > 1. We have plans on exposing it in the ProcessFunction (in short we want
    > to
    > > make StreamOperator API private/internal only, and move all of its 
extra
    > > functionality in one way or another to the ProcessFunction). I don't
    > > remember and I'm not sure if *Dawid* had a different idea about this (do
    > > not expose Mailbox but wrap it somehow?)
    > > 2. If we provide a thin wrapper around MailboxExecutor, I'm not sure how
    > > helpful it will be for keeping backward compatibility in the future.
    > > `MailboxExecutor` is already a very generic interface that doesn't 
expose
    > > much about the current threading model. Note that the previous threading
    > > model (multi threaded with checkpoint lock), should be easy to implement
    > > using the `MailboxExecutor` interface (use a direct executor that
    > acquires
    > > checkpoint lock).
    > >
    > > Having said that, I haven't spent too much time thinking about whether
    > it's
    > > better to enrich AsyncIO or provide the AsyncSink. If we can just as
    > > efficiently provide the same functionality using the existing/enhanced
    > > AsyncIO API, that may be a good idea if it indeed reduces our
    > > maintenance costs.
    > >
    > > Piotrek
    > >
    > > On Fri, Jul 23, 2021 at 12:55 PM Guowei Ma <guowei....@gmail.com> wrote:
    > >
    > > > Hi, Arvid
    > > >
    > > > >>>The main question here is what do you think is the harm of exposing
    > > > Mailbox? Is it the complexity or the maintenance overhead?
    > > >
    > > > I think that exposing the internal threading model might be risky. In
    > > case
    > > > the threading model changes, it will affect the user's api and bring
    > the
    > > > burden of internal modification. (Of course, you may have more say in
    > how
    > > > the MailBox model will develop in the future) Therefore, I think that
    > if
    > > an
    > > > alternative solution can be found, these possible risks will be
    > avoided:
    > > > for example, through AsyncIO.
    > > >
    > > > >>>>AsyncIO has no user state, so we would be quite limited in
    > > implementing
    > > > at-least-once sinks especially when it comes to retries. Chaining two
    > > > AsyncIO would make it even harder to reason about the built-in state.
    > We
    > > > would also give up any chance to implement exactly once async sinks
    > (even
    > > > though I'm not sure if it's possible at all).
    > > >
    > > > 1. Why would we need to use the state when retrying(maybe I miss
    > > > something)? If a batch of asynchronous requests fails, I think it is
    > > enough
    > > > to retry directly in the callback. Or extend AsyncIO to give it the
    > > ability
    > > > to retry (XXXFuture.fail(Exception)); in addition, in general, the
    > > client
    > > > has its own retry mechanism, at least the producer of Kinesis said in
    > the
    > > > document. Of course I am not opposed to retrying, I just want to find 
a
    > > > more obvious example to support the need to do so.
    > > >
    > > > 2. I don't think using AsyncIO will prevent exactly once in the 
future.
    > > > Both solutions need to be rewritten unless Exactly Once is required
    > from
    > > > the beginning.
    > > >
    > > > >>>Users will have a hard time to discover SinkUtil.sinkTo compared to
    > > the
    > > > expected stream.sinkTo. We have seen that on other occasions already
    > > (e.g.,
    > > > reinterpretAsKeyedStream).
    > > > In fact, I think this is the most important issue. We lack the 
function
    > > of
    > > > supporting sub-topology at the API layer, which is very inconvenient.
    > For
    > > > example, stream.sinkTo(AsyncSinkTopologyBuilder), what do you think?
    > > > ```java
    > > > // Pseudocode: a builder that expands the sink into a sub-topology
    > > > AsyncSinkTopologyBuilder {
    > > >   void build(input) {
    > > >     input.flatmap().async()…
    > > >   }
    > > > }
    > > > ```
    > > > In general, I want to know what principles will guide when to expose
    > more
    > > > implementations to users, and when to combine with existing UDFs.
    > > >
    > > > Best,
    > > > Guowei
    > > >
    > > >
    > > > On Thu, Jul 22, 2021 at 10:35 PM Arvid Heise <ar...@apache.org> wrote:
    > > >
    > > > > Hi Guowei,
    > > > >
    > > > > I think the whole discussion here started as a means to avoid
    > exposing
    > > > > MailboxExecutor to the user. Your preferred way would be to improve
    > > > AsyncIO
    > > > > to support batching or implement AsyncSink as batching+AsyncIO.  
Here
    > > are
    > > > > some thoughts.
    > > > >
    > > > > 1) We should take a step back and note that we actually already
    > expose
    > > > > MailboxExecutor on the Operator API in particular to implement
    > AsyncIO.
    > > > > (Now, we could say that we actually want to hide Operator API in the
    > > > future
    > > > > and could make MailboxExecutor internal again)
    > > > >
    > > > > 2) Exposing MailboxExecutor in general is a means for users to 
access
    > > the
    > > > > task thread in a cooperative way. That would allow certain
    > > communication
    > > > > patterns beyond simply async requests. For example, the user could
    > > > > re-enqueue retries or even effectively block input processing to
    > induce
    > > > > backpressure by enqueuing a waiting mail. So the main question is if
    > we
    > > > > want to empower sink devs to access the task thread or not. Note 
that
    > > > > indirectly, sink devs have that option already through timers. So in
    > > > > theory, they could also enqueue a MIN_TIME timer and use that to
    > > > implement
    > > > > any kind of async processing.
    > > > >
    > > > > The main question here is what do you think is the harm of exposing
    > > > > Mailbox? Is it the complexity or the maintenance overhead?
    > > > >
    > > > > 3) Simulating AsyncSink through AsyncIO is possible but has some
    > > > > downsides.
    > > > > a) AsyncIO has no user state, so we would be quite limited in
    > > > implementing
    > > > > at-least-once sinks especially when it comes to retries. Chaining 
two
    > > > > AsyncIO would make it even harder to reason about the built-in 
state.
    > > We
    > > > > would also give up any chance to implement exactly once async sinks
    > > (even
    > > > > though I'm not sure if it's possible at all).
    > > > > b) Users will have a hard time to discover SinkUtil.sinkTo compared
    > to
    > > > the
    > > > > expected stream.sinkTo. We have seen that on other occasions already
    > > > (e.g.,
    > > > > reinterpretAsKeyedStream).
    > > > > c) AsyncIO is optimized to feed back the result into the main task
    > > > thread.
    > > > > That's completely unneeded for sinks.
    > > > > d) You probably know it better than me, but imho the batch behavior
    > of
    > > a
    > > > > proper sink would be much better than an AsyncIO simulation (I have
    > not
    > > > > tracked the latest discussion in FLIP-147).
    > > > >
    > > > > Note that if we absolutely don't want to expose MailboxExecutor, we
    > can
    > > > > also try to get it through some casts to internal interfaces. So the
    > > > > Sink.InitContext could have a Sink.InitContextInternal subinterface
    > > that
    > > > > includes the mailbox executor. In AsyncSink, we could cast to the
    > > > internal
    > > > > interface and are the only ones that can access the MailboxExecutor
    > > > > legitimately (of course, connector devs could do the same cast but 
then
    > > use
    > > > > an internal class and need to expect breaking changes).
    > > > >
    > > > > For your question:
    > > > >
    > > > >> 2. By the way, I have a little question. Why not directly reduce 
the
    > > > >> queue size to control the in-flight query, for example, according 
to
    > > > your
    > > > >> example,
    > > > >> Is a queue size such as 150 enough? In fact, there are caches in
    > many
    > > > >> places in the entire topology, such as buffers on the network 
stack.
    > > > >>
    > > > > It's a bit different. Let's take kinesis as an example. The sink
    > > collects
    > > > > 500 elements and puts them in a single request (500 is max number of
    > > > > records per batch request). Now it sends the request out. At the 
same
    > > > time,
    > > > > the next 500 elements are collected. So the in-flight query size
    > refers
    > > > to
    > > > > the number of parallel requests (500 elements each).
    > > > > If we first batch 500*numberOfInFlightRequests, and then send out 
all
    > > > > numberOfInFlightRequests at the same time, we get worse latency.
    > > > >
    > > > > On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <guowei....@gmail.com>
    > > wrote:
    > > > >
    > > > >> Hi, Steffen
    > > > >>
    > > > >> Thank you for your detailed explanation.
    > > > >>
    > > > >> >>>But whether a sink is overloaded not only depends on the queue
    > > size.
    > > > >> It also depends on the number of in-flight async requests
    > > > >>
    > > > >> 1. How about chaining two AsyncIOs? One is for controlling the size
    > of
    > > > >> the buffer elements; The other is for controlling the in-flight
    > async
    > > > >> requests. I think this way might do it and would not need to expose
    > > the
    > > > >> MailboxExecutor.
    > > > >> 2. By the way, I have a little question. Why not directly reduce 
the
    > > > >> queue size to control the in-flight query, for example, according 
to
    > > > your
    > > > >> example,
    > > > >> Is a queue size such as 150 enough? In fact, there are caches in
    > many
    > > > >> places in the entire topology, such as buffers on the network 
stack.
    > > > >>
    > > > >> >>> I’m not sure whether I’m understanding who you are referring to
    > by
    > > > >> user.
    > > > >> Personally I mean the sink developer.
    > > > >>
    > > > >>
    > > > >> Best,
    > > > >> Guowei
    > > > >>
    > > > >>
    > > > >> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen
    > > > >> <shau...@amazon.de.invalid> wrote:
    > > > >>
    > > > >>> Hey,
    > > > >>>
    > > > >>> We are using the `MailboxExecutor` to block calls to `write` in
    > case
    > > > the
    > > > >>> sink is somehow overloaded. Overloaded basically means that the
    > sink
    > > > cannot
    > > > >>> persist messages quickly enough into the respective destination.
    > > > >>>
    > > > >>> But whether a sink is overloaded not only depends on the queue
    > size.
    > > It
    > > > >>> also depends on the number of in-flight async requests that must
    > not
    > > > grow
    > > > >>> too large either [1, 2]. We also need to support use cases where
    > the
    > > > >>> destination can only ingest x messages per second or a total
    > > > throughput of
    > > > >>> y per second. We are also planning to support time outs so that
    > data
    > > is
    > > > >>> persisted into the destination at least every n seconds by means 
of
    > > the
    > > > >>> `ProcessingTimeService`. The batching configuration will be part 
of
    > > the
    > > > >>> constructor, which has only been indicated in the current 
prototype
    > > > but is
    > > > >>> not implemented, yet [3].
    > > > >>>
    > > > >>> I’m not sure whether I’m understanding who you are referring to by
    > > > user.
    > > > >>> People who are using a concrete sink, eg, to send messages into a
    > > > Kinesis
    > > > >>> stream, will not be exposed to the `MailboxExecutor`. They are 
just
    > > > using
    > > > >>> the sink and pass in the batching configuration from above [4]. 
The
    > > > >>> `MailboxExecutor` and `ProcessingTimeService` are only relevant 
for
    > > > sink
    > > > >>> authors who want to create support for a new destination. I would
    > > > expect
    > > > >>> that there are only few experts who are adding support for new
    > > > >>> destinations, who are capable to understand and use the advanced
    > > > constructs
    > > > >>> properly.
    > > > >>>
    > > > >>> Hope that helps to clarify our thinking.
    > > > >>>
    > > > >>> Cheers, Steffen
    > > > >>>
    > > > >>>
    > > > >>> [1]
    > > > >>>
    > > >
    > >
    > 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
    > > > >>> [2]
    > > > >>>
    > > >
    > >
    > 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
    > > > >>> [3]
    > > > >>>
    > > >
    > >
    > 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
    > > > >>> [4]
    > > > >>>
    > > >
    > >
    > 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
    > > > >>>
    > > > >>>
    > > > >>>
    > > > >>> From: Arvid Heise <ar...@apache.org>
    > > > >>> Date: Tuesday, 20. July 2021 at 23:03
    > > > >>> To: dev <dev@flink.apache.org>, Steffen Hausmann <
    > shau...@amazon.de>
    > > > >>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
    > > > >>>
    > > > >>>
    > > > >>>
    > > > >>>
    > > > >>> Hi Guowei,
    > > > >>>
    > > > >>> 1. your idea is quite similar to FLIP-171 [1]. The question is if
    > we
    > > > >>> implement FLIP-171 based on public interfaces (which would require
    > > > exposing
    > > > >>> MailboxExecutor as described here in FLIP-177) or if it's better 
to
    > > > >>> implement it internally and hide it.
    > > > >>> The first option is an abstract base class; your second option
    > would
    > > be
    > > > >>> an abstract interface that has matching implementation internally
    > > > >>> (similarly to AsyncIO).
    > > > >>> There is an example for option 1 in [2]; I think the idea was to
    > > > >>> additionally specify the batch size and batch timeout in the ctor.
    > > > >>> @Hausmann, Steffen<mailto:shau...@amazon.de> knows more.
    > > > >>>
    > > > >>> 2. I guess your question is if current AsyncIO is not sufficient
    > > > already
    > > > >>> if exactly-once is not needed? The answer is currently no, because
    > > > AsyncIO
    > > > >>> is not doing any batching. The user could do batching before that
    > but
    > > > >>> that's quite a bit of code. However, we should really think if
    > > AsyncIO
    > > > >>> should also support batching.
    > > > >>> I would also say that the scope of AsyncIO and AsyncSink is quite
    > > > >>> different: the first one is for application developers and the
    > second
    > > > one
    > > > >>> is for connector developers and would be deemed an implementation
    > > > detail by
    > > > >>> the application developer. Of course, advanced users may fall in
    > both
    > > > >>> categories, so the distinction does not always hold.
    > > > >>>
    > > > >>> Nevertheless, there is some overlap between both approaches and
    > it's
    > > > >>> important to think if the added complexity warrants the benefit. 
It
    > > > would
    > > > >>> be interesting to hear how other devs see that.
    > > > >>>
    > > > >>> [1]
    > > > >>>
    > > >
    > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
    > > > >>> [2]
    > > > >>>
    > > >
    > >
    > 
https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
    > > > >>>
    > > > >>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com
    > > > <mailto:
    > > > >>> guowei....@gmail.com>> wrote:
    > > > >>> Hi, Arvid
    > > > >>> Thank you, Arvid, for perfecting Sink through this FLIP. I have two
    > > > little
    > > > >>> questions
    > > > >>>
    > > > >>> 1. What do you think of us directly providing an interface as
    > > follows?
    > > > In
    > > > >>> this way, there may be no need to expose the Mailbox to the user.
    > We
    > > > can
    > > > >>> implement an `AsyncSinkWriterOperator` to control the length of 
the
    > > > >>> queue.
    > > > >>> If it is too long, do not call SinkWriter::write.
    > > > >>> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
    > > > >>>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT,
    > > > >>> WriterStateT> { //// please ignore the name of Tuple2 and 
XXXFuture
    > > at
    > > > >>> first.
    > > > >>>     int getQueueLimit();
    > > > >>> }
    > > > >>>
    > > > >>> 2. Another question is: If users don't care about exactly once and
    > > the
    > > > >>> unification of stream and batch, how about letting users use
    > > > >>> `AsyncFunction` directly? I don’t have an answer either. I want to
    > > hear
    > > > >>> your suggestions.
    > > > >>>
    > > > >>> Best,
    > > > >>> Guowei
    > > > >>>
    > > > >>>
    > > > >>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org
    > > <mailto:
    > > > >>> ar...@apache.org>> wrote:
    > > > >>>
    > > > >>> > Dear devs,
    > > > >>> >
    > > > >>> > today I'd like to start the discussion on the Sink API. I have
    > > > drafted
    > > > >>> a
    > > > >>> > FLIP [1] with an accompanying PR [2].
    > > > >>> >
    > > > >>> > This FLIP is a bit special as it's actually a few smaller
    > > Amend-FLIPs
    > > > >>> in
    > > > >>> > one. In this discussion, we should decide on the scope and cut
    > out
    > > > too
    > > > >>> > invasive steps if we can't reach an agreement.
    > > > >>> >
    > > > >>> > Step 1 is to add a few more pieces of information to context
    > > objects.
    > > > >>> > That's non-breaking and needed for the async communication
    > pattern
    > > in
    > > > >>> > FLIP-171 [3]. While we need to add a new Public API
    > > > (MailboxExecutor),
    > > > >>> I
    > > > >>> > think that this should entail the least discussions.
    > > > >>> >
    > > > >>> > Step 2 is to also offer the same context information to
    > committers.
    > > > >>> Here we
    > > > >>> > can offer some compatibility methods to not break existing 
sinks.
    > > The
    > > > >>> main
    > > > >>> > use case would be some async exactly-once sink but I'm not sure
    > if
    > > we
    > > > >>> would
    > > > >>> > use async communication patterns at all here (or simply wait for
    > > all
    > > > >>> async
    > > > >>> > requests to finish in a sync way). It may also help with async
    > > > cleanup
    > > > >>> > tasks though.
    > > > >>> >
    > > > >>> > While drafting Step 2, I noticed the big entanglement of the
    > > current
    > > > >>> API.
    > > > >>> > To figure out if there is a committer during the stream graph
    > > > >>> creation, we
    > > > >>> > actually need to create a committer which can have unforeseen
    > > > >>> consequences.
    > > > >>> > Thus, I spiked if we can disentangle the interface and have
    > > separate
    > > > >>> > interfaces for the different use cases. The resulting step 3
    > would
    > > > be a
    > > > >>> > completely breaking change and thus is probably controversial.
    > > > >>> However, I'd
    > > > >>> > also see the disentanglement as a way to prepare to make Sinks
    > more
    > > > >>> > expressive (write and commit coordinator) without completely
    > > > >>> overloading
    > > > >>> > the main interface.
    > > > >>> >
    > > > >>> > [1]
    > > > >>> >
    > > > >>> >
    > > > >>>
    > > >
    > >
    > 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
    > > > >>> > [2] https://github.com/apache/flink/pull/16399
    > > > >>> > [3]
    > > > >>> >
    > > > >>>
    > > >
    > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
    > > > >>> >
    > > > >>>
    > > > >>>
    > > > >>>
    > > > >>> Amazon Web Services EMEA SARL
    > > > >>> 38 avenue John F. Kennedy, L-1855 Luxembourg
    > > > >>> Sitz der Gesellschaft: L-1855 Luxemburg
    > > > >>> eingetragen im Luxemburgischen Handelsregister unter R.C.S. 
B186284
    > > > >>>
    > > > >>> Amazon Web Services EMEA SARL, Niederlassung Deutschland
    > > > >>> Marcel-Breuer-Str. 12, D-80807 Muenchen
    > > > >>> Sitz der Zweigniederlassung: Muenchen
    > > > >>> eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB
    > > > >>> 242240, USt-ID DE317013094
    > > > >>>
    > > > >>>
    > > > >>>
    > > > >>>
    > > >
    > >
    >






