Hi, Arvid & Piotr
Sorry for the late reply.
1. Thank you all very much for your patience and explanation. Recently, I
have also studied the related code of 'MailBox', which may not be as
serious as I thought, after all, it is very similar to Java's `Executor`;
2. Whether to use AsyncIO or MailBox to implement Kinesis connector is more
up to the contributor to decide (after all, `Mailbox` has decided to be
exposed :-) ). It’s just that I personally prefer to combine some simple
functions to complete a more advanced function.
Best,
Guowei


On Sat, Jul 24, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:

> Just to reiterate on Piotr's point: MailboxExecutor is pretty much an
> Executor [1] with named lambdas, except for the name MailboxExecutor
> nothing is hinting at a specific threading model.
>
> Currently, we expose it on StreamOperator API. Afaik the idea is to make
> the StreamOperator internal and beef up ProcessFunction but for several use
> cases (e.g., AsyncIO), we actually need to expose the executor anyways.
>
> We could rename MailboxExecutor to avoid exposing the name of the threading
> model. For example, we could rename it to TaskThreadExecutor (but that's
> pretty much the same), to CooperativeExecutor (again implies Mailbox), to
> o.a.f.Executor, to DeferredExecutor... Ideas are welcome.
>
> We could also simply use Java's Executor interface, however, when working
> with that interface, I found that the missing context of async executed
> lambdas made debugging much much harder. So that's why I designed
> MailboxExecutor to force the user to give some debug string to each
> invokation. In the sink context, we could, however, use an adaptor from
> MailboxExecutor to Java's Executor and simply bind the sink name to the
> invokations.
>
> [1]
>
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
>
> On Fri, Jul 23, 2021 at 5:36 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hi,
> >
> > Regarding the question whether to expose the MailboxExecutor or not:
> > 1. We have plans on exposing it in the ProcessFunction (in short we want
> to
> > make StreamOperator API private/internal only, and move all of it's extra
> > functionality in one way or another to the ProcessFunction). I don't
> > remember and I'm not sure if *Dawid* had a different idea about this (do
> > not expose Mailbox but wrap it somehow?)
> > 2. If we provide a thin wrapper around MailboxExecutor, I'm not sure how
> > helpful it will be for keeping backward compatibility in the future.
> > `MailboxExecutor` is already a very generic interface that doesn't expose
> > much about the current threading model. Note that the previous threading
> > model (multi threaded with checkpoint lock), should be easy to implement
> > using the `MailboxExecutor` interface (use a direct executor that
> acquires
> > checkpoint lock).
> >
> > Having said that, I haven't spent too much time thinking about whether
> it's
> > better to enrich AsyncIO or provide the AsyncSink. If we can just as
> > efficiently provide the same functionality using the existing/enhanced
> > AsyncIO API, that may be a good idea if it indeed reduces our
> > maintenance costs.
> >
> > Piotrek
> >
> > pt., 23 lip 2021 o 12:55 Guowei Ma <guowei....@gmail.com> napisał(a):
> >
> > > Hi, Arvid
> > >
> > > >>>The main question here is what do you think is the harm of exposing
> > > Mailbox? Is it the complexity or the maintenance overhead?
> > >
> > > I think that exposing the internal threading model might be risky. In
> > case
> > > the threading model changes, it will affect the user's api and bring
> the
> > > burden of internal modification. (Of course, you may have more say in
> how
> > > the MailBox model will develop in the future) Therefore, I think that
> if
> > an
> > > alternative solution can be found, these possible risks will be
> avoided:
> > > for example, through AsyncIO.
> > >
> > > >>>>AsyncIO has no user state, so we would be quite limited in
> > implementing
> > > at-least-once sinks especially when it comes to retries. Chaining two
> > > AsyncIO would make it even harder to reason about the built-in state.
> We
> > > would also give up any chance to implement exactly once async sinks
> (even
> > > though I'm not sure if it's possible at all).
> > >
> > > 1. Why would we need to use the state when retrying(maybe I miss
> > > something)? If a batch of asynchronous requests fails, I think it is
> > enough
> > > to retry directly in the callback. Or extend AsyncIO to give it the
> > ability
> > > to retry(XXXFuture.fail (Excelption)); in addition, in general, the
> > client
> > > has its own retry mechanism, at least the producer of Kineses said in
> the
> > > document. Of course I am not opposed to retrying, I just want to find a
> > > more obvious example to support the need to do so.
> > >
> > > 2. I don't think using AsyncIO will prevent exactly once in the future.
> > > Both solutions need to be rewritten unless Exactly Once is required
> from
> > > the beginning.
> > >
> > > >>>Users will have a hard time to discover SinkUtil.sinkTo compared to
> > the
> > > expected stream.sinkTo. We have seen that on other occasions already
> > (e.g.,
> > > reinterpretAsKeyedStream).
> > > In fact, I think this is the most important issue. We lack the function
> > of
> > > supporting sub-topology at the API layer, which is very inconvenient.
> For
> > > examplestream.sinkTo(AsyncSinkTopoloyBuilder), what do you think?
> > > ```java
> > > AsyncSinkTopologyBuildrer {
> > > void build(inputstream) {
> > > input.flatmap().async()…
> > > }
> > > ```
> > > In general, I want to know what principles will guide when to expose
> more
> > > implementations to users, and when to combine with existing UDFs.
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > On Thu, Jul 22, 2021 at 10:35 PM Arvid Heise <ar...@apache.org> wrote:
> > >
> > > > Hi Guowei,
> > > >
> > > > I think the whole discussion here started as a means to avoid
> exposing
> > > > MailboxExecutor to the user. Your preferred way would be to improve
> > > AsyncIO
> > > > to support batching or implement AsyncSink as batching+AsyncIO.  Here
> > are
> > > > some thoughts.
> > > >
> > > > 1) We should take a step back and note that we actually already
> expose
> > > > MailboxExecutor on the Operator API in particular to implement
> AsyncIO.
> > > > (Now, we could say that we actually want to hide Operator API in the
> > > future
> > > > and could make MailboxExecutor internal again)
> > > >
> > > > 2) Exposing MailboxExecutor in general is a means for users to access
> > the
> > > > task thread in a cooperative way. That would allow certain
> > communication
> > > > patterns beyond simply async requests. For example, the user could
> > > > re-enqueue retries or even effectively block input processing to
> induce
> > > > backpressure by enqueuing a waiting mail. So the main question is if
> we
> > > > want to empower sink devs to access the task thread or not. Note that
> > > > indirectly, sink devs have that option already through timers. So in
> > > > theory, they could also enqueue a MIN_TIME timer and use that to
> > > implement
> > > > any kind of async processing.
> > > >
> > > > The main question here is what do you think is the harm of exposing
> > > > Mailbox? Is it the complexity or the maintenance overhead?
> > > >
> > > > 3) Simulating AsyncSink through AsyncIO is possible but has some
> > > > downsides.
> > > > a) AsyncIO has no user state, so we would be quite limited in
> > > implementing
> > > > at-least-once sinks especially when it comes to retries. Chaining two
> > > > AsyncIO would make it even harder to reason about the built-in state.
> > We
> > > > would also give up any chance to implement exactly once async sinks
> > (even
> > > > though I'm not sure if it's possible at all).
> > > > b) Users will have a hard time to discover SinkUtil.sinkTo compared
> to
> > > the
> > > > expected stream.sinkTo. We have seen that on other occasions already
> > > (e.g.,
> > > > reinterpretAsKeyedStream).
> > > > c) AsyncIO is optimized to feed back the result into the main task
> > > thread.
> > > > That's completely unneeded for sinks.
> > > > d) You probably know it better than me, but imho the batch behavior
> of
> > a
> > > > proper sink would be much better than an AsyncIO simulation (I have
> not
> > > > tracked the latest discussion in FLIP-147).
> > > >
> > > > Note that if we absolutely don't want to expose MailboxExecutor, we
> can
> > > > also try to get it through some casts to internal interfaces. So the
> > > > Sink.InitContext could have a Sink.InitContextInternal subinterface
> > that
> > > > includes the mailbox executor. In AsyncSink, we could cast to the
> > > internal
> > > > interface and are the only ones that can access the MailboxExecutor
> > > > legimately (of course, connector devs could do the same cast but then
> > use
> > > > an internal class and need to expect breaking changes).
> > > >
> > > > For your question:
> > > >
> > > >> 2. By the way, I have a little question. Why not directly reduce the
> > > >> queue size to control the in-flight query, for example, according to
> > > your
> > > >> example,
> > > >> Is a queue size such as 150 enough? In fact, there are caches in
> many
> > > >> places in the entire topology, such as buffers on the network stack.
> > > >>
> > > > It's a bit different. Let's take kinesis as an example. The sink
> > collects
> > > > 500 elements and puts them in a single request (500 is max number of
> > > > records per batch request). Now it sends the request out. At the same
> > > time,
> > > > the next 500 elements are collected. So the in-flight query size
> refers
> > > to
> > > > the number of parallel requets (500 elements each).
> > > > If we first batch 500*numberOfInFlightRequests, and then send out all
> > > > numberOfInFlightRequests at the same time, we get worse latency.
> > > >
> > > > On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <guowei....@gmail.com>
> > wrote:
> > > >
> > > >> Hi, Steffen
> > > >>
> > > >> Thank you for your detailed explanation.
> > > >>
> > > >> >>>But whether a sink is overloaded not only depends on the queue
> > size.
> > > >> It also depends on the number of in-flight async requests
> > > >>
> > > >> 1. How about chaining two AsyncIOs? One is for controlling the size
> of
> > > >> the buffer elements; The other is for controlling the in-flight
> async
> > > >> requests. I think this way might do it and would not need to expose
> > the
> > > >> MailboxExecutor.
> > > >> 2. By the way, I have a little question. Why not directly reduce the
> > > >> queue size to control the in-flight query, for example, according to
> > > your
> > > >> example,
> > > >> Is a queue size such as 150 enough? In fact, there are caches in
> many
> > > >> places in the entire topology, such as buffers on the network stack.
> > > >>
> > > >> >>> I’m not sure whether I’m understanding who you are referring to
> by
> > > >> user.
> > > >> Personally I mean the sink developer.
> > > >>
> > > >>
> > > >> Best,
> > > >> Guowei
> > > >>
> > > >>
> > > >> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen
> > > >> <shau...@amazon.de.invalid> wrote:
> > > >>
> > > >>> Hey,
> > > >>>
> > > >>> We are using the `MailboxExecutor` to block calls to `write` in
> case
> > > the
> > > >>> sink is somehow overloaded. Overloaded basically means that the
> sink
> > > cannot
> > > >>> persist messages quickly enough into the respective destination.
> > > >>>
> > > >>> But whether a sink is overloaded not only depends on the queue
> size.
> > It
> > > >>> also depends on the number of in-flight async requests that must
> not
> > > grow
> > > >>> too large either [1, 2]. We also need to support use cases where
> the
> > > >>> destination can only ingest x messages per second or a total
> > > throughput of
> > > >>> y per second. We are also planning to support time outs so that
> data
> > is
> > > >>> persisted into the destination at least every n seconds by means of
> > the
> > > >>> `ProcessingTimeService`. The batching configuration will be part of
> > the
> > > >>> constructor, which has only been indicated in the current prototype
> > > but is
> > > >>> not implemented, yet [3].
> > > >>>
> > > >>> I’m not sure whether I’m understanding who you are referring to by
> > > user.
> > > >>> People who are using a concrete sink, eg, to send messages into a
> > > Kinesis
> > > >>> stream, will not be exposed to the `MailboxExecutor`. They are just
> > > using
> > > >>> the sink and pass in the batching configuration from above [4]. The
> > > >>> `MailboxExecutor` and `ProcessingTimeService` are only relevant for
> > > sink
> > > >>> authors who want to create support for a new destination. I would
> > > expect
> > > >>> that there are only few experts who are adding support for new
> > > >>> destinations, who are capable to understand and use the advanced
> > > constructs
> > > >>> properly.
> > > >>>
> > > >>> Hope that helps to clarify our thinking.
> > > >>>
> > > >>> Cheers, Steffen
> > > >>>
> > > >>>
> > > >>> [1]
> > > >>>
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
> > > >>> [2]
> > > >>>
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
> > > >>> [3]
> > > >>>
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
> > > >>> [4]
> > > >>>
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
> > > >>>
> > > >>>
> > > >>>
> > > >>> From: Arvid Heise <ar...@apache.org>
> > > >>> Date: Tuesday, 20. July 2021 at 23:03
> > > >>> To: dev <dev@flink.apache.org>, Steffen Hausmann <
> shau...@amazon.de>
> > > >>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
> > > >>>
> > > >>>
> > > >>> CAUTION: This email originated from outside of the organization. Do
> > not
> > > >>> click links or open attachments unless you can confirm the sender
> and
> > > know
> > > >>> the content is safe.
> > > >>>
> > > >>>
> > > >>> Hi Guowei,
> > > >>>
> > > >>> 1. your idea is quite similar to FLIP-171 [1]. The question is if
> we
> > > >>> implement FLIP-171 based on public interfaces (which would require
> > > exposing
> > > >>> MailboxExecutor as described here in FLIP-177) or if it's better to
> > > >>> implement it internally and hide it.
> > > >>> The first option is an abstract base class; your second option
> would
> > be
> > > >>> an abstract interface that has matching implementation internally
> > > >>> (similarly to AsyncIO).
> > > >>> There is an example for option 1 in [2]; I think the idea was to
> > > >>> additionally specify the batch size and batch timeout in the ctor.
> > > >>> @Hausmann, Steffen<mailto:shau...@amazon.de> knows more.
> > > >>>
> > > >>> 2. I guess your question is if current AsyncIO is not sufficient
> > > already
> > > >>> if exactly-once is not needed? The answer is currently no, because
> > > AsyncIO
> > > >>> is not doing any batching. The user could do batching before that
> but
> > > >>> that's quite a bit of code. However, we should really think if
> > AsyncIO
> > > >>> should also support batching.
> > > >>> I would also say that the scope of AsyncIO and AsyncSink is quite
> > > >>> different: the first one is for application developers and the
> second
> > > one
> > > >>> is for connector developers and would be deemed an implementation
> > > detail by
> > > >>> the application developer. Of course, advanced users may fall in
> both
> > > >>> categories, so the distinction does not always hold.
> > > >>>
> > > >>> Nevertheless, there is some overlap between both approaches and
> it's
> > > >>> important to think if the added complexity warrants the benefit. It
> > > would
> > > >>> be interesting to hear how other devs see that.
> > > >>>
> > > >>> [1]
> > > >>>
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > >>> [2]
> > > >>>
> > >
> >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
> > > >>>
> > > >>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com
> > > <mailto:
> > > >>> guowei....@gmail.com>> wrote:
> > > >>> Hi, Avrid
> > > >>> Thank you Avrid for perfecting Sink through this FLIP. I have two
> > > little
> > > >>> questions
> > > >>>
> > > >>> 1. What do you think of us directly providing an interface as
> > follows?
> > > In
> > > >>> this way, there may be no need to expose the Mailbox to the user.
> We
> > > can
> > > >>> implement an `AsyncSinkWriterOperator` to control the length of the
> > > >>> queue.
> > > >>> If it is too long, do not call SinkWriter::write.
> > > >>> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
> > > >>>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT,
> > > >>> WriterStateT> { //// please ignore the name of Tuple2 and XXXFuture
> > at
> > > >>> first.
> > > >>>     int getQueueLimit();
> > > >>> }
> > > >>>
> > > >>> 2. Another question is: If users don't care about exactly once and
> > the
> > > >>> unification of stream and batch, how about letting users use
> > > >>> `AsyncFunction` directly? I don’t have an answer either. I want to
> > hear
> > > >>> your suggestions.
> > > >>>
> > > >>> Best,
> > > >>> Guowei
> > > >>>
> > > >>>
> > > >>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org
> > <mailto:
> > > >>> ar...@apache.org>> wrote:
> > > >>>
> > > >>> > Dear devs,
> > > >>> >
> > > >>> > today I'd like to start the discussion on the Sink API. I have
> > > drafted
> > > >>> a
> > > >>> > FLIP [1] with an accompanying PR [2].
> > > >>> >
> > > >>> > This FLIP is a bit special as it's actually a few smaller
> > Amend-FLIPs
> > > >>> in
> > > >>> > one. In this discussion, we should decide on the scope and cut
> out
> > > too
> > > >>> > invasive steps if we can't reach an agreement.
> > > >>> >
> > > >>> > Step 1 is to add a few more pieces of information to context
> > objects.
> > > >>> > That's non-breaking and needed for the async communication
> pattern
> > in
> > > >>> > FLIP-171 [3]. While we need to add a new Public API
> > > (MailboxExecutor),
> > > >>> I
> > > >>> > think that this should entail the least discussions.
> > > >>> >
> > > >>> > Step 2 is to also offer the same context information to
> committers.
> > > >>> Here we
> > > >>> > can offer some compatibility methods to not break existing sinks.
> > The
> > > >>> main
> > > >>> > use case would be some async exactly-once sink but I'm not sure
> if
> > we
> > > >>> would
> > > >>> > use async communication patterns at all here (or simply wait for
> > all
> > > >>> async
> > > >>> > requests to finish in a sync way). It may also help with async
> > > cleanup
> > > >>> > tasks though.
> > > >>> >
> > > >>> > While drafting Step 2, I noticed the big entanglement of the
> > current
> > > >>> API.
> > > >>> > To figure out if there is a committer during the stream graph
> > > >>> creation, we
> > > >>> > actually need to create a committer which can have unforeseen
> > > >>> consequences.
> > > >>> > Thus, I spiked if we can disentangle the interface and have
> > separate
> > > >>> > interfaces for the different use cases. The resulting step 3
> would
> > > be a
> > > >>> > completely breaking change and thus is probably controversial.
> > > >>> However, I'd
> > > >>> > also see the disentanglement as a way to prepare to make Sinks
> more
> > > >>> > expressive (write and commit coordinator) without completely
> > > >>> overloading
> > > >>> > the main interface.
> > > >>> >
> > > >>> > [1]
> > > >>> >
> > > >>> >
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > > >>> > [2] https://github.com/apache/flink/pull/16399
> > > >>> > [3]
> > > >>> >
> > > >>>
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > >>> >
> > > >>>
> > > >>>
> > > >>>
> > > >>> Amazon Web Services EMEA SARL
> > > >>> 38 avenue John F. Kennedy, L-1855 Luxembourg
> > > >>> Sitz der Gesellschaft: L-1855 Luxemburg
> > > >>> eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
> > > >>>
> > > >>> Amazon Web Services EMEA SARL, Niederlassung Deutschland
> > > >>> Marcel-Breuer-Str. 12, D-80807 Muenchen
> > > >>> Sitz der Zweigniederlassung: Muenchen
> > > >>> eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB
> > > >>> 242240, USt-ID DE317013094
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > >
> >
>

Reply via email to