Hi Arvid

In fact, I think composition is the more flexible approach, especially in
terms of reuse. Once you need to change something, you can reuse most of
the existing logic. For example, given AsyncIOA+AsyncIOB, you can easily
reuse AsyncIOA to build AsyncIOA+AsyncIOC.
As for compatibility, I don't think there would be a big problem. For
example, the old AsyncIOA+AsyncIOB can always be converted to
AsyncIOA+AsyncIOB+AsyncIOC. When AsyncIOB does not have a state, simply
pass it through.
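
To make the composition idea concrete, here is a minimal sketch in plain
Java. It does not use Flink's AsyncIO API: `AsyncStage`, `then` and
`passThrough` are hypothetical names invented for this illustration, and
each stage is just a function that returns a `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class AsyncStageComposition {
    /** Hypothetical stand-in for one async step (not a Flink interface). */
    interface AsyncStage<I, O> extends Function<I, CompletableFuture<O>> {
        /** Chains another stage after this one. */
        default <R> AsyncStage<I, R> then(AsyncStage<O, R> next) {
            return in -> apply(in).thenCompose(next);
        }

        /** A stage that forwards its input unchanged. */
        static <T> AsyncStage<T, T> passThrough() {
            return CompletableFuture::completedFuture;
        }
    }

    static String demo() {
        AsyncStage<String, String> a = s -> CompletableFuture.supplyAsync(() -> s + "A");
        AsyncStage<String, String> b = s -> CompletableFuture.supplyAsync(() -> s + "B");
        AsyncStage<String, String> c = s -> CompletableFuture.supplyAsync(() -> s + "C");

        // The existing pipeline A+B.
        AsyncStage<String, String> oldPipeline = a.then(b);
        // Reuse A unchanged and swap in C.
        AsyncStage<String, String> reused = a.then(c);
        // Extend A+B by a third slot that simply passes elements through.
        AsyncStage<String, String> extended =
                a.then(b).then(AsyncStage.<String>passThrough());

        return oldPipeline.apply("x").join()
                + " " + reused.apply("x").join()
                + " " + extended.apply("x").join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is only that stages compose without knowing about
each other; it says nothing about state, ordering or checkpointing, which a
real operator chain would have to handle.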

Hi Steffen
You are right, there are indeed such examples. However, as I mentioned in
my previous email, we could extend AsyncIO with a retry capability, e.g.
XXXFuture.fail(Exception), so that AsyncIO can retry by putting the failed
element back into its internal queue. That would also solve the problem you
mentioned. BTW, if we implement `AsyncSink` separately, it may also run
into ordered/unordered issues, which `AsyncIO` has already resolved.
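
The "put the failed element back into the internal queue" behaviour can be
sketched as follows. This is a simplified, synchronous illustration and not
the AsyncSinkWriter or AsyncIO implementation: `RequeueingBatcher` and its
`submit` callback are hypothetical, and the callback stands in for a batch
API that reports which entries were rejected.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

class RequeueingBatcher<T> {
    private final Deque<T> queue = new ArrayDeque<>();
    private final int maxBatchSize;
    // Hypothetical client call: sends a batch, returns the rejected entries.
    private final Function<List<T>, List<T>> submit;

    RequeueingBatcher(int maxBatchSize, Function<List<T>, List<T>> submit) {
        this.maxBatchSize = maxBatchSize;
        this.submit = submit;
    }

    void add(T element) {
        queue.addLast(element);
    }

    int pending() {
        return queue.size();
    }

    /** Sends one batch and returns how many entries were re-queued. */
    int flushOnce() {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && !queue.isEmpty()) {
            batch.add(queue.pollFirst());
        }
        if (batch.isEmpty()) {
            return 0;
        }
        List<T> failed = submit.apply(batch);
        // Re-insert at the head, preserving order, so that failed entries
        // travel with the next full batch instead of in a request of their own.
        for (int i = failed.size() - 1; i >= 0; i--) {
            queue.addFirst(failed.get(i));
        }
        return failed.size();
    }

    /** Batch size 3, five elements, element 2 is rejected once. */
    static List<List<Integer>> demo() {
        List<List<Integer>> sent = new ArrayList<>();
        boolean[] throttledOnce = {false};
        RequeueingBatcher<Integer> batcher = new RequeueingBatcher<>(3, batch -> {
            sent.add(new ArrayList<>(batch));
            List<Integer> failed = new ArrayList<>();
            if (!throttledOnce[0] && batch.contains(2)) {
                throttledOnce[0] = true;
                failed.add(2);
            }
            return failed;
        });
        for (int i = 1; i <= 5; i++) {
            batcher.add(i);
        }
        while (batcher.pending() > 0) {
            batcher.flushOnce();
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the demo the rejected element is sent again together with the remaining
elements, so two requests are issued in total rather than three. A real
implementation would complete the request asynchronously and would need to
perform the re-enqueue on the task thread.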

In general, I think AsyncIO and AsyncSink are very similar, so there might
be some duplicated work.

Best,
Guowei


On Fri, Jul 30, 2021 at 8:16 PM Hausmann, Steffen <shau...@amazon.de.invalid>
wrote:

> Hey Guowei,
>
> there is one additional aspect I want to highlight that is relevant for
> the types of destinations we had in mind when designing the AsyncSink.
>
> I'll again use Kinesis as an example, but the same argument applies to
> other destinations. We are using the PutRecords API to persist up to 500
> events with a single API call to reduce the overhead compared to using
> individual calls per event. But not all of the 500 events may be persisted
> successfully, eg, a single event fails to be persisted due to server side
> throttling. With the MailboxExecutor based implementation, we can just add
> this event back to the internal queue. The event will then be retried with
> the next batch of 500 events.
>
> In my understanding, that's not possible with the AsyncIO based approach.
> During a retry, we can only retry the failed events of the original batch
> of events, which means we would need to send a single event with a separate
> PutRecords call. Depending how often that happens, this could add up.
>
> Does that make sense?
>
> Cheers, Steffen
>
>
> On 30.07.21, 05:51, "Guowei Ma" <guowei....@gmail.com> wrote:
>
>     CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
>     Hi, Arvid & Piotr
>     Sorry for the late reply.
>     1. Thank you all very much for your patience and explanation. Recently, I
>     have also studied the related code of 'MailBox', which may not be as
>     serious as I thought; after all, it is very similar to Java's `Executor`.
>     2. Whether to use AsyncIO or MailBox to implement the Kinesis connector is
>     more up to the contributor to decide (after all, it has been decided that
>     `Mailbox` will be exposed :-) ). It's just that I personally prefer to
>     combine some simple functions to complete a more advanced function.
>     Best,
>     Guowei
>
>
>     On Sat, Jul 24, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
>
>     > Just to reiterate on Piotr's point: MailboxExecutor is pretty much an
>     > Executor [1] with named lambdas, except for the name MailboxExecutor
>     > nothing is hinting at a specific threading model.
>     >
>     > Currently, we expose it on StreamOperator API. Afaik the idea is to
> make
>     > the StreamOperator internal and beef up ProcessFunction but for
> several use
>     > cases (e.g., AsyncIO), we actually need to expose the executor
> anyways.
>     >
>     > We could rename MailboxExecutor to avoid exposing the name of the
> threading
>     > model. For example, we could rename it to TaskThreadExecutor (but
> that's
>     > pretty much the same), to CooperativeExecutor (again implies
> Mailbox), to
>     > o.a.f.Executor, to DeferredExecutor... Ideas are welcome.
>     >
>     > We could also simply use Java's Executor interface, however, when
> working
>     > with that interface, I found that the missing context of async
> executed
>     > lambdas made debugging much much harder. So that's why I designed
>     > MailboxExecutor to force the user to give some debug string to each
>     > invocation. In the sink context, we could, however, use an adaptor from
>     > MailboxExecutor to Java's Executor and simply bind the sink name to the
>     > invocations.
>     >
>     > [1]
>     >
>     >
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
>     >
>     > On Fri, Jul 23, 2021 at 5:36 PM Piotr Nowojski <pnowoj...@apache.org
> >
>     > wrote:
>     >
>     > > Hi,
>     > >
>     > > Regarding the question whether to expose the MailboxExecutor or
> not:
>     > > 1. We have plans on exposing it in the ProcessFunction (in short
> we want
>     > to
>     > > make StreamOperator API private/internal only, and move all of its extra
>     > > functionality in one way or another to the ProcessFunction). I
> don't
>     > > remember and I'm not sure if *Dawid* had a different idea about
> this (do
>     > > not expose Mailbox but wrap it somehow?)
>     > > 2. If we provide a thin wrapper around MailboxExecutor, I'm not
> sure how
>     > > helpful it will be for keeping backward compatibility in the
> future.
>     > > `MailboxExecutor` is already a very generic interface that doesn't
> expose
>     > > much about the current threading model. Note that the previous
> threading
>     > > model (multi threaded with checkpoint lock), should be easy to
> implement
>     > > using the `MailboxExecutor` interface (use a direct executor that
>     > acquires
>     > > checkpoint lock).
>     > >
>     > > Having said that, I haven't spent too much time thinking about
> whether
>     > it's
>     > > better to enrich AsyncIO or provide the AsyncSink. If we can just
> as
>     > > efficiently provide the same functionality using the
> existing/enhanced
>     > > AsyncIO API, that may be a good idea if it indeed reduces our
>     > > maintenance costs.
>     > >
>     > > Piotrek
>     > >
>     > > pt., 23 lip 2021 o 12:55 Guowei Ma <guowei....@gmail.com>
> napisał(a):
>     > >
>     > > > Hi, Arvid
>     > > >
>     > > > >>>The main question here is what do you think is the harm of
> exposing
>     > > > Mailbox? Is it the complexity or the maintenance overhead?
>     > > >
>     > > > I think that exposing the internal threading model might be
> risky. In
>     > > case
>     > > > the threading model changes, it will affect the user's api and
> bring
>     > the
>     > > > burden of internal modification. (Of course, you may have more
> say in
>     > how
>     > > > the MailBox model will develop in the future) Therefore, I think
> that
>     > if
>     > > an
>     > > > alternative solution can be found, these possible risks will be
>     > avoided:
>     > > > for example, through AsyncIO.
>     > > >
>     > > > >>>>AsyncIO has no user state, so we would be quite limited in
>     > > implementing
>     > > > at-least-once sinks especially when it comes to retries.
> Chaining two
>     > > > AsyncIO would make it even harder to reason about the built-in
> state.
>     > We
>     > > > would also give up any chance to implement exactly once async
> sinks
>     > (even
>     > > > though I'm not sure if it's possible at all).
>     > > >
>     > > > 1. Why would we need to use the state when retrying (maybe I am missing
>     > > > something)? If a batch of asynchronous requests fails, I think it is
>     > > > enough to retry directly in the callback, or to extend AsyncIO with the
>     > > > ability to retry (XXXFuture.fail(Exception)). In addition, the client
>     > > > generally has its own retry mechanism; at least the Kinesis producer
>     > > > documentation says so. Of course I am not opposed to retrying, I just
>     > > > want to find a more obvious example to support the need to do so.
>     > > >
>     > > > 2. I don't think using AsyncIO will prevent exactly once in the
> future.
>     > > > Both solutions need to be rewritten unless Exactly Once is
> required
>     > from
>     > > > the beginning.
>     > > >
>     > > > >>>Users will have a hard time to discover SinkUtil.sinkTo
> compared to
>     > > the
>     > > > expected stream.sinkTo. We have seen that on other occasions
> already
>     > > (e.g.,
>     > > > reinterpretAsKeyedStream).
>     > > > In fact, I think this is the most important issue. We lack support for
>     > > > sub-topologies at the API layer, which is very inconvenient. What do
>     > > > you think of something like stream.sinkTo(AsyncSinkTopologyBuilder),
>     > > > for example?
>     > > > ```java
>     > > > // Pseudocode: the builder assembles the sink's sub-topology.
>     > > > AsyncSinkTopologyBuilder {
>     > > >     void build(input) {
>     > > >         input.flatmap().async()…
>     > > >     }
>     > > > }
>     > > > ```
>     > > > In general, I want to know what principles will guide when to
> expose
>     > more
>     > > > implementations to users, and when to combine with existing UDFs.
>     > > >
>     > > > Best,
>     > > > Guowei
>     > > >
>     > > >
>     > > > On Thu, Jul 22, 2021 at 10:35 PM Arvid Heise <ar...@apache.org>
> wrote:
>     > > >
>     > > > > Hi Guowei,
>     > > > >
>     > > > > I think the whole discussion here started as a means to avoid
>     > exposing
>     > > > > MailboxExecutor to the user. Your preferred way would be to
> improve
>     > > > AsyncIO
>     > > > > to support batching or implement AsyncSink as
> batching+AsyncIO.  Here
>     > > are
>     > > > > some thoughts.
>     > > > >
>     > > > > 1) We should take a step back and note that we actually already
>     > expose
>     > > > > MailboxExecutor on the Operator API in particular to implement
>     > AsyncIO.
>     > > > > (Now, we could say that we actually want to hide Operator API
> in the
>     > > > future
>     > > > > and could make MailboxExecutor internal again)
>     > > > >
>     > > > > 2) Exposing MailboxExecutor in general is a means for users to
> access
>     > > the
>     > > > > task thread in a cooperative way. That would allow certain
>     > > communication
>     > > > > patterns beyond simply async requests. For example, the user
> could
>     > > > > re-enqueue retries or even effectively block input processing
> to
>     > induce
>     > > > > backpressure by enqueuing a waiting mail. So the main question
> is if
>     > we
>     > > > > want to empower sink devs to access the task thread or not.
> Note that
>     > > > > indirectly, sink devs have that option already through timers.
> So in
>     > > > > theory, they could also enqueue a MIN_TIME timer and use that
> to
>     > > > implement
>     > > > > any kind of async processing.
>     > > > >
>     > > > > The main question here is what do you think is the harm of
> exposing
>     > > > > Mailbox? Is it the complexity or the maintenance overhead?
>     > > > >
>     > > > > 3) Simulating AsyncSink through AsyncIO is possible but has
> some
>     > > > > downsides.
>     > > > > a) AsyncIO has no user state, so we would be quite limited in
>     > > > implementing
>     > > > > at-least-once sinks especially when it comes to retries.
> Chaining two
>     > > > > AsyncIO would make it even harder to reason about the built-in
> state.
>     > > We
>     > > > > would also give up any chance to implement exactly once async
> sinks
>     > > (even
>     > > > > though I'm not sure if it's possible at all).
>     > > > > b) Users will have a hard time to discover SinkUtil.sinkTo
> compared
>     > to
>     > > > the
>     > > > > expected stream.sinkTo. We have seen that on other occasions
> already
>     > > > (e.g.,
>     > > > > reinterpretAsKeyedStream).
>     > > > > c) AsyncIO is optimized to feed back the result into the main
> task
>     > > > thread.
>     > > > > That's completely unneeded for sinks.
>     > > > > d) You probably know it better than me, but imho the batch
> behavior
>     > of
>     > > a
>     > > > > proper sink would be much better than an AsyncIO simulation (I
> have
>     > not
>     > > > > tracked the latest discussion in FLIP-147).
>     > > > >
>     > > > > Note that if we absolutely don't want to expose
> MailboxExecutor, we
>     > can
>     > > > > also try to get it through some casts to internal interfaces.
> So the
>     > > > > Sink.InitContext could have a Sink.InitContextInternal
> subinterface
>     > > that
>     > > > > includes the mailbox executor. In AsyncSink, we could cast to the
>     > > > > internal interface and would be the only ones that can access the
>     > > > > MailboxExecutor legitimately (of course, connector devs could do the
>     > > > > same cast, but then they use an internal class and need to expect
>     > > > > breaking changes).
>     > > > >
>     > > > > For your question:
>     > > > >
>     > > > >> 2. By the way, I have a little question. Why not directly
> reduce the
>     > > > >> queue size to control the in-flight query, for example,
> according to
>     > > > your
>     > > > >> example,
>     > > > >> Is a queue size such as 150 enough? In fact, there are caches
> in
>     > many
>     > > > >> places in the entire topology, such as buffers on the network
> stack.
>     > > > >>
>     > > > > It's a bit different. Let's take Kinesis as an example. The sink
>     > > > > collects 500 elements and puts them in a single request (500 is the
>     > > > > max number of records per batch request). Now it sends the request
>     > > > > out. At the same time, the next 500 elements are collected. So the
>     > > > > in-flight query size refers to the number of parallel requests (500
>     > > > > elements each).
>     > > > > If we first batch 500*numberOfInFlightRequests, and then send out all
>     > > > > numberOfInFlightRequests at the same time, we get worse latency.
>     > > > >
>     > > > > On Thu, Jul 22, 2021 at 12:11 PM Guowei Ma <
> guowei....@gmail.com>
>     > > wrote:
>     > > > >
>     > > > >> Hi, Steffen
>     > > > >>
>     > > > >> Thank you for your detailed explanation.
>     > > > >>
>     > > > >> >>>But whether a sink is overloaded not only depends on the
> queue
>     > > size.
>     > > > >> It also depends on the number of in-flight async requests
>     > > > >>
>     > > > >> 1. How about chaining two AsyncIOs? One is for controlling
> the size
>     > of
>     > > > >> the buffer elements; The other is for controlling the
> in-flight
>     > async
>     > > > >> requests. I think this way might do it and would not need to
> expose
>     > > the
>     > > > >> MailboxExecutor.
>     > > > >> 2. By the way, I have a little question. Why not directly
> reduce the
>     > > > >> queue size to control the in-flight query, for example,
> according to
>     > > > your
>     > > > >> example,
>     > > > >> Is a queue size such as 150 enough? In fact, there are caches
> in
>     > many
>     > > > >> places in the entire topology, such as buffers on the network
> stack.
>     > > > >>
>     > > > >> >>> I’m not sure whether I’m understanding who you are
> referring to
>     > by
>     > > > >> user.
>     > > > >> Personally I mean the sink developer.
>     > > > >>
>     > > > >>
>     > > > >> Best,
>     > > > >> Guowei
>     > > > >>
>     > > > >>
>     > > > >> On Thu, Jul 22, 2021 at 4:40 PM Hausmann, Steffen
>     > > > >> <shau...@amazon.de.invalid> wrote:
>     > > > >>
>     > > > >>> Hey,
>     > > > >>>
>     > > > >>> We are using the `MailboxExecutor` to block calls to `write`
> in
>     > case
>     > > > the
>     > > > >>> sink is somehow overloaded. Overloaded basically means that
> the
>     > sink
>     > > > cannot
>     > > > >>> persist messages quickly enough into the respective
> destination.
>     > > > >>>
>     > > > >>> But whether a sink is overloaded not only depends on the
> queue
>     > size.
>     > > It
>     > > > >>> also depends on the number of in-flight async requests that
> must
>     > not
>     > > > grow
>     > > > >>> too large either [1, 2]. We also need to support use cases
> where
>     > the
>     > > > >>> destination can only ingest x messages per second or a total
>     > > > throughput of
>     > > > >>> y per second. We are also planning to support time outs so
> that
>     > data
>     > > is
>     > > > >>> persisted into the destination at least every n seconds by
> means of
>     > > the
>     > > > >>> `ProcessingTimeService`. The batching configuration will be
> part of
>     > > the
>     > > > >>> constructor, which has only been indicated in the current
> prototype
>     > > > but is
>     > > > >>> not implemented, yet [3].
>     > > > >>>
>     > > > >>> I’m not sure whether I’m understanding who you are referring
> to by
>     > > > user.
>     > > > >>> People who are using a concrete sink, eg, to send messages
> into a
>     > > > Kinesis
>     > > > >>> stream, will not be exposed to the `MailboxExecutor`. They
> are just
>     > > > using
>     > > > >>> the sink and pass in the batching configuration from above
> [4]. The
>     > > > >>> `MailboxExecutor` and `ProcessingTimeService` are only
> relevant for
>     > > > sink
>     > > > >>> authors who want to create support for a new destination. I
> would
>     > > > expect
>     > > > >>> that there are only few experts who are adding support for
> new
>     > > > >>> destinations, who are capable to understand and use the
> advanced
>     > > > constructs
>     > > > >>> properly.
>     > > > >>>
>     > > > >>> Hope that helps to clarify our thinking.
>     > > > >>>
>     > > > >>> Cheers, Steffen
>     > > > >>>
>     > > > >>>
>     > > > >>> [1]
>     > > > >>>
>     > > >
>     > >
>     >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L118
>     > > > >>> [2]
>     > > > >>>
>     > > >
>     > >
>     >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L155
>     > > > >>> [3]
>     > > > >>>
>     > > >
>     > >
>     >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L43-L49
>     > > > >>> [4]
>     > > > >>>
>     > > >
>     > >
>     >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java#L45
>     > > > >>>
>     > > > >>>
>     > > > >>>
>     > > > >>> From: Arvid Heise <ar...@apache.org>
>     > > > >>> Date: Tuesday, 20. July 2021 at 23:03
>     > > > >>> To: dev <dev@flink.apache.org>, Steffen Hausmann <
>     > shau...@amazon.de>
>     > > > >>> Subject: RE: [EXTERNAL] [DISCUSS] FLIP-177: Extend Sink API
>     > > > >>>
>     > > > >>>
>     > > > >>>
>     > > > >>>
>     > > > >>> Hi Guowei,
>     > > > >>>
>     > > > >>> 1. your idea is quite similar to FLIP-171 [1]. The question
> is if
>     > we
>     > > > >>> implement FLIP-171 based on public interfaces (which would
> require
>     > > > exposing
>     > > > >>> MailboxExecutor as described here in FLIP-177) or if it's
> better to
>     > > > >>> implement it internally and hide it.
>     > > > >>> The first option is an abstract base class; your second
> option
>     > would
>     > > be
>     > > > >>> an abstract interface that has matching implementation
> internally
>     > > > >>> (similarly to AsyncIO).
>     > > > >>> There is an example for option 1 in [2]; I think the idea
> was to
>     > > > >>> additionally specify the batch size and batch timeout in the
> ctor.
>     > > > >>> @Hausmann, Steffen<mailto:shau...@amazon.de> knows more.
>     > > > >>>
>     > > > >>> 2. I guess your question is if current AsyncIO is not
> sufficient
>     > > > already
>     > > > >>> if exactly-once is not needed? The answer is currently no,
> because
>     > > > AsyncIO
>     > > > >>> is not doing any batching. The user could do batching before
> that
>     > but
>     > > > >>> that's quite a bit of code. However, we should really think
> if
>     > > AsyncIO
>     > > > >>> should also support batching.
>     > > > >>> I would also say that the scope of AsyncIO and AsyncSink is
> quite
>     > > > >>> different: the first one is for application developers and
> the
>     > second
>     > > > one
>     > > > >>> is for connector developers and would be deemed an
> implementation
>     > > > detail by
>     > > > >>> the application developer. Of course, advanced users may
> fall in
>     > both
>     > > > >>> categories, so the distinction does not always hold.
>     > > > >>>
>     > > > >>> Nevertheless, there is some overlap between both approaches
> and
>     > it's
>     > > > >>> important to think if the added complexity warrants the
> benefit. It
>     > > > would
>     > > > >>> be interesting to hear how other devs see that.
>     > > > >>>
>     > > > >>> [1]
>     > > > >>>
>     > > >
>     >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>     > > > >>> [2]
>     > > > >>>
>     > > >
>     > >
>     >
> https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>     > > > >>>
>     > > > >>> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <
> guowei....@gmail.com
>     > > > <mailto:
>     > > > >>> guowei....@gmail.com>> wrote:
>     > > > >>> Hi, Arvid
>     > > > >>> Thank you, Arvid, for perfecting Sink through this FLIP. I have two
>     > > > >>> little questions.
>     > > > >>>
>     > > > >>> 1. What do you think of us directly providing an interface as
>     > > follows?
>     > > > In
>     > > > >>> this way, there may be no need to expose the Mailbox to the
> user.
>     > We
>     > > > can
>     > > > >>> implement an `AsyncSinkWriterOperator` to control the length
> of the
>     > > > >>> queue.
>     > > > >>> If it is too long, do not call SinkWriter::write.
>     > > > >>> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
>     > > > >>>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
>     > > > >>>     // Please ignore the names Tuple2 and XXXFuture for now.
>     > > > >>>     int getQueueLimit();
>     > > > >>> }
>     > > > >>>
>     > > > >>> 2. Another question is: If users don't care about exactly
> once and
>     > > the
>     > > > >>> unification of stream and batch, how about letting users use
>     > > > >>> `AsyncFunction` directly? I don’t have an answer either. I
> want to
>     > > hear
>     > > > >>> your suggestions.
>     > > > >>>
>     > > > >>> Best,
>     > > > >>> Guowei
>     > > > >>>
>     > > > >>>
>     > > > >>> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <
> ar...@apache.org
>     > > <mailto:
>     > > > >>> ar...@apache.org>> wrote:
>     > > > >>>
>     > > > >>> > Dear devs,
>     > > > >>> >
>     > > > >>> > today I'd like to start the discussion on the Sink API. I
> have
>     > > > drafted
>     > > > >>> a
>     > > > >>> > FLIP [1] with an accompanying PR [2].
>     > > > >>> >
>     > > > >>> > This FLIP is a bit special as it's actually a few smaller
>     > > Amend-FLIPs
>     > > > >>> in
>     > > > >>> > one. In this discussion, we should decide on the scope and
> cut
>     > out
>     > > > too
>     > > > >>> > invasive steps if we can't reach an agreement.
>     > > > >>> >
>     > > > >>> > Step 1 is to add a few more pieces of information to
> context
>     > > objects.
>     > > > >>> > That's non-breaking and needed for the async communication
>     > pattern
>     > > in
>     > > > >>> > FLIP-171 [3]. While we need to add a new Public API
>     > > > (MailboxExecutor),
>     > > > >>> I
>     > > > >>> > think that this should entail the least discussions.
>     > > > >>> >
>     > > > >>> > Step 2 is to also offer the same context information to
>     > committers.
>     > > > >>> Here we
>     > > > >>> > can offer some compatibility methods to not break existing
> sinks.
>     > > The
>     > > > >>> main
>     > > > >>> > use case would be some async exactly-once sink but I'm not
> sure
>     > if
>     > > we
>     > > > >>> would
>     > > > >>> > use async communication patterns at all here (or simply
> wait for
>     > > all
>     > > > >>> async
>     > > > >>> > requests to finish in a sync way). It may also help with
> async
>     > > > cleanup
>     > > > >>> > tasks though.
>     > > > >>> >
>     > > > >>> > While drafting Step 2, I noticed the big entanglement of
> the
>     > > current
>     > > > >>> API.
>     > > > >>> > To figure out if there is a committer during the stream
> graph
>     > > > >>> creation, we
>     > > > >>> > actually need to create a committer which can have
> unforeseen
>     > > > >>> consequences.
>     > > > >>> > Thus, I spiked if we can disentangle the interface and have
>     > > separate
>     > > > >>> > interfaces for the different use cases. The resulting step
> 3
>     > would
>     > > > be a
>     > > > >>> > completely breaking change and thus is probably
> controversial.
>     > > > >>> However, I'd
>     > > > >>> > also see the disentanglement as a way to prepare to make
> Sinks
>     > more
>     > > > >>> > expressive (write and commit coordinator) without
> completely
>     > > > >>> overloading
>     > > > >>> > the main interface.
>     > > > >>> >
>     > > > >>> > [1]
>     > > > >>> >
>     > > > >>> >
>     > > > >>>
>     > > >
>     > >
>     >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
>     > > > >>> > [2] https://github.com/apache/flink/pull/16399
>     > > > >>> > [3]
>     > > > >>> >
>     > > > >>>
>     > > >
>     >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>     > > > >>> >
>     > > > >>>
>     > > > >>>
>     > > > >>>
>     > > > >>> Amazon Web Services EMEA SARL
>     > > > >>> 38 avenue John F. Kennedy, L-1855 Luxembourg
>     > > > >>> Sitz der Gesellschaft: L-1855 Luxemburg
>     > > > >>> eingetragen im Luxemburgischen Handelsregister unter R.C.S.
> B186284
>     > > > >>>
>     > > > >>> Amazon Web Services EMEA SARL, Niederlassung Deutschland
>     > > > >>> Marcel-Breuer-Str. 12, D-80807 Muenchen
>     > > > >>> Sitz der Zweigniederlassung: Muenchen
>     > > > >>> eingetragen im Handelsregister des Amtsgerichts Muenchen
> unter HRB
>     > > > >>> 242240, USt-ID DE317013094
>     > > > >>>
>     > > > >>>
>     > > > >>>
>     > > > >>>
>     > > >
>     > >
>     >
