Hi Arvid,

1. You are right that the core difference between the two options is whether to expose `MailboxExecutor`. My personal preference is that the fewer internal implementation details are exposed to users, the better.

2. `AsyncIO` does not support batch output, but we could implement that on top of `AsyncIO`, for example as an `AsyncIOBatchSinkBase`. We could then provide `SinkUtil.sinkTo(inputStream, AsyncIOBatchSinkBase)` to help users build a DataStream topology. I personally think that if a feature can be built externally (for example, by wrapping an existing UDF), that is better than an internal implementation. As for whether it is necessary to distinguish between a Sink and a non-Sink, I personally don't think that is particularly important.

For example:

```
inputStream = ...; // placeholder for any existing DataStream
SinkUtil.sinkTo(inputStream, AsyncIOBasedBatchKinesisSink);
```

3. Regarding 2, the more general question is: is `Sink` an operator or a topology? If we think of `Sink` as a topology, then whether `AsyncIO` can be used as a `Sink` may not be a problem. We could then think about how to provide an interface that helps the user build the sink topology.

Best,
Guowei

On Wed, Jul 21, 2021 at 5:08 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Guowei,
>
> 1. Your idea is quite similar to FLIP-171 [1]. The question is whether we
> implement FLIP-171 based on public interfaces (which would require exposing
> MailboxExecutor as described here in FLIP-177) or whether it's better to
> implement it internally and hide it.
> The first option is an abstract base class; your second option would be an
> abstract interface that has a matching implementation internally (similar
> to AsyncIO).
> There is an example for option 1 in [2]; I think the idea was to
> additionally specify the batch size and batch timeout in the ctor.
> @Hausmann, Steffen <shau...@amazon.de> knows more.
>
> 2. I guess your question is whether the current AsyncIO is already
> sufficient if exactly-once is not needed? The answer is currently no,
> because AsyncIO is not doing any batching. The user could do batching
> before that, but that's quite a bit of code. However, we should really
> think about whether AsyncIO should also support batching.
> I would also say that the scope of AsyncIO and AsyncSink is quite
> different: the first one is for application developers and the second one
> is for connector developers and would be deemed an implementation detail by
> the application developer. Of course, advanced users may fall into both
> categories, so the distinction does not always hold.
>
> Nevertheless, there is some overlap between both approaches and it's
> important to think about whether the benefit warrants the added complexity.
> It would be interesting to hear how other devs see that.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> [2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
>
> On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma <guowei....@gmail.com> wrote:
>
> > Hi Arvid,
> >
> > Thank you, Arvid, for improving the Sink through this FLIP. I have two
> > small questions.
> >
> > 1. What do you think of us directly providing an interface as follows? In
> > this way, there may be no need to expose the Mailbox to the user. We can
> > implement an `AsyncSinkWriterOperator` to control the length of the queue.
> > If the queue is too long, the operator does not call SinkWriter::write.
> >
> >     public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
> >             extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
> >         // Please ignore the names Tuple2 and XXXFuture for now.
> >         int getQueueLimit();
> >     }
> >
> > 2. Another question is: If users don't care about exactly-once and the
> > unification of stream and batch, how about letting users use
> > `AsyncFunction` directly? I don't have an answer either. I want to hear
> > your suggestions.
> >
> > Best,
> > Guowei
> >
> >
> > On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Dear devs,
> > >
> > > today I'd like to start the discussion on the Sink API. I have drafted a
> > > FLIP [1] with an accompanying PR [2].
> > >
> > > This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in
> > > one. In this discussion, we should decide on the scope and cut out steps
> > > that are too invasive if we can't reach an agreement.
> > >
> > > Step 1 is to add a few more pieces of information to context objects.
> > > That's non-breaking and needed for the async communication pattern in
> > > FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I
> > > think that this should entail the least discussion.
> > >
> > > Step 2 is to also offer the same context information to committers. Here
> > > we can offer some compatibility methods to not break existing sinks. The
> > > main use case would be some async exactly-once sink, but I'm not sure if
> > > we would use async communication patterns at all here (or simply wait
> > > for all async requests to finish in a sync way). It may also help with
> > > async cleanup tasks though.
> > >
> > > While drafting Step 2, I noticed the big entanglement of the current API.
> > > To figure out if there is a committer during the stream graph creation,
> > > we actually need to create a committer, which can have unforeseen
> > > consequences. Thus, I did a spike to see whether we can disentangle the
> > > interface and have separate interfaces for the different use cases. The
> > > resulting Step 3 would be a completely breaking change and thus is
> > > probably controversial. However, I'd also see the disentanglement as a
> > > way to prepare to make Sinks more expressive (write and commit
> > > coordinator) without completely overloading the main interface.
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
> > > [2] https://github.com/apache/flink/pull/16399
> > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
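
To make the queue-limit idea from the Jul 20 message more concrete (an operator that stops calling `write` once too many async requests are in flight), here is a minimal, self-contained sketch. It is only an illustration under stated assumptions: `AsyncWriter`, `BoundedOperator`, `tryProcess`, and `inFlightCount` are hypothetical names, not Flink APIs, and the writer simply returns a `CompletableFuture` from `write` instead of using the `Tuple2`/`XXXFuture` shape from the thread. Only `getQueueLimit()` is taken from the proposed interface.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch only: none of these types exist in Flink.
final class QueueLimitSketch {

    /** Stand-in for the proposed AsyncSinkWriter: one async request per element. */
    interface AsyncWriter<T> {
        CompletableFuture<Void> write(T element);

        int getQueueLimit();
    }

    /** Stand-in for the proposed AsyncSinkWriterOperator. */
    static final class BoundedOperator<T> {
        private final AsyncWriter<T> writer;
        private final Queue<CompletableFuture<Void>> inFlight = new ArrayDeque<>();

        BoundedOperator(AsyncWriter<T> writer) {
            this.writer = writer;
        }

        /** Returns false, without calling write, when the queue is at its limit. */
        boolean tryProcess(T element) {
            // Forget requests that have already completed.
            inFlight.removeIf(CompletableFuture::isDone);
            if (inFlight.size() >= writer.getQueueLimit()) {
                return false; // the caller has to retry later (backpressure)
            }
            inFlight.add(writer.write(element));
            return true;
        }

        /** Number of requests that have been started but not yet completed. */
        int inFlightCount() {
            inFlight.removeIf(CompletableFuture::isDone);
            return inFlight.size();
        }
    }

    public static void main(String[] args) {
        // A fake writer whose requests only complete when we say so.
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        AsyncWriter<String> writer = new AsyncWriter<String>() {
            @Override
            public CompletableFuture<Void> write(String element) {
                CompletableFuture<Void> request = new CompletableFuture<>();
                pending.add(request);
                return request;
            }

            @Override
            public int getQueueLimit() {
                return 2;
            }
        };

        BoundedOperator<String> operator = new BoundedOperator<>(writer);
        System.out.println("a accepted: " + operator.tryProcess("a"));
        System.out.println("b accepted: " + operator.tryProcess("b"));
        System.out.println("c accepted: " + operator.tryProcess("c"));
        pending.get(0).complete(null); // the first request finishes
        System.out.println("c accepted on retry: " + operator.tryProcess("c"));
    }
}
```

In this sketch the user-facing interface only declares a limit, and the operator alone decides when `write` may be called, so nothing like `MailboxExecutor` has to be visible to the writer. How a real operator would block or reschedule the rejected element is left open here.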