Hey Piotrek,

Thanks for your comments on the FLIP. I'll address your second question first, 
as I think it's more central to this FLIP. Just looking at the AWS ecosystem, 
there are several sinks with overlapping functionality. I've chosen AWS sinks 
here because I'm most familiar with those, but a similar argument applies more 
generally to destinations that support async ingestion.

There is, for instance, a sink for Amazon Kinesis Data Streams that is part of 
Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2], a sink for 
Amazon DynamoDB [3], and a sink for Amazon Timestream [4]. All these sinks have 
implemented their own mechanisms for batching, persisting, and retrying events. 
And I'm not sure if all of them properly participate in checkpointing. [3] even 
seems to closely mirror [1] as it contains references to the Kinesis Producer 
Library, which is unrelated to Amazon DynamoDB.

These sinks predate FLIP-143. But as batching, persisting, and retrying 
capabilities do not seem to be part of FLIP-143, I'd argue that we would end up 
with similar duplication, even if these sinks were rewritten today based on 
FLIP-143. And that's the idea of FLIP-171: abstract away these commonly 
required capabilities so that it becomes easy to create support for a wide 
range of destinations without having to think about batching, retries, 
checkpointing, etc. I've included an example in the FLIP [5] that shows that it 
only takes a couple of lines of code to implement a sink with exactly-once 
semantics. To be fair, the example is lacking robust failure handling and some 
more advanced capabilities of [1], but I think it still supports this point.
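
To make the idea a bit more concrete, here is a minimal, synchronous sketch of 
the kind of shared logic I have in mind. It is not the API proposed in the 
FLIP (see [5] for that); the class name `BufferingWriterSketch` and the 
`sendBatch` callback are hypothetical and only illustrate that batching, 
re-queuing of failed entries, and snapshotting of the buffer can live in one 
place, while the destination only contributes the function that sends a batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch (not the FLIP-171 API): generic batching and re-queuing. */
final class BufferingWriterSketch<T> {
    private final int maxBatchSize;
    // Destination-specific shim: sends one batch, returns the entries that failed.
    private final Function<List<T>, List<T>> sendBatch;
    private final Deque<T> buffer = new ArrayDeque<>();

    BufferingWriterSketch(int maxBatchSize, Function<List<T>, List<T>> sendBatch) {
        this.maxBatchSize = maxBatchSize;
        this.sendBatch = sendBatch;
    }

    /** Buffers the element and flushes once a full batch is available. */
    void write(T element) {
        buffer.addLast(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Sends at most one batch; failed entries go back to the front of the buffer. */
    void flush() {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && !buffer.isEmpty()) {
            batch.add(buffer.pollFirst());
        }
        if (batch.isEmpty()) {
            return;
        }
        List<T> failed = sendBatch.apply(batch);
        // Iterate backwards so the failed entries keep their original order.
        for (int i = failed.size() - 1; i >= 0; i--) {
            buffer.addFirst(failed.get(i));
        }
    }

    /** Entries that have not been sent successfully yet; these would go into the checkpoint. */
    List<T> snapshotState() {
        return new ArrayList<>(buffer);
    }
}
```

A real implementation would send batches asynchronously and bound the number 
of retries; the sketch leaves both out to keep the shared part visible.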

Regarding your point on the `isAvailable` pattern: we need some way for the 
sink to propagate backpressure, and we would also like to support time-based 
buffering hints. I currently see two options and would appreciate additional 
input on which one is more desirable. The first option is to use the 
non-blocking `isAvailable` pattern. Internally, the sink persists buffered 
events in the snapshot state, which avoids having to flush buffered records on 
a checkpoint. This seems to align well with the non-blocking `isAvailable` 
pattern. The second option is to make calls to `write` blocking and leverage an 
internal thread to trigger flushes based on time-based buffering hints. We've 
discussed these options with Arvid, and the suggestion was to assume that the 
`isAvailable` pattern will become available for sinks through an additional 
FLIP.
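
As a rough illustration of how the two options differ in propagating 
backpressure, here is a small sketch. All class and method names except 
`isAvailable` and `write` are hypothetical, the limit on in-flight requests is 
an assumption I use to model backpressure, and the first class assumes that 
all calls happen on a single thread. The internal timer thread of the second 
option is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Option 1 (hypothetical sketch): callers write only while the future is complete. */
final class NonBlockingWriterSketch {
    private final int maxInFlight;
    private int inFlight = 0;
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    NonBlockingWriterSketch(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    /** Complete while there is capacity; incomplete while the sink is saturated. */
    CompletableFuture<Void> isAvailable() {
        return available;
    }

    void write(String element) {
        if (!available.isDone()) {
            throw new IllegalStateException("write called while the sink is unavailable");
        }
        inFlight++;
        if (inFlight >= maxInFlight) {
            // Saturated: hand out a fresh, incomplete future until a request finishes.
            available = new CompletableFuture<>();
        }
    }

    /** Called when an in-flight request has finished. */
    void onRequestCompleted() {
        inFlight--;
        if (inFlight < maxInFlight) {
            available.complete(null); // no-op if the future is already complete
        }
    }
}

/** Option 2 (hypothetical sketch): write blocks the caller until capacity is free. */
final class BlockingWriterSketch {
    private final Semaphore permits;

    BlockingWriterSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    void write(String element) {
        permits.acquireUninterruptibly(); // blocks while all permits are taken
    }

    void onRequestCompleted() {
        permits.release();
    }

    int freeCapacity() {
        return permits.availablePermits();
    }
}
```

In the first variant the caller never blocks and can do other work while the 
future is incomplete; in the second the calling thread is parked inside `write`.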

I think it is an important discussion to have. My understanding of the 
implications for Flink in general is very naïve, so I'd be happy to get 
further guidance. However, I don't want to make this discussion part of 
FLIP-171. For FLIP-171 we'll use whatever is available.

Does that make sense?

Cheers, Steffen


[1] https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
[2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
[3] https://github.com/klarna-incubator/flink-connector-dynamodb
[4] https://github.com/awslabs/amazon-timestream-tools/
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams


On 09.06.21, 19:44, "Piotr Nowojski" <pnowoj...@apache.org> wrote:

    Hi Steffen,

    Thanks for writing down the proposal. Back when the new Sink API was being
    discussed, I was proposing to add our usual `CompletableFuture<Void>
    isAvailable()` pattern to make sinks non-blocking. You can see the
    discussion starting here [1], and continuing for a couple of more posts
    until here [2]. Back then, the outcome was that it would give very little
    benefit, at the expense of making the API more complicated. Could you maybe
    relate your proposal to that discussion from last year?

    I see that your proposal is going much further than just adding the
    availability method, could you also motivate this a bit further? Could you
    maybe reference/show some sinks that:
    1. are already implemented using FLIP-143
    2. that have some code duplication...
    3. ...this duplication would be solved by FLIP-171

    Best,
    Piotrek

    [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
    [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html

    On Wed, 9 Jun 2021 at 09:49, Hausmann, Steffen <shau...@amazon.de.invalid>
    wrote:

    > Hi there,
    >
    > We would like to start a discussion thread on "FLIP-171: Async Sink" [1],
    > where we propose to create a common abstraction for destinations that
    > support async requests. This abstraction will make it easier to add
    > destinations to Flink by implementing a lightweight shim, while it avoids
    > maintaining dozens of independent sinks.
    >
    > Looking forward to your feedback.
    >
    > Cheers, Steffen
    >
    > [1]
    > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
    >
    >
    >
    > Amazon Web Services EMEA SARL
    > 38 avenue John F. Kennedy, L-1855 Luxembourg
    > Sitz der Gesellschaft: L-1855 Luxemburg
    > eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
    >
    > Amazon Web Services EMEA SARL, Niederlassung Deutschland
    > Marcel-Breuer-Str. 12, D-80807 Muenchen
    > Sitz der Zweigniederlassung: Muenchen
    > eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240,
    > USt-ID DE317013094
    >
    >
    >
    >



