Hi, Arvid

1. You are right that the core difference between the two options is
whether to expose `MailboxExecutor`. My personal preference is that the
less of the internal implementation we expose to users, the better.
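
To make the non-exposing option concrete, here is a minimal plain-Java sketch of an operator-side queue limit. `BoundedAsyncWriter`, its constructor parameters and `inFlightCount()` are hypothetical names, not Flink APIs. The sketch blocks on the oldest request, whereas a real operator would presumably yield to its mailbox at that point instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Hypothetical sketch (not a Flink API): the operator owns the queue of
 * in-flight requests, so user code never sees the MailboxExecutor.
 */
class BoundedAsyncWriter<T> {
    private final int queueLimit;                            // cf. getQueueLimit() in the proposal
    private final Function<T, CompletableFuture<Void>> send; // user-supplied async request
    private final Deque<CompletableFuture<Void>> inFlight = new ArrayDeque<>();

    BoundedAsyncWriter(int queueLimit, Function<T, CompletableFuture<Void>> send) {
        if (queueLimit < 1) {
            throw new IllegalArgumentException("queueLimit must be >= 1");
        }
        this.queueLimit = queueLimit;
        this.send = send;
    }

    void write(T element) {
        // Forget requests that have already finished.
        inFlight.removeIf(CompletableFuture::isDone);
        // Queue full: wait for the oldest request before accepting more input.
        // A real operator would yield to the mailbox here instead of blocking.
        while (inFlight.size() >= queueLimit) {
            inFlight.pollFirst().join();
            inFlight.removeIf(CompletableFuture::isDone);
        }
        inFlight.addLast(send.apply(element));
    }

    /** Waits for all outstanding requests, e.g. before a checkpoint. */
    void flush() {
        while (!inFlight.isEmpty()) {
            inFlight.pollFirst().join();
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

A user-facing writer would then only supply the `send` function and a queue limit, while the waiting logic stays inside the operator.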

2. `AsyncIO` does not support batched output, but we could implement a
batching sink on top of `AsyncIO`, such as an `AsyncIOBatchSinkBase`. We
might then provide a `SinkUtil.sinkTo(inputStream, AsyncIOBatchSinkBase)` to
help users build the DataStream topology.
I personally think that if some functionality can be built externally (for
example, by wrapping an existing UDF), that is better than implementing it
internally. As for whether it is necessary to distinguish between a Sink and
a non-Sink, I personally don't think it is particularly important.

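```java
// For example (SinkUtil and AsyncIOBasedBatchKinesisSink are proposed names, not existing APIs):
DataStream<String> inputStream = ...; // some upstream DataStream
SinkUtil.sinkTo(inputStream, new AsyncIOBasedBatchKinesisSink());
```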
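The batching that `AsyncIO` lacks is essentially a buffer with a size limit and a timeout. A minimal plain-Java sketch, assuming a hypothetical `BatchBuffer` class (not part of Flink) whose `flushAction` would issue one async request per batch; the current time is passed in explicitly so the behavior is deterministic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch (not a Flink API): the batching step that a wrapper such
 * as the proposed AsyncIOBatchSinkBase would add in front of the async request.
 * A batch is emitted when it reaches maxBatchSize or when its oldest element
 * has waited maxBatchTimeMs.
 */
class BatchBuffer<T> {
    private final int maxBatchSize;
    private final long maxBatchTimeMs;
    private final Consumer<List<T>> flushAction; // e.g. issue one async batch request
    private List<T> buffer = new ArrayList<>();
    private long firstElementTimeMs;

    BatchBuffer(int maxBatchSize, long maxBatchTimeMs, Consumer<List<T>> flushAction) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.maxBatchTimeMs = maxBatchTimeMs;
        this.flushAction = flushAction;
    }

    /** Adds an element; flushes immediately if the batch is full. */
    void add(T element, long nowMs) {
        if (buffer.isEmpty()) {
            firstElementTimeMs = nowMs; // the batch timeout starts with its first element
        }
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Called periodically (e.g. from a timer) to emit a partial batch that has waited too long. */
    void onTimer(long nowMs) {
        if (!buffer.isEmpty() && nowMs - firstElementTimeMs >= maxBatchTimeMs) {
            flush();
        }
    }

    /** Hands the current batch to flushAction and starts a new, empty one. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        flushAction.accept(batch);
    }
}
```

An AsyncIO-based batch sink would presumably feed the emitted lists into its async request function; whether that logic lives in `AsyncIO` itself or in an external wrapper is the design question discussed in this thread.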

3. Building on point 2, the more general question is: is a `Sink` an operator
or a topology? If we consider a `Sink` to be a topology, then whether
`AsyncIO` can be used as a `Sink` may not be a problem. We might instead
think about how to provide an interface that helps users build the sink
topology.
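
Under the topology view, a sink is just a chain of ordinary stages, and a helper like `SinkUtil.sinkTo` only attaches that chain to the input. A minimal plain-Java sketch of the idea, assuming hypothetical `SinkTopology` and `SinkUtil` types (`SinkUtil` is only a name proposed in this mail, not an existing Flink class) and using an `Iterable` in place of a `DataStream`:

```java
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical sketch of "a Sink is a topology": the sink is a chain of
 * ordinary stages. None of these types are Flink APIs.
 */
interface SinkTopology<T> {
    // Returns the entry point of the chain; each element pushed here flows through all stages.
    Consumer<T> entry();

    // Prepends a transformation stage (e.g. serialization) in front of this topology.
    default <S> SinkTopology<S> after(Function<S, T> stage) {
        Consumer<T> next = entry();
        return () -> s -> next.accept(stage.apply(s));
    }

    // The terminal stage: whatever actually writes to the external system.
    static <T> SinkTopology<T> writer(Consumer<T> write) {
        return () -> write;
    }
}

final class SinkUtil {
    private SinkUtil() {}

    // Attaches the sink topology to the input; an Iterable stands in for a DataStream.
    static <T> void sinkTo(Iterable<T> input, SinkTopology<T> sink) {
        Consumer<T> entry = sink.entry();
        for (T element : input) {
            entry.accept(element);
        }
    }
}
```

In Flink, the stages would be DataStream transformations (for example, an `AsyncIO`-based batching step) rather than in-process function calls; the sketch only illustrates that a sink does not have to be a single operator.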

Best,
Guowei


On Wed, Jul 21, 2021 at 5:08 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Guowei,
>
> 1. Your idea is quite similar to FLIP-171 [1]. The question is whether we
> implement FLIP-171 based on public interfaces (which would require exposing
> MailboxExecutor as described here in FLIP-177) or whether it's better to
> implement it internally and hide it.
> The first option is an abstract base class; your proposal, the second
> option, would be an abstract interface with a matching internal
> implementation (similar to AsyncIO).
> There is an example of option 1 in [2]; I think the idea was to
> additionally specify the batch size and batch timeout in the constructor.
> @Steffen Hausmann <shau...@amazon.de> knows more.
>
> 2. I guess your question is whether the current AsyncIO is already
> sufficient when exactly-once is not needed. Currently, the answer is no,
> because AsyncIO does not do any batching. Users could batch records before
> AsyncIO, but that takes quite a bit of code. However, we should seriously
> consider whether AsyncIO should also support batching.
> I would also say that the scopes of AsyncIO and AsyncSink are quite
> different: the former is for application developers, while the latter is
> for connector developers and would be considered an implementation detail
> by application developers. Of course, advanced users may fall into both
> categories, so the distinction does not always hold.
>
> Nevertheless, there is some overlap between both approaches, and it's
> important to consider whether the benefit justifies the added complexity.
> It would be interesting to hear how other devs see this.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>
> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
>
> > Hi, Arvid
> > Thank you, Arvid, for improving the Sink API with this FLIP. I have two
> > small questions:
> >
> > 1. What do you think of us directly providing an interface like the one
> > below? That way, there may be no need to expose the mailbox to the user.
> > We can implement an `AsyncSinkWriterOperator` that controls the length of
> > the queue: if the queue is too long, the operator stops calling
> > `SinkWriter::write`.
> >
> > public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
> >         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
> >     // Please ignore the names Tuple2 and XXXFuture for now.
> >     int getQueueLimit();
> > }
> >
> > 2. Another question: if users care about neither exactly-once nor the
> > unification of stream and batch, how about letting them use
> > `AsyncFunction` directly? I don't have an answer either, and I'd like to
> > hear your suggestions.
> >
> > Best,
> > Guowei
> >
> >
> > On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Dear devs,
> > >
> > > Today I'd like to start the discussion on the Sink API. I have drafted a
> > > FLIP [1] with an accompanying PR [2].
> > >
> > > This FLIP is a bit special, as it is actually a few smaller Amend-FLIPs
> > > in one. In this discussion, we should decide on the scope and cut out
> > > overly invasive steps if we can't reach an agreement.
> > >
> > > Step 1 is to add a few more pieces of information to context objects.
> > > That's non-breaking and needed for the async communication pattern in
> > > FLIP-171 [3]. Although we need to add a new public API (MailboxExecutor),
> > > I think this step should require the least discussion.
> > >
> > > Step 2 is to also offer the same context information to committers.
> > > Here we can offer some compatibility methods so that existing sinks do
> > > not break. The main use case would be an async exactly-once sink, but
> > > I'm not sure whether we would use async communication patterns here at
> > > all (or simply wait for all async requests to finish synchronously). It
> > > may also help with async cleanup tasks, though.
> > >
> > > While drafting Step 2, I noticed how entangled the current API is. To
> > > figure out whether there is a committer during stream graph creation, we
> > > actually need to create a committer, which can have unforeseen
> > > consequences. Thus, I ran a spike to see whether we can disentangle the
> > > interface into separate interfaces for the different use cases. The
> > > resulting Step 3 would be a completely breaking change and is thus
> > > probably controversial. However, I'd also see the disentanglement as a
> > > way to prepare for making Sinks more expressive (write and commit
> > > coordinator) without completely overloading the main interface.
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > > [2] https://github.com/apache/flink/pull/16399
> > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > >
> >
>
