Hi,

Regarding the question whether to expose the MailboxExecutor or not:
1. We have plans on exposing it in the ProcessFunction (in short we want to
make StreamOperator API private/internal only, and move all of its extra
functionality in one way or another to the ProcessFunction). I don't
remember and I'm not sure if *Dawid* had a different idea about this (do
not expose Mailbox but wrap it somehow?)
2. If we provide a thin wrapper around MailboxExecutor, I'm not sure how
helpful it will be for keeping backward compatibility in the future.
`MailboxExecutor` is already a very generic interface that doesn't expose
much about the current threading model. Note that the previous threading
model (multi-threaded with a checkpoint lock) should be easy to implement
using the `MailboxExecutor` interface (use a direct executor that acquires
the checkpoint lock).
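
To make the last point concrete, here is a minimal sketch of such a lock-acquiring direct executor. It is only an illustration under stated assumptions: it uses the plain JDK `Executor` interface as a stand-in rather than Flink's actual `MailboxExecutor`, and the class name `LockingDirectExecutor` and the lock object are hypothetical.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in, not Flink code: a "direct" executor that runs each
// action in the calling thread while holding a shared checkpoint lock.
final class LockingDirectExecutor implements Executor {
    private final Object checkpointLock;

    LockingDirectExecutor(Object checkpointLock) {
        this.checkpointLock = checkpointLock;
    }

    @Override
    public void execute(Runnable action) {
        // No queue and no dedicated thread: the caller runs the action itself,
        // serialized against everything else that synchronizes on the lock.
        synchronized (checkpointLock) {
            action.run();
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        Executor executor = new LockingDirectExecutor(lock);
        // The action observes that the current thread holds the lock.
        executor.execute(() ->
                System.out.println("holds lock: " + Thread.holdsLock(lock)));
    }
}
```

Code written against an executor-style interface would not need to know whether actions run in a single task thread or under a lock, which is the reason a thin wrapper may add little.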

Having said that, I haven't spent too much time thinking about whether it's
better to enrich AsyncIO or provide the AsyncSink. If we can just as
efficiently provide the same functionality using the existing/enhanced
AsyncIO API, that may be a good idea if it indeed reduces our
maintenance costs.

Piotrek

On Fri, 23 Jul 2021 at 12:55, Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Arvid
>
> >>>The main question here is what do you think is the harm of exposing
> Mailbox? Is it the complexity or the maintenance overhead?
>
> I think that exposing the internal threading model might be risky. If the
> threading model changes, the change will affect the user-facing API and make
> internal modifications more burdensome. (Of course, you may have more say in
> how the Mailbox model will develop in the future.) Therefore, I think that
> if an alternative solution can be found, for example through AsyncIO, these
> possible risks can be avoided.
>
> >>>>AsyncIO has no user state, so we would be quite limited in implementing
> at-least-once sinks especially when it comes to retries. Chaining two
> AsyncIO would make it even harder to reason about the built-in state. We
> would also give up any chance to implement exactly once async sinks (even
> though I'm not sure if it's possible at all).
>
> 1. Why would we need to use state when retrying (maybe I am missing
> something)? If a batch of asynchronous requests fails, I think it is enough
> to retry directly in the callback, or to extend AsyncIO to give it the
> ability to retry (XXXFuture.fail(Exception)). In addition, the client
> generally has its own retry mechanism; at least the Kinesis producer
> documentation says so. Of course I am not opposed to retrying, I just want
> to find a more obvious example that supports the need for it.
>
> 2. I don't think using AsyncIO will prevent exactly once in the future.
> Both solutions need to be rewritten unless Exactly Once is required from
> the beginning.
>
> >>>Users will have a hard time to discover SinkUtil.sinkTo compared to the
> expected stream.sinkTo. We have seen that on other occasions already (e.g.,
> reinterpretAsKeyedStream).
> In fact, I think this is the most important issue. The API layer lacks
> support for sub-topologies, which is very inconvenient. For example,
> stream.sinkTo(AsyncSinkTopologyBuilder); what do you think?
> ```java
> // Pseudocode: the builder expands the sink into a sub-topology.
> class AsyncSinkTopologyBuilder<InputT> {
>     void build(DataStream<InputT> input) {
>         input.flatMap().async()…
>     }
> }
> ```
> In general, I want to know what principles will guide when to expose more
> implementations to users, and when to combine with existing UDFs.
>
> Best,
> Guowei
>
>
> On Thu, Jul 22, 2021 at 10:35 PM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi Guowei,
> >
> > I think the whole discussion here started as a means to avoid exposing
> > MailboxExecutor to the user. Your preferred way would be to improve
> > AsyncIO to support batching or implement AsyncSink as batching+AsyncIO.
> > Here are some thoughts.
> >
> > 1) We should take a step back and note that we actually already expose
> > MailboxExecutor on the Operator API in particular to implement AsyncIO.
> > (Now, we could say that we actually want to hide the Operator API in the
> > future and could make MailboxExecutor internal again.)
> >
> > 2) Exposing MailboxExecutor in general is a means for users to access the
> > task thread in a cooperative way. That would allow certain communication
> > patterns beyond simply async requests. For example, the user could
> > re-enqueue retries or even effectively block input processing to induce
> > backpressure by enqueuing a waiting mail. So the main question is if we
> > want to empower sink devs to access the task thread or not. Note that
> > indirectly, sink devs have that option already through timers. So in
> > theory, they could also enqueue a MIN_TIME timer and use that to
> > implement any kind of async processing.
> >
> > The main question here is what do you think is the harm of exposing
> > Mailbox? Is it the complexity or the maintenance overhead?
> >
> > 3) Simulating AsyncSink through AsyncIO is possible but has some
> > downsides.
> > a) AsyncIO has no user state, so we would be quite limited in
> > implementing at-least-once sinks, especially when it comes to retries.
> > Chaining two AsyncIO would make it even harder to reason about the
> > built-in state. We would also give up any chance to implement
> > exactly-once async sinks (even though I'm not sure if it's possible at
> > all).
> > b) Users will have a hard time discovering SinkUtil.sinkTo compared to
> > the expected stream.sinkTo. We have seen that on other occasions already
> > (e.g., reinterpretAsKeyedStream).
> > c) AsyncIO is optimized to feed back the result into the main task
> > thread. That's completely unneeded for sinks.
> > d) You probably know it better than me, but imho the batch behavior of a
> > proper sink would be much better than an AsyncIO simulation (I have not
> > tracked the latest discussion in FLIP-147).
> >
> > Note that if we absolutely don't want to expose MailboxExecutor, we can
> > also try to get it through some casts to internal interfaces. So the
> > Sink.InitContext could have a Sink.InitContextInternal subinterface that
> > includes the mailbox executor. In AsyncSink, we could cast to the
> > internal interface and would be the only ones that can access the
> > MailboxExecutor legitimately (of course, connector devs could do the same
> > cast, but then they use an internal class and need to expect breaking
> > changes).
> >
> > For your question:
> >
> >> 2. By the way, I have a little question. Why not directly reduce the
> >> queue size to control the in-flight query? For example, according to
> >> your example, is a queue size such as 150 enough? In fact, there are
> >> caches in many places in the entire topology, such as buffers on the
> >> network stack.
> >>
> > It's a bit different. Let's take Kinesis as an example. The sink collects
> > 500 elements and puts them in a single request (500 is the max number of
> > records per batch request). Now it sends the request out. At the same
> > time, the next 500 elements are collected. So the in-flight query size
> > refers to the number of parallel requests (500 elements each).
> > If we first batch 500*numberOfInFlightRequests, and then send out all
> > numberOfInFlightRequests at the same time, we get worse latency.
> >
> > On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <guowei....@gmail.com> wrote:
> >
> >> Hi, Steffen
> >>
> >> Thank you for your detailed explanation.
> >>
> >> >>>But whether a sink is overloaded not only depends on the queue size.
> >> It also depends on the number of in-flight async requests
> >>
> >> 1. How about chaining two AsyncIOs? One is for controlling the size of
> >> the buffer elements; The other is for controlling the in-flight async
> >> requests. I think this way might do it and would not need to expose the
> >> MailboxExecutor.
> >> 2. By the way, I have a little question. Why not directly reduce the
> >> queue size to control the in-flight query? For example, according to
> >> your example, is a queue size such as 150 enough? In fact, there are
> >> caches in many places in the entire topology, such as buffers on the
> >> network stack.
> >>
> >> >>> I’m not sure whether I’m understanding who you are referring to by user.
> >> Personally I mean the sink developer.
> >>
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen
> >> <shau...@amazon.de.invalid> wrote:
> >>
> >>> Hey,
> >>>
> >>> We are using the `MailboxExecutor` to block calls to `write` in case
> >>> the sink is somehow overloaded. Overloaded basically means that the
> >>> sink cannot persist messages quickly enough into the respective
> >>> destination.
> >>>
> >>> But whether a sink is overloaded depends not only on the queue size. It
> >>> also depends on the number of in-flight async requests, which must not
> >>> grow too large either [1, 2]. We also need to support use cases where
> >>> the destination can only ingest x messages per second or a total
> >>> throughput of y per second. We are also planning to support timeouts so
> >>> that data is persisted into the destination at least every n seconds by
> >>> means of the `ProcessingTimeService`. The batching configuration will
> >>> be part of the constructor, which has only been indicated in the
> >>> current prototype but is not implemented yet [3].
> >>>
> >>> I’m not sure whether I’m understanding who you are referring to by
> >>> user. People who are using a concrete sink, e.g., to send messages into
> >>> a Kinesis stream, will not be exposed to the `MailboxExecutor`. They
> >>> are just using the sink and pass in the batching configuration from
> >>> above [4]. The `MailboxExecutor` and `ProcessingTimeService` are only
> >>> relevant for sink authors who want to create support for a new
> >>> destination. I would expect that only a few experts are adding support
> >>> for new destinations, and that they are capable of understanding and
> >>> using the advanced constructs properly.
> >>>
> >>> Hope that helps to clarify our thinking.
> >>>
> >>> Cheers, Steffen
> >>>
> >>>
> >>> [1] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
> >>> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
> >>> [3] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
> >>> [4] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
> >>>
> >>>
> >>>
> >>> From: Arvid Heise <ar...@apache.org>
> >>> Date: Tuesday, 20. July 2021 at 23:03
> >>> To: dev <dev@flink.apache.org>, Steffen Hausmann <shau...@amazon.de>
> >>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
> >>>
> >>>
> >>> Hi Guowei,
> >>>
> >>> 1. Your idea is quite similar to FLIP-171 [1]. The question is if we
> >>> implement FLIP-171 based on public interfaces (which would require
> >>> exposing MailboxExecutor as described here in FLIP-177) or if it's
> >>> better to implement it internally and hide it.
> >>> The first option is an abstract base class; your second option would be
> >>> an abstract interface that has a matching implementation internally
> >>> (similarly to AsyncIO).
> >>> There is an example for option 1 in [2]; I think the idea was to
> >>> additionally specify the batch size and batch timeout in the ctor.
> >>> @Hausmann, Steffen knows more.
> >>>
> >>> 2. I guess your question is if current AsyncIO is not sufficient
> >>> already if exactly-once is not needed? The answer is currently no,
> >>> because AsyncIO is not doing any batching. The user could do batching
> >>> before that but that's quite a bit of code. However, we should really
> >>> think if AsyncIO should also support batching.
> >>> I would also say that the scope of AsyncIO and AsyncSink is quite
> >>> different: the first one is for application developers and the second
> >>> one is for connector developers and would be deemed an implementation
> >>> detail by the application developer. Of course, advanced users may fall
> >>> in both categories, so the distinction does not always hold.
> >>>
> >>> Nevertheless, there is some overlap between both approaches and it's
> >>> important to think if the added complexity warrants the benefit. It
> >>> would be interesting to hear how other devs see that.
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> >>> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
> >>>
> >>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
> >>> Hi, Arvid
> >>> Thank you, Arvid, for perfecting Sink through this FLIP. I have two
> >>> little questions.
> >>>
> >>> 1. What do you think of us directly providing an interface as follows?
> >>> In this way, there may be no need to expose the Mailbox to the user. We
> >>> can implement an `AsyncSinkWriterOperator` to control the length of the
> >>> queue. If it is too long, do not call SinkWriter::write.
> >>> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
> >>>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
> >>>     // Please ignore the names Tuple2 and XXXFuture for now.
> >>>     int getQueueLimit();
> >>> }
> >>>
> >>> 2. Another question is: If users don't care about exactly once and the
> >>> unification of stream and batch, how about letting users use
> >>> `AsyncFunction` directly? I don’t have an answer either. I want to hear
> >>> your suggestions.
> >>>
> >>> Best,
> >>> Guowei
> >>>
> >>>
> >>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
> >>>
> >>> > Dear devs,
> >>> >
> >>> > today I'd like to start the discussion on the Sink API. I have
> >>> > drafted a FLIP [1] with an accompanying PR [2].
> >>> >
> >>> > This FLIP is a bit special as it's actually a few smaller Amend-FLIPs
> >>> > in one. In this discussion, we should decide on the scope and cut out
> >>> > too invasive steps if we can't reach an agreement.
> >>> >
> >>> > Step 1 is to add a few more pieces of information to context objects.
> >>> > That's non-breaking and needed for the async communication pattern in
> >>> > FLIP-171 [3]. While we need to add a new Public API
> >>> > (MailboxExecutor), I think that this should entail the least
> >>> > discussions.
> >>> >
> >>> > Step 2 is to also offer the same context information to committers.
> >>> > Here we can offer some compatibility methods to not break existing
> >>> > sinks. The main use case would be some async exactly-once sink but I'm
> >>> > not sure if we would use async communication patterns at all here (or
> >>> > simply wait for all async requests to finish in a sync way). It may
> >>> > also help with async cleanup tasks though.
> >>> >
> >>> > While drafting Step 2, I noticed the big entanglement of the current
> >>> > API. To figure out if there is a committer during the stream graph
> >>> > creation, we actually need to create a committer which can have
> >>> > unforeseen consequences. Thus, I spiked if we can disentangle the
> >>> > interface and have separate interfaces for the different use cases.
> >>> > The resulting step 3 would be a completely breaking change and thus is
> >>> > probably controversial. However, I'd also see the disentanglement as a
> >>> > way to prepare to make Sinks more expressive (write and commit
> >>> > coordinator) without completely overloading the main interface.
> >>> >
> >>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> >>> > [2] https://github.com/apache/flink/pull/16399
> >>> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> >>> >
> >>>
> >>>
> >>>
>
