Hi Guowei, hi all,

The main drawback of the AsyncIO approach is the decreased flexibility. In
particular, as you mentioned for the advanced backpressure use cases, you
would need to chain several AsyncIOs:

>>>But whether a sink is overloaded not only depends on the queue size. It
> also depends on the number of in-flight async requests
> 1. How about chaining two AsyncIOs? One is for controlling the size of the
> buffer elements; The other is for controlling the in-flight async requests.
>

If we need an AsyncIO for each dimension of backpressure, we also might end
up with an incompatible state when a dimension is added or removed through
a configuration change.
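
To illustrate the alternative, here is a minimal sketch of how a single
writer could track both dimensions (buffered elements and in-flight requests)
in one place. This is not Flink code: `BackpressureGate` and all of its
members are hypothetical names, elements are plain strings, and the sketch
assumes that every call happens on one thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical helper for illustration only; not part of Flink or FLIP-171.
// Assumes single-threaded access (e.g., all calls happen on one task thread).
final class BackpressureGate {
    private final int maxBufferedElements; // dimension 1: queue size
    private final int maxInFlightRequests; // dimension 2: parallel requests
    private final int batchSize;           // elements per request
    private final Queue<String> buffer = new ArrayDeque<>();
    private int inFlightRequests = 0;

    BackpressureGate(int maxBufferedElements, int maxInFlightRequests, int batchSize) {
        this.maxBufferedElements = maxBufferedElements;
        this.maxInFlightRequests = maxInFlightRequests;
        this.batchSize = batchSize;
    }

    // Returns false when the buffer is full, i.e. the caller has to wait.
    boolean tryWrite(String element) {
        if (buffer.size() >= maxBufferedElements) {
            return false;
        }
        buffer.add(element);
        return true;
    }

    // Drains up to batchSize elements into a new request, or returns an
    // empty list if the in-flight limit is reached or nothing is buffered.
    List<String> tryStartRequest() {
        List<String> batch = new ArrayList<>();
        if (inFlightRequests >= maxInFlightRequests || buffer.isEmpty()) {
            return batch;
        }
        while (batch.size() < batchSize && !buffer.isEmpty()) {
            batch.add(buffer.poll());
        }
        inFlightRequests++;
        return batch;
    }

    // To be called from the completion callback of a request.
    void completeRequest() {
        if (inFlightRequests > 0) {
            inFlightRequests--;
        }
    }

    int getBufferedElements() { return buffer.size(); }
    int getInFlightRequests() { return inFlightRequests; }
}
```

With this shape, a further limit (say, requests per second) would be one more
field and one more check, rather than another chained operator with its own
built-in state.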

With that being said, I'd like to start a vote on the proposal now that your
strong objection has been resolved. We can continue the discussion here, but
I'd also appreciate any vote on [1].

[1]
https://lists.apache.org/thread.html/r7194846ec671e9e0e64908a7ae4cf32c2bccf1dd6ee7db107a52cf04%40%3Cdev.flink.apache.org%3E

On Fri, Jul 30, 2021 at 5:51 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Arvid & Piotr
> Sorry for the late reply.
> 1. Thank you all very much for your patience and explanations. I have
> recently studied the `Mailbox`-related code, and exposing it may not be as
> serious a problem as I thought; after all, it is very similar to Java's
> `Executor`.
> 2. Whether to use AsyncIO or `Mailbox` to implement the Kinesis connector
> is up to the contributor to decide (after all, it has been decided to
> expose `Mailbox` :-) ). It's just that I personally prefer to combine simple
> functions to build more advanced functionality.
> Best,
> Guowei
>
>
> On Sat, Jul 24, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Just to reiterate on Piotr's point: MailboxExecutor is pretty much an
> > Executor [1] with named lambdas; apart from the name MailboxExecutor,
> > nothing hints at a specific threading model.
> >
> > Currently, we expose it on StreamOperator API. Afaik the idea is to make
> > the StreamOperator internal and beef up ProcessFunction but for several
> > use cases (e.g., AsyncIO), we actually need to expose the executor
> > anyways.
> >
> > We could rename MailboxExecutor to avoid exposing the name of the
> > threading model. For example, we could rename it to TaskThreadExecutor
> > (but that's pretty much the same), to CooperativeExecutor (again implies
> > Mailbox), to o.a.f.Executor, to DeferredExecutor... Ideas are welcome.
> >
> > We could also simply use Java's Executor interface. However, when working
> > with that interface, I found that the missing context of asynchronously
> > executed lambdas made debugging much harder. That's why I designed
> > MailboxExecutor to force the user to give some debug string to each
> > invocation. In the sink context, we could, however, use an adaptor from
> > MailboxExecutor to Java's Executor and simply bind the sink name to the
> > invocations.
> >
> > [1]
> > https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
> >
> > On Fri, Jul 23, 2021 at 5:36 PM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi,
> > >
> > > Regarding the question whether to expose the MailboxExecutor or not:
> > > 1. We have plans on exposing it in the ProcessFunction (in short, we
> > > want to make the StreamOperator API private/internal only, and move all
> > > of its extra functionality in one way or another to the
> > > ProcessFunction). I don't remember and I'm not sure if *Dawid* had a
> > > different idea about this (do not expose Mailbox but wrap it somehow?)
> > > 2. If we provide a thin wrapper around MailboxExecutor, I'm not sure how
> > > helpful it will be for keeping backward compatibility in the future.
> > > `MailboxExecutor` is already a very generic interface that doesn't
> > > expose much about the current threading model. Note that the previous
> > > threading model (multi-threaded with checkpoint lock) should be easy to
> > > implement using the `MailboxExecutor` interface (use a direct executor
> > > that acquires the checkpoint lock).
> > >
> > > Having said that, I haven't spent too much time thinking about whether
> > > it's better to enrich AsyncIO or provide the AsyncSink. If we can just
> > > as efficiently provide the same functionality using the
> > > existing/enhanced AsyncIO API, that may be a good idea if it indeed
> > > reduces our maintenance costs.
> > >
> > > Piotrek
> > >
> > > On Fri, Jul 23, 2021 at 12:55 PM Guowei Ma <guowei....@gmail.com> wrote:
> > >
> > > > Hi, Arvid
> > > >
> > > > >>>The main question here is what do you think is the harm of
> > > > exposing Mailbox? Is it the complexity or the maintenance overhead?
> > > >
> > > > I think that exposing the internal threading model might be risky. If
> > > > the threading model changes, it will affect the user-facing API and
> > > > add to the burden of internal changes. (Of course, you may have more
> > > > say in how the Mailbox model will develop in the future.) Therefore, I
> > > > think that if an alternative solution can be found, for example
> > > > through AsyncIO, these possible risks can be avoided.
> > > >
> > > > >>>>AsyncIO has no user state, so we would be quite limited in
> > > > implementing at-least-once sinks especially when it comes to retries.
> > > > Chaining two AsyncIO would make it even harder to reason about the
> > > > built-in state. We would also give up any chance to implement exactly
> > > > once async sinks (even though I'm not sure if it's possible at all).
> > > >
> > > > 1. Why would we need to use state when retrying (maybe I am missing
> > > > something)? If a batch of asynchronous requests fails, I think it is
> > > > enough to retry directly in the callback, or to extend AsyncIO to give
> > > > it the ability to retry (XXXFuture.fail(Exception)). In addition, the
> > > > client generally has its own retry mechanism; at least the Kinesis
> > > > producer documentation says so. Of course, I am not opposed to
> > > > retrying; I just want to find a more obvious example to support the
> > > > need for it.
> > > >
> > > > 2. I don't think using AsyncIO will prevent exactly once in the
> > > > future. Both solutions need to be rewritten unless Exactly Once is
> > > > required from the beginning.
> > > >
> > > > >>>Users will have a hard time to discover SinkUtil.sinkTo compared
> > > > to the expected stream.sinkTo. We have seen that on other occasions
> > > > already (e.g., reinterpretAsKeyedStream).
> > > > In fact, I think this is the most important issue. We lack support for
> > > > sub-topologies at the API layer, which is very inconvenient. For
> > > > example, stream.sinkTo(AsyncSinkTopologyBuilder), what do you think?
> > > > ```java
> > > > // Pseudocode: flatmap()/async() stand for the combined operators.
> > > > class AsyncSinkTopologyBuilder {
> > > >     void build(inputStream) {
> > > >         inputStream.flatmap().async()…
> > > >     }
> > > > }
> > > > ```
> > > > In general, I want to know what principles guide when to expose more
> > > > implementations to users, and when to combine existing UDFs instead.
> > > >
> > > > Best,
> > > > Guowei
> > > >
> > > >
> > > > On Thu, Jul 22, 2021 at 10:35 PM Arvid Heise <ar...@apache.org> wrote:
> > > >
> > > > > Hi Guowei,
> > > > >
> > > > > I think the whole discussion here started as a means to avoid
> > > > > exposing MailboxExecutor to the user. Your preferred way would be to
> > > > > improve AsyncIO to support batching or implement AsyncSink as
> > > > > batching+AsyncIO. Here are some thoughts.
> > > > >
> > > > > 1) We should take a step back and note that we actually already
> > > > > expose MailboxExecutor on the Operator API in particular to implement
> > > > > AsyncIO. (Now, we could say that we actually want to hide Operator
> > > > > API in the future and could make MailboxExecutor internal again)
> > > > >
> > > > > 2) Exposing MailboxExecutor in general is a means for users to access
> > > > > the task thread in a cooperative way. That would allow certain
> > > > > communication patterns beyond simply async requests. For example, the
> > > > > user could re-enqueue retries or even effectively block input
> > > > > processing to induce backpressure by enqueuing a waiting mail. So the
> > > > > main question is if we want to empower sink devs to access the task
> > > > > thread or not. Note that indirectly, sink devs have that option
> > > > > already through timers. So in theory, they could also enqueue a
> > > > > MIN_TIME timer and use that to implement any kind of async
> > > > > processing.
> > > > >
> > > > > The main question here is what do you think is the harm of exposing
> > > > > Mailbox? Is it the complexity or the maintenance overhead?
> > > > >
> > > > > 3) Simulating AsyncSink through AsyncIO is possible but has some
> > > > > downsides.
> > > > > a) AsyncIO has no user state, so we would be quite limited in
> > > > > implementing at-least-once sinks especially when it comes to retries.
> > > > > Chaining two AsyncIO would make it even harder to reason about the
> > > > > built-in state. We would also give up any chance to implement exactly
> > > > > once async sinks (even though I'm not sure if it's possible at all).
> > > > > b) Users will have a hard time to discover SinkUtil.sinkTo compared
> > > > > to the expected stream.sinkTo. We have seen that on other occasions
> > > > > already (e.g., reinterpretAsKeyedStream).
> > > > > c) AsyncIO is optimized to feed back the result into the main task
> > > > > thread. That's completely unneeded for sinks.
> > > > > d) You probably know it better than me, but imho the batch behavior
> > > > > of a proper sink would be much better than an AsyncIO simulation (I
> > > > > have not tracked the latest discussion in FLIP-147).
> > > > >
> > > > > Note that if we absolutely don't want to expose MailboxExecutor, we
> > > > > can also try to get it through some casts to internal interfaces. So
> > > > > the Sink.InitContext could have a Sink.InitContextInternal
> > > > > subinterface that includes the mailbox executor. In AsyncSink, we
> > > > > could cast to the internal interface and are the only ones that can
> > > > > access the MailboxExecutor legitimately (of course, connector devs
> > > > > could do the same cast but then use an internal class and need to
> > > > > expect breaking changes).
> > > > >
> > > > > For your question:
> > > > >
> > > > > >> 2. By the way, I have a little question. Why not directly reduce
> > > > > >> the queue size to control the in-flight query, for example,
> > > > > >> according to your example,
> > > > > >> Is a queue size such as 150 enough? In fact, there are caches in
> > > > > >> many places in the entire topology, such as buffers on the network
> > > > > >> stack.
> > > > >>
> > > > > It's a bit different. Let's take Kinesis as an example. The sink
> > > > > collects 500 elements and puts them in a single request (500 is the
> > > > > max number of records per batch request). Now it sends the request
> > > > > out. At the same time, the next 500 elements are collected. So the
> > > > > in-flight query size refers to the number of parallel requests (500
> > > > > elements each).
> > > > > If we first batch 500*numberOfInFlightRequests, and then send out all
> > > > > numberOfInFlightRequests at the same time, we get worse latency.
> > > > >
> > > > > On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <guowei....@gmail.com> wrote:
> > > > >
> > > > >> Hi, Steffen
> > > > >>
> > > > >> Thank you for your detailed explanation.
> > > > >>
> > > > > >> >>>But whether a sink is overloaded not only depends on the queue
> > > > > >> size. It also depends on the number of in-flight async requests
> > > > >>
> > > > > >> 1. How about chaining two AsyncIOs? One is for controlling the
> > > > > >> size of the buffer elements; the other is for controlling the
> > > > > >> in-flight async requests. I think this way might do it and would
> > > > > >> not need to expose the MailboxExecutor.
> > > > > >> 2. By the way, I have a little question. Why not directly reduce
> > > > > >> the queue size to control the in-flight query, for example,
> > > > > >> according to your example,
> > > > > >> Is a queue size such as 150 enough? In fact, there are caches in
> > > > > >> many places in the entire topology, such as buffers on the network
> > > > > >> stack.
> > > > >>
> > > > > >> >>> I’m not sure whether I’m understanding who you are referring
> > > > > >> to by user.
> > > > > >> Personally I mean the sink developer.
> > > > >>
> > > > >>
> > > > >> Best,
> > > > >> Guowei
> > > > >>
> > > > >>
> > > > >> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen
> > > > >> <shau...@amazon.de.invalid> wrote:
> > > > >>
> > > > >>> Hey,
> > > > >>>
> > > > > >>> We are using the `MailboxExecutor` to block calls to `write` in
> > > > > >>> case the sink is somehow overloaded. Overloaded basically means
> > > > > >>> that the sink cannot persist messages quickly enough into the
> > > > > >>> respective destination.
> > > > >>>
> > > > > >>> But whether a sink is overloaded not only depends on the queue
> > > > > >>> size. It also depends on the number of in-flight async requests
> > > > > >>> that must not grow too large either [1, 2]. We also need to
> > > > > >>> support use cases where the destination can only ingest x
> > > > > >>> messages per second or a total throughput of y per second. We are
> > > > > >>> also planning to support timeouts so that data is persisted into
> > > > > >>> the destination at least every n seconds by means of the
> > > > > >>> `ProcessingTimeService`. The batching configuration will be part
> > > > > >>> of the constructor, which has only been indicated in the current
> > > > > >>> prototype but is not implemented yet [3].
> > > > >>>
> > > > > >>> I’m not sure whether I’m understanding who you are referring to
> > > > > >>> by user. People who are using a concrete sink, e.g., to send
> > > > > >>> messages into a Kinesis stream, will not be exposed to the
> > > > > >>> `MailboxExecutor`. They are just using the sink and pass in the
> > > > > >>> batching configuration from above [4]. The `MailboxExecutor` and
> > > > > >>> `ProcessingTimeService` are only relevant for sink authors who
> > > > > >>> want to create support for a new destination. I would expect that
> > > > > >>> only a few experts are adding support for new destinations, and
> > > > > >>> that they are capable of understanding and using the advanced
> > > > > >>> constructs properly.
> > > > >>>
> > > > >>> Hope that helps to clarify our thinking.
> > > > >>>
> > > > >>> Cheers, Steffen
> > > > >>>
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > >
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
> > > > >>> [2]
> > > > >>>
> > > >
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
> > > > >>> [3]
> > > > >>>
> > > >
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
> > > > >>> [4]
> > > > >>>
> > > >
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> From: Arvid Heise <ar...@apache.org>
> > > > >>> Date: Tuesday, 20. July 2021 at 23:03
> > > > >>> To: dev <dev@flink.apache.org>, Steffen Hausmann <
> > shau...@amazon.de>
> > > > >>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
> > > > >>>
> > > > >>>
> > > > >>> CAUTION: This email originated from outside of the organization.
> Do
> > > not
> > > > >>> click links or open attachments unless you can confirm the sender
> > and
> > > > know
> > > > >>> the content is safe.
> > > > >>>
> > > > >>>
> > > > >>> Hi Guowei,
> > > > >>>
> > > > > >>> 1. Your idea is quite similar to FLIP-171 [1]. The question is if
> > > > > >>> we implement FLIP-171 based on public interfaces (which would
> > > > > >>> require exposing MailboxExecutor as described here in FLIP-177)
> > > > > >>> or if it's better to implement it internally and hide it.
> > > > > >>> The first option is an abstract base class; your second option
> > > > > >>> would be an abstract interface that has matching implementation
> > > > > >>> internally (similarly to AsyncIO).
> > > > > >>> There is an example for option 1 in [2]; I think the idea was to
> > > > > >>> additionally specify the batch size and batch timeout in the
> > > > > >>> ctor.
> > > > > >>> @Hausmann, Steffen<mailto:shau...@amazon.de> knows more.
> > > > >>>
> > > > > >>> 2. I guess your question is if current AsyncIO is not sufficient
> > > > > >>> already if exactly-once is not needed? The answer is currently
> > > > > >>> no, because AsyncIO is not doing any batching. The user could do
> > > > > >>> batching before that but that's quite a bit of code. However, we
> > > > > >>> should really think if AsyncIO should also support batching.
> > > > > >>> I would also say that the scope of AsyncIO and AsyncSink is quite
> > > > > >>> different: the first one is for application developers and the
> > > > > >>> second one is for connector developers and would be deemed an
> > > > > >>> implementation detail by the application developer. Of course,
> > > > > >>> advanced users may fall in both categories, so the distinction
> > > > > >>> does not always hold.
> > > > >>>
> > > > > >>> Nevertheless, there is some overlap between both approaches and
> > > > > >>> it's important to think if the added complexity warrants the
> > > > > >>> benefit. It would be interesting to hear how other devs see that.
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > > >>> [2]
> > > > >>>
> > > >
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
> > > > >>>
> > > > > >>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
> > > > > >>> Hi, Arvid
> > > > > >>> Thank you, Arvid, for refining the Sink API through this FLIP. I
> > > > > >>> have two small questions.
> > > > > >>>
> > > > > >>> 1. What do you think of us directly providing an interface as
> > > > > >>> follows? In this way, there may be no need to expose the Mailbox
> > > > > >>> to the user. We can implement an `AsyncSinkWriterOperator` to
> > > > > >>> control the length of the queue. If it is too long, do not call
> > > > > >>> SinkWriter::write.
> > > > > >>> // Please ignore the names Tuple2 and XXXFuture at first.
> > > > > >>> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
> > > > > >>>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
> > > > > >>>     int getQueueLimit();
> > > > > >>> }
> > > > >>>
> > > > > >>> 2. Another question is: If users don't care about exactly once
> > > > > >>> and the unification of stream and batch, how about letting users
> > > > > >>> use `AsyncFunction` directly? I don’t have an answer either. I
> > > > > >>> want to hear your suggestions.
> > > > >>>
> > > > >>> Best,
> > > > >>> Guowei
> > > > >>>
> > > > >>>
> > > > > >>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
> > > > >>>
> > > > >>> > Dear devs,
> > > > >>> >
> > > > > >>> > today I'd like to start the discussion on the Sink API. I have
> > > > > >>> > drafted a FLIP [1] with an accompanying PR [2].
> > > > >>> >
> > > > > >>> > This FLIP is a bit special as it's actually a few smaller
> > > > > >>> > Amend-FLIPs in one. In this discussion, we should decide on the
> > > > > >>> > scope and cut out too invasive steps if we can't reach an
> > > > > >>> > agreement.
> > > > >>> >
> > > > > >>> > Step 1 is to add a few more pieces of information to context
> > > > > >>> > objects. That's non-breaking and needed for the async
> > > > > >>> > communication pattern in FLIP-171 [3]. While we need to add a
> > > > > >>> > new Public API (MailboxExecutor), I think that this should
> > > > > >>> > entail the least discussions.
> > > > >>> >
> > > > > >>> > Step 2 is to also offer the same context information to
> > > > > >>> > committers. Here we can offer some compatibility methods to not
> > > > > >>> > break existing sinks. The main use case would be some async
> > > > > >>> > exactly-once sink but I'm not sure if we would use async
> > > > > >>> > communication patterns at all here (or simply wait for all
> > > > > >>> > async requests to finish in a sync way). It may also help with
> > > > > >>> > async cleanup tasks though.
> > > > >>> >
> > > > > >>> > While drafting Step 2, I noticed the big entanglement of the
> > > > > >>> > current API. To figure out if there is a committer during the
> > > > > >>> > stream graph creation, we actually need to create a committer
> > > > > >>> > which can have unforeseen consequences. Thus, I spiked if we
> > > > > >>> > can disentangle the interface and have separate interfaces for
> > > > > >>> > the different use cases. The resulting step 3 would be a
> > > > > >>> > completely breaking change and thus is probably controversial.
> > > > > >>> > However, I'd also see the disentanglement as a way to prepare
> > > > > >>> > to make Sinks more expressive (write and commit coordinator)
> > > > > >>> > without completely overloading the main interface.
> > > > >>> >
> > > > >>> > [1]
> > > > >>> >
> > > > >>> >
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > > > >>> > [2] https://github.com/apache/flink/pull/16399
> > > > >>> > [3]
> > > > >>> >
> > > > >>>
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > > >>> >
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > >
> > >
> >
>
