Adding the `InterruptedException` to the `write` method signature would
make it explicit that the write call can block but must react to
interruptions (e.g. when Flink wants to cancel the operation). I think
this makes the contract a bit clearer.
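To illustrate the contract, here is a minimal sketch (the class and helper names are made up for this example; they are not the actual FLIP-171 interfaces):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch only: these names are not the actual FLIP-171 API.
// It shows why declaring InterruptedException on write() makes the
// blocking/cancellation contract explicit.
public class BlockingAsyncWriter<T> {

    // A bounded queue of in-flight elements acts as the WIP limit and
    // thus propagates backpressure to the caller.
    private final BlockingQueue<T> inFlight = new ArrayBlockingQueue<>(1000);

    // The checked InterruptedException documents that this call can block
    // and must react to interruption, e.g. when Flink cancels the task.
    public void write(T element) throws InterruptedException {
        inFlight.put(element); // blocks once the WIP limit is reached
    }

    // Number of elements currently awaiting an async request (illustrative).
    public int inFlightCount() {
        return inFlight.size();
    }
}
```

A caller that swallows the interrupt instead would have to restore the interrupt flag via `Thread.currentThread().interrupt()`, which is exactly the subtlety the explicit signature avoids.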

I think starting simple and then extending the API as we see the need is a
good idea.

Cheers,
Till

On Tue, Jun 22, 2021 at 11:20 AM Hausmann, Steffen <shau...@amazon.de>
wrote:

> Hey,
>
> Agreed on starting with a blocking `write`. I've adapted the FLIP
> accordingly.
>
> For now I've chosen to add the `InterruptedException` to the `write`
> method signature, as I don't fully understand the implications of
> swallowing the exception. Depending on the details of the code that is
> calling the write method, swallowing it may cause event loss. But this
> seems more of an implementation detail that we can revisit once we are
> actually implementing the sink.
>
> Unless there are additional comments, does it make sense to start the
> voting process in the next day or two?
>
> Cheers, Steffen
>
>
> On 21.06.21, 14:51, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>
>     Hi,
>
>     Thanks Steffen for the explanations. I think it makes sense to me.
>
>     Re Arvid/Steffen:
>
>     - Keep in mind that even if we choose to provide a non-blocking API
>     using the `isAvailable()`/`getAvailableFuture()` method, we would
>     still need to support blocking inside the sinks. At the very least,
>     emitting many records at once (`flatMap`) or firing timers are
>     scenarios in which output availability is currently ignored by the
>     runtime. Also, I would imagine that writing very large (like 1GB)
>     records would block on something as well.
>     - Secondly, exposing availability at the API level might not be that
>     easy/trivial. The availability pattern, as defined in the
>     `AvailabilityProvider` class, is quite complicated and not that easy
>     for a user to implement.
>
>     Given both of those points, combined with the lack of a clear
>     motivation for adding `AvailabilityProvider` to the
>     sinks/operators/functions, I would vote for just starting with
>     blocking `write` calls. This can always be extended with
>     availability in the future if needed and properly motivated.
>
>     That would be aligned with either Arvid's option 1 or 2. I don't
>     know what the best practices with `InterruptedException` are, but
>     I'm always afraid of it, so I would personally feel safer with
>     option 2.
>
>     I'm not sure what problem option 3 is helping to solve. Adding
>     `wakeUp()` sounds strange to me.
>
>     Best,
>     Piotrek
>
>     pon., 21 cze 2021 o 12:15 Arvid Heise <ar...@apache.org> napisał(a):
>
>     > Hi Piotr,
>     >
>     > to pick up this discussion thread again:
>     > - This FLIP is about providing a base implementation for FLIP-143
>     > sinks that makes adding new implementations easier, similar to the
>     > SourceReaderBase.
>     > - The whole availability topic will most likely be a separate
>     > FLIP. The basic issue just popped up here because we currently
>     > have no way to signal backpressure in sinks except by blocking
>     > `write`. This feels quite natural in sinks with sync communication
>     > but quite unnatural in async sinks.
>     >
>     > Now we have a couple of options. In all cases, we would have some
>     > WIP limit on the number of records/requests that can be processed
>     > asynchronously in parallel (similar to asyncIO).
>     > 1. We use some blocking queue in `write`; then we need to handle
>     > interruptions. In the easiest case, we extend `write` to throw
>     > `InterruptedException`, which is a small API change.
>     > 2. We use a blocking queue, but handle interrupts and
>     > swallow/translate them. No API change.
>     > Both solutions block the task thread, so any RPC message /
>     > unaligned checkpoint would be processed only after the
>     > backpressure is temporarily lifted. That's similar to the
>     > discussions that you linked. Cancellation may also be a tad harder
>     > with option 2.
>     > 3. We could also add some `wakeUp` to the `SinkWriter`, similar to
>     > `SplitFetcher` [1]. Basically, you use a normal queue with a
>     > completable future on which you block. `wakeUp` would be a clean
>     > way to complete it, next to the natural completion through
>     > finished requests.
>     > 4. We add availability to the sink. However, this API change also
>     > requires that we allow operators to signal availability, so it may
>     > be a bigger change with undesired side effects. On the other hand,
>     > we could also use the same mechanism for asyncIO.
>     >
>     > For users of FLIP-171, none of the options are exposed. So we
>     > could also start with a simple solution (add
>     > `InterruptedException`) and later try to add availability. Options
>     > 1 and 2 would also not require an additional FLIP; we could add
>     > them as part of this FLIP.
>     >
>     > Best,
>     >
>     > Arvid
>     >
>     > [1]
>     >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L258-L258
>     > On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen
>     > <shau...@amazon.de.invalid> wrote:
>     >
>     >> Hey Piotrek,
>     >>
>     >> Thanks for your comments on the FLIP. I'll address your second
>     >> question first, as I think it's more central to this FLIP. Just
>     >> looking at the AWS ecosystem, there are several sinks with
>     >> overlapping functionality. I've chosen AWS sinks here because I'm
>     >> most familiar with those, but a similar argument applies more
>     >> generally to destinations that support async ingest.
>     >>
>     >> There is, for instance, a sink for Amazon Kinesis Data Streams
>     >> that is part of Apache Flink [1], a sink for Amazon Kinesis Data
>     >> Firehose [2], a sink for Amazon DynamoDB [3], and a sink for
>     >> Amazon Timestream [4]. All these sinks have implemented their own
>     >> mechanisms for batching, persisting, and retrying events. And I'm
>     >> not sure if all of them properly participate in checkpointing.
>     >> [3] even seems to closely mirror [1], as it contains references
>     >> to the Kinesis Producer Library, which is unrelated to Amazon
>     >> DynamoDB.
>     >>
>     >> These sinks predate FLIP-143. But as batching, persisting, and
>     >> retrying capabilities do not seem to be part of FLIP-143, I'd
>     >> argue that we would end up with similar duplication, even if
>     >> these sinks were rewritten today based on FLIP-143. And that's
>     >> the idea of FLIP-171: abstract away these commonly required
>     >> capabilities so that it becomes easy to create support for a wide
>     >> range of destinations without having to think about batching,
>     >> retries, checkpointing, etc. I've included an example in the FLIP
>     >> [5] that shows that it only takes a couple of lines of code to
>     >> implement a sink with exactly-once semantics. To be fair, the
>     >> example lacks robust failure handling and some of the more
>     >> advanced capabilities of [1], but I think it still supports this
>     >> point.
>     >>
>     >> Regarding your point on the isAvailable pattern: we need some way
>     >> for the sink to propagate backpressure, and we would also like to
>     >> support time-based buffering hints. There are two options I
>     >> currently see and would need additional input on which one is the
>     >> better or more desirable one. The first option is to use the
>     >> non-blocking isAvailable pattern. Internally, the sink persists
>     >> buffered events in the snapshot state, which avoids having to
>     >> flush buffered records on a checkpoint. This seems to align well
>     >> with the non-blocking isAvailable pattern. The second option is
>     >> to make calls to `write` blocking and leverage an internal thread
>     >> to trigger flushes based on time-based buffering hints. We've
>     >> discussed these options with Arvid and decided to assume that the
>     >> `isAvailable` pattern will become available for sinks through an
>     >> additional FLIP.
>     >>
>     >> I think it is an important discussion to have. My understanding
>     >> of the implications for Flink in general is very naïve, so I'd be
>     >> happy to get further guidance. However, I don't want to make this
>     >> discussion part of FLIP-171. For FLIP-171, we'll use whatever is
>     >> available.
>     >>
>     >> Does that make sense?
>     >>
>     >> Cheers, Steffen
>     >>
>     >>
>     >> [1]
>     >>
> https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
>     >> [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
>     >> [3] https://github.com/klarna-incubator/flink-connector-dynamodb
>     >> [4] https://github.com/awslabs/amazon-timestream-tools/
>     >> [5]
>     >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
>     >>
>     >>
>     >> On 09.06.21, 19:44, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>     >>
>     >>     Hi Steffen,
>     >>
>     >>     Thanks for writing down the proposal. Back when the new
>     >>     Sink API was being discussed, I proposed adding our usual
>     >>     `CompletableFuture<Void> isAvailable()` pattern to make
>     >>     sinks non-blocking. You can see the discussion starting
>     >>     here [1] and continuing for a couple more posts until here
>     >>     [2]. Back then, the outcome was that it would give very
>     >>     little benefit, at the expense of making the API more
>     >>     complicated. Could you maybe relate your proposal to that
>     >>     discussion from last year?
>     >>
>     >>     I see that your proposal goes much further than just adding
>     >>     the availability method; could you also motivate this a bit
>     >>     further? Could you maybe reference/show some sinks that:
>     >>     1. are already implemented using FLIP-143,
>     >>     2. have some code duplication...
>     >>     3. ...where this duplication would be solved by FLIP-171?
>     >>
>     >>     Best,
>     >>     Piotrek
>     >>
>     >>     [1]
>     >>
>     >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
>     >>     [2]
>     >>
>     >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
>     >>
>     >>     śr., 9 cze 2021 o 09:49 Hausmann, Steffen
> <shau...@amazon.de.invalid>
>     >>     napisał(a):
>     >>
>     >>     > Hi there,
>     >>     >
>     >>     > We would like to start a discussion thread on "FLIP-171:
>     >>     > Async Sink" [1], where we propose to create a common
>     >>     > abstraction for destinations that support async requests.
>     >>     > This abstraction will make it easier to add destinations
>     >>     > to Flink by implementing a lightweight shim, while
>     >>     > avoiding the maintenance of dozens of independent sinks.
>     >>     >
>     >>     > Looking forward to your feedback.
>     >>     >
>     >>     > Cheers, Steffen
>     >>     >
>     >>     > [1]
>     >>     >
>     >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>     >>     >
>     >>     >
>     >>     >
>     >>     > Amazon Web Services EMEA SARL
>     >>     > 38 avenue John F. Kennedy, L-1855 Luxembourg
>     >>     > Sitz der Gesellschaft: L-1855 Luxemburg
>     >>     > eingetragen im Luxemburgischen Handelsregister unter R.C.S.
> B186284
>     >>     >
>     >>     > Amazon Web Services EMEA SARL, Niederlassung Deutschland
>     >>     > Marcel-Breuer-Str. 12, D-80807 Muenchen
>     >>     > Sitz der Zweigniederlassung: Muenchen
>     >>     > eingetragen im Handelsregister des Amtsgerichts Muenchen
> unter HRB
>     >> 242240,
>     >>     > USt-ID DE317013094
>     >>     >
>     >>     >
>     >>     >
>     >>     >
>     >>
>     >>
>     >>
>     >>
>
>
>
>
