Hi Danny,

to add I'd propose to use the flink-connector-base package which has the
rough equivalent on source-side SourceReaderBase [1]. Since it's such a
handy base implementation, I'd like to see it directly in the main flink
repository.

For the actual connectors, I'm currently working on a proposal for a common
connector repository under Flink umbrella.

[1]
https://github.com/AHeise/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L58-L58

On Wed, Jun 16, 2021 at 6:06 PM Hausmann, Steffen <shau...@amazon.de.invalid>
wrote:

> Hi Danny,
>
> Right now, I'd expect the core of the Async Sink (without third party
> dependencies) to live in its own submodule. For instance
> `flink-connector-async` as part of `flink-connectors`.
>
> I'm currently planning to implement three different sinks to verify that
> the design of the sink if flexible enough to support different services:
> Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, and Amazon
> DynamoDB. But I'm not sure where to actually put them. To keep is simple,
> I'd start with a module that contains all AWS specific connectors. However,
> it has the obvious disadvantage that if someone wants to use a single sink,
> they would need to pull in all dependencies for all supported services that
> are included in this module (mainly the AWS SDK for these services). But I
> don't know how much of a problem that's going to be in practice. If the
> respective jar grows too big because all the included dependencies, that's
> certainly not going to work. But for now I'd just give it a try and then
> start a discussion once I have more data to share.
>
> What's more interesting is whether that module should be part of the Flink
> code base or live somewhere else. I'd be great to get some feedback from
> the community on this.
>
> Regarding the Kinesis Data Streams sink, I fully agree that it would be
> nice to remove the dependency to the KPL. So it seems to be desirable to
> keep the existing and the new FLIP-171  based implementation in separate
> modules. Otherwise people would be forced to pull in the KPL dependencies,
> even if they are only using the new implementation. In addition, the new
> implementation will not support the exact same functionality as the
> existing one: the KPL implements a very optimized form of aggregation on a
> shard level [1] by maintaining a mapping of shards and their respective key
> spaces. The new implementation can in principle support aggregation as
> well, but only on a partition key level, which may lead to less efficient
> aggregation and higher latencies.
>
> Cheers, Steffen
>
> [1]
> https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation
>
>
>
> On 15.06.21, 19:52, "Cranmer, Danny" <cranm...@amazon.co.uk.INVALID>
> wrote:
>
>     CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
>     Hey Steffen,
>
>     I have a few questions regarding the FLIP:
>     1. Where do you expect the core code to live, would it be in an
> existing module (say flink-clients) or would you introduce a new module?
>     2. Which destination implementations do you intend to ship with this
> FLIP? I see an example with Kinesis but you also list a bunch of other
> candidates.
>     3. For the Kinesis implementation, would you add the Sink to the
> existing flink-connector-kinesis repo, or create a new module? Reason I ask
> is that the existing Kinesis Sink depends on KPL and has a heavy transitive
> dependency chain, removing this would substantially reduce application size
> and clean the dependency chain
>
>     Thanks,
>
>     On 10/06/2021, 09:09, "Hausmann, Steffen" <shau...@amazon.de.INVALID>
> wrote:
>
>         CAUTION: This email originated from outside of the organization.
> Do not click links or open attachments unless you can confirm the sender
> and know the content is safe.
>
>
>
>         Hey Piotrek,
>
>         Thanks for your comments on the FLIP. I'll address your second
> question first, as I think it's more central to this FLIP. Just looking at
> the AWS ecosystem, there are several sinks with overlapping functionality.
> I've chosen AWS sinks here because I'm most familiar with those, but a
> similar argument applies more generically for destination that support
> async ingest.
>
>         There is, for instance, a sink for Amazon Kinesis Data Streams
> that is part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose
> [2], a sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4].
> All these sinks have implemented their own mechanisms for batching,
> persisting, and retrying events. And I'm not sure if all of them properly
> participate in checkpointing. [3] even seems to closely mirror [1] as it
> contains references to the Kinesis Producer Library, which is unrelated to
> Amazon DynamoDB.
>
>         These sinks predate FLIP-143. But as batching, persisting, and
> retrying capabilities do not seem to be part of FLIP-143, I'd argue that we
> would end up with similar duplication, even if these sinks were rewritten
> today based on FLIP-143. And that's the idea of FLIP-171: abstract away
> these commonly required capabilities so that it becomes easy to create
> support for a wide range of destination without having to think about
> batching, retries, checkpointing, etc. I've included an example in the FLIP
> [5] that shows that it only takes a couple of lines of code to implement a
> sink with exactly-once semantics. To be fair, the example is lacking robust
> failure handling and some more advanced capabilities of [1], but I think it
> still supports this point.
>
>         Regarding your point on the isAvailable pattern. We need some way
> for the sink to propagate backpressure and we would also like to support
> time based buffering hints. There are two options I currently see and would
> need additional input on which one is the better or more desirable one. The
> first option is to use the non-blocking isAvailable pattern. Internally,
> the sink persists buffered events in the snapshot state which avoids having
> to flush buffered record on a checkpoint. This seems to align well with the
> non-blocking isAvailable pattern. The second option is to make calls to
> `write` blocking and leverage an internal thread to trigger flushes based
> on time based buffering hints. We've discussed these options with Arvid and
> suggested to assumed that the `isAvailable` pattern will become available
> for sinks through and additional FLIP.
>
>         I think it is an important discussion to have. My understanding of
> the implications for Flink in general are very naïve, so I'd be happy to
> get further guidance. However, I don't want to make this discussion part of
> FLIP-171. For FLIP-171 we'll use whatever is available.
>
>         Does that make sense?
>
>         Cheers, Steffen
>
>
>         [1]
> https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
>         [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
>         [3] https://github.com/klarna-incubator/flink-connector-dynamodb
>         [4] https://github.com/awslabs/amazon-timestream-tools/
>         [5]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
>
>
>         On 09.06.21, 19:44, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>
>             CAUTION: This email originated from outside of the
> organization. Do not click links or open attachments unless you can confirm
> the sender and know the content is safe.
>
>
>
>             Hi Steffen,
>
>             Thanks for writing down the proposal. Back when the new Sink
> API was being
>             discussed, I was proposing to add our usual
> `CompletableFuture<Void>
>             isAvailable()` pattern to make sinks non-blocking. You can see
> the
>             discussion starting here [1], and continuing for a couple of
> more posts
>             until here [2]. Back then, the outcome was that it would give
> very little
>             benefit, at the expense of making the API more complicated.
> Could you maybe
>             relate your proposal to that discussion from last year?
>
>             I see that your proposal is going much further than just
> adding the
>             availability method, could you also motivate this a bit
> further? Could you
>             maybe reference/show some sinks that:
>             1. are already implemented using FLIP-143
>             2. that have some code duplication...
>             3. ...this duplication would be solved by FLIP-171
>
>             Best,
>             Piotrek
>
>             [1]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
>             [2]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
>
>             śr., 9 cze 2021 o 09:49 Hausmann, Steffen
> <shau...@amazon.de.invalid>
>             napisał(a):
>
>             > Hi there,
>             >
>             > We would like to start a discussion thread on "FLIP-171:
> Async Sink" [1],
>             > where we propose to create a common abstraction for
> destinations that
>             > support async requests. This abstraction will make it easier
> to add
>             > destinations to Flink by implementing a lightweight shim,
> while it avoids
>             > maintaining dozens of independent sinks.
>             >
>             > Looking forward to your feedback.
>             >
>             > Cheers, Steffen
>             >
>             > [1]
>             >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>             >
>             >
>             >
>             > Amazon Web Services EMEA SARL
>             > 38 avenue John F. Kennedy, L-1855 Luxembourg
>             > Sitz der Gesellschaft: L-1855 Luxemburg
>             > eingetragen im Luxemburgischen Handelsregister unter R.C.S.
> B186284
>             >
>             > Amazon Web Services EMEA SARL, Niederlassung Deutschland
>             > Marcel-Breuer-Str. 12, D-80807 Muenchen
>             > Sitz der Zweigniederlassung: Muenchen
>             > eingetragen im Handelsregister des Amtsgerichts Muenchen
> unter HRB 242240,
>             > USt-ID DE317013094
>             >
>             >
>             >
>             >
>
>
>
>
>         Amazon Web Services EMEA SARL
>         38 avenue John F. Kennedy, L-1855 Luxembourg
>         Sitz der Gesellschaft: L-1855 Luxemburg
>         eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
>
>         Amazon Web Services EMEA SARL, Niederlassung Deutschland
>         Marcel-Breuer-Str. 12, D-80807 Muenchen
>         Sitz der Zweigniederlassung: Muenchen
>         eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB
> 242240, USt-ID DE317013094
>
>
>
>
>
>
>
>
> Amazon Web Services EMEA SARL
> 38 avenue John F. Kennedy, L-1855 Luxembourg
> Sitz der Gesellschaft: L-1855 Luxemburg
> eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
>
> Amazon Web Services EMEA SARL, Niederlassung Deutschland
> Marcel-Breuer-Str. 12, D-80807 Muenchen
> Sitz der Zweigniederlassung: Muenchen
> eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240,
> USt-ID DE317013094
>
>
>
>

Reply via email to