Hi Danny,

to add to this: I'd propose using the flink-connector-base package, which
already has the rough source-side equivalent, SourceReaderBase [1]. Since
it's such a handy base implementation, I'd like to see it directly in the
main Flink repository.
For the actual connectors, I'm currently working on a proposal for a common
connector repository under the Flink umbrella.

[1] https://github.com/AHeise/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L58-L58

On Wed, Jun 16, 2021 at 6:06 PM Hausmann, Steffen <shau...@amazon.de.invalid> wrote:

> Hi Danny,
>
> Right now, I'd expect the core of the Async Sink (without third-party
> dependencies) to live in its own submodule, for instance
> `flink-connector-async` as part of `flink-connectors`.
>
> I'm currently planning to implement three different sinks to verify
> that the design of the sink is flexible enough to support different
> services: Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose,
> and Amazon DynamoDB. But I'm not sure where to actually put them. To
> keep it simple, I'd start with a module that contains all AWS-specific
> connectors. However, it has the obvious disadvantage that anyone who
> wants to use a single sink would need to pull in the dependencies for
> all services supported by this module (mainly the AWS SDK for these
> services). I don't know how much of a problem that's going to be in
> practice. If the respective jar grows too big because of all the
> included dependencies, that's certainly not going to work. But for now
> I'd just give it a try and then start a discussion once I have more
> data to share.
>
> What's more interesting is whether that module should be part of the
> Flink code base or live somewhere else. It'd be great to get some
> feedback from the community on this.
>
> Regarding the Kinesis Data Streams sink, I fully agree that it would
> be nice to remove the dependency on the KPL. So it seems desirable to
> keep the existing and the new FLIP-171-based implementations in
> separate modules.
> Otherwise people would be forced to pull in the KPL dependencies even
> if they are only using the new implementation. In addition, the new
> implementation will not support the exact same functionality as the
> existing one: the KPL implements a highly optimized form of
> aggregation on a shard level [1] by maintaining a mapping of shards
> and their respective key spaces. The new implementation can in
> principle support aggregation as well, but only on a partition key
> level, which may lead to less efficient aggregation and higher
> latencies.
>
> Cheers, Steffen
>
> [1] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
>
> On 15.06.21, 19:52, "Cranmer, Danny" <cranm...@amazon.co.uk.INVALID> wrote:
>
> CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender
> and know the content is safe.
>
> Hey Steffen,
>
> I have a few questions regarding the FLIP:
> 1. Where do you expect the core code to live? Would it be in an
>    existing module (say flink-clients), or would you introduce a new
>    module?
> 2. Which destination implementations do you intend to ship with this
>    FLIP? I see an example with Kinesis, but you also list a bunch of
>    other candidates.
> 3. For the Kinesis implementation, would you add the Sink to the
>    existing flink-connector-kinesis repo, or create a new module? The
>    reason I ask is that the existing Kinesis Sink depends on the KPL
>    and has a heavy transitive dependency chain; removing this would
>    substantially reduce application size and clean up the dependency
>    chain.
>
> Thanks,
>
> On 10/06/2021, 09:09, "Hausmann, Steffen" <shau...@amazon.de.INVALID> wrote:
>
> Hey Piotrek,
>
> Thanks for your comments on the FLIP.
> I'll address your second question first, as I think it's more central
> to this FLIP. Just looking at the AWS ecosystem, there are several
> sinks with overlapping functionality. I've chosen AWS sinks here
> because I'm most familiar with those, but a similar argument applies
> more generally to destinations that support async ingest.
>
> There is, for instance, a sink for Amazon Kinesis Data Streams that is
> part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2],
> a sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4].
> All these sinks have implemented their own mechanisms for batching,
> persisting, and retrying events. And I'm not sure if all of them
> properly participate in checkpointing. [3] even seems to closely
> mirror [1], as it contains references to the Kinesis Producer Library,
> which is unrelated to Amazon DynamoDB.
>
> These sinks predate FLIP-143. But as batching, persisting, and
> retrying capabilities do not seem to be part of FLIP-143, I'd argue
> that we would end up with similar duplication even if these sinks were
> rewritten today based on FLIP-143. And that's the idea of FLIP-171:
> abstract away these commonly required capabilities so that it becomes
> easy to create support for a wide range of destinations without having
> to think about batching, retries, checkpointing, etc. I've included an
> example in the FLIP [5] that shows that it only takes a couple of
> lines of code to implement a sink with exactly-once semantics. To be
> fair, the example is lacking robust failure handling and some more
> advanced capabilities of [1], but I think it still supports this
> point.
>
> Regarding your point on the isAvailable pattern: we need some way for
> the sink to propagate backpressure, and we would also like to support
> time-based buffering hints. There are two options I currently see, and
> I would need additional input on which one is the better or more
> desirable one.
> The first option is to use the non-blocking isAvailable pattern.
> Internally, the sink persists buffered events in the snapshot state,
> which avoids having to flush buffered records on a checkpoint. This
> seems to align well with the non-blocking isAvailable pattern. The
> second option is to make calls to `write` blocking and leverage an
> internal thread to trigger flushes based on time-based buffering
> hints. We've discussed these options with Arvid, and the suggestion
> was to assume that the `isAvailable` pattern will become available for
> sinks through an additional FLIP.
>
> I think it is an important discussion to have. My understanding of the
> implications for Flink in general is very naïve, so I'd be happy to
> get further guidance. However, I don't want to make this discussion
> part of FLIP-171. For FLIP-171 we'll use whatever is available.
>
> Does that make sense?
>
> Cheers, Steffen
>
> [1] https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
> [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
> [3] https://github.com/klarna-incubator/flink-connector-dynamodb
> [4] https://github.com/awslabs/amazon-timestream-tools/
> [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
>
> On 09.06.21, 19:44, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>
> Hi Steffen,
>
> Thanks for writing down the proposal. Back when the new Sink API was
> being discussed, I proposed adding our usual
> `CompletableFuture<Void> isAvailable()` pattern to make sinks
> non-blocking. You can see the discussion starting here [1], and
> continuing for a couple more posts until here [2].
> Back then, the outcome was that it would give very little benefit, at
> the expense of making the API more complicated. Could you maybe relate
> your proposal to that discussion from last year?
>
> I see that your proposal is going much further than just adding the
> availability method. Could you also motivate this a bit further? Could
> you maybe reference/show some sinks that:
> 1. are already implemented using FLIP-143
> 2. that have some code duplication...
> 3. ...this duplication would be solved by FLIP-171
>
> Best,
> Piotrek
>
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
>
> On Wed, 9 Jun 2021 at 09:49, Hausmann, Steffen <shau...@amazon.de.invalid> wrote:
>
> > Hi there,
> >
> > We would like to start a discussion thread on "FLIP-171: Async Sink"
> > [1], where we propose to create a common abstraction for
> > destinations that support async requests. This abstraction will make
> > it easier to add destinations to Flink by implementing a lightweight
> > shim, while it avoids maintaining dozens of independent sinks.
> >
> > Looking forward to your feedback.
> >
> > Cheers, Steffen
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> >
> > Amazon Web Services EMEA SARL
> > 38 avenue John F. Kennedy, L-1855 Luxembourg
> > Registered office: L-1855 Luxembourg
> > Registered in the Luxembourg Trade Register under R.C.S. B186284
> >
> > Amazon Web Services EMEA SARL, German branch
> > Marcel-Breuer-Str. 12, D-80807 Muenchen
> > Branch office location: Munich
> > Registered in the commercial register of the Munich District Court
> > under HRB 242240, VAT ID DE317013094