Hi,

Thanks Steffen for the explanations. I think it makes sense to me.

Re Arvid/Steffen:

- Keep in mind that even if we choose to provide a non-blocking API using
the `isAvailable()`/`getAvailableFuture()` method, we would still need to
support blocking inside the sinks. For example, at the very least, emitting
many records at once (`flatMap`) or firing timers are scenarios where output
availability is currently ignored by the runtime. Also, I would
imagine that writing very large (like 1 GB) records would block on
something as well.
- Secondly, exposing availability at the API level might not be that
easy. The availability pattern as defined in the `AvailabilityProvider`
class is quite complicated and not that easy for a user to implement.
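
To illustrate the second point, here is a minimal sketch of the bookkeeping that even a simple availability-based writer would need. `SimpleAvailability` and `BoundedInFlightWriter` are hypothetical names and a simplified stand-in, not Flink's actual `AvailabilityProvider` contract:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified availability contract (an assumption, not Flink's interface). */
interface SimpleAvailability {
    CompletableFuture<Void> getAvailableFuture();

    default boolean isAvailable() {
        return getAvailableFuture().isDone();
    }
}

/** Hypothetical writer that is available while fewer than maxInFlight requests are pending. */
final class BoundedInFlightWriter implements SimpleAvailability {
    private final int maxInFlight;
    private int inFlight = 0;
    private CompletableFuture<Void> available = CompletableFuture.completedFuture(null);

    BoundedInFlightWriter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    @Override
    public synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }

    /** Task thread: returns false if the caller ignored unavailability (e.g. a flatMap burst). */
    synchronized boolean tryWrite(String record) {
        if (inFlight >= maxInFlight) {
            return false;
        }
        inFlight++;
        if (inFlight >= maxInFlight) {
            available = new CompletableFuture<>(); // become unavailable
        }
        return true;
    }

    /** Callback thread of the (hypothetical) async client: one request finished. */
    void onRequestCompleted() {
        CompletableFuture<Void> toComplete = null;
        synchronized (this) {
            inFlight--;
            if (inFlight < maxInFlight && !available.isDone()) {
                toComplete = available;
            }
        }
        if (toComplete != null) {
            toComplete.complete(null); // complete outside the lock so dependents do not run under it
        }
    }
}
```

Even in this reduced form the implementer has to swap futures at the right moments and decide what happens when a caller writes while unavailable, which is the kind of detail that is easy to get wrong.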

Given both of those points, combined with the lack of a clear motivation for
adding `AvailabilityProvider` to the sinks/operators/functions, I would vote
for just starting with blocking `write` calls. This can always be extended in
the future with availability if needed/motivated properly.

That would be aligned with either Arvid's option 1 or 2. I don't know what
the best practices with `InterruptedException` are, but I'm always afraid
of it, so I would personally feel safer with option 2.
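
For comparison, a minimal sketch of how options 1 and 2 differ. `BlockingQueueWriter` and its method names are hypothetical, and translating the interrupt into an `IllegalStateException` is only one possible choice for option 2, not something the FLIP prescribes:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical writer: a bounded queue of in-flight records makes write block when full. */
final class BlockingQueueWriter {
    private final BlockingQueue<String> inFlight;

    BlockingQueueWriter(int maxInFlight) {
        this.inFlight = new ArrayBlockingQueue<>(maxInFlight);
    }

    /** Option 1: let the interrupt propagate; this changes the method signature. */
    void writeOption1(String record) throws InterruptedException {
        inFlight.put(record); // blocks while maxInFlight records are pending
    }

    /** Option 2: same blocking behaviour, but the interrupt is translated. */
    void writeOption2(String record) {
        try {
            inFlight.put(record);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
            throw new IllegalStateException("Interrupted while waiting for capacity", e);
        }
    }

    /** Called when the (hypothetical) async client acknowledges a record. */
    void onRequestCompleted(String record) {
        inFlight.remove(record);
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

In both variants the task thread is blocked inside `put`, so the difference is only in how an interrupt surfaces to the caller.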

I'm not sure what problem option 3 is helping to solve? Adding `wakeUp()`
would sound strange to me.
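
For reference, a minimal sketch of what option 3 could look like, following Arvid's description of a normal queue plus a completable future on which `write` blocks. `WakeableWriter` and all of its members are hypothetical and not part of `SinkWriter`:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical writer that blocks on a future which completes on capacity or on wakeUp(). */
final class WakeableWriter {
    private final int maxInFlight;
    private final Queue<String> inFlight = new ArrayDeque<>();
    private CompletableFuture<Void> signal = new CompletableFuture<>();
    private boolean wokenUp = false;

    WakeableWriter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    /** Returns true if the record was accepted, false if the wait was cut short by wakeUp(). */
    boolean write(String record) {
        while (true) {
            CompletableFuture<Void> waitOn;
            synchronized (this) {
                if (inFlight.size() < maxInFlight) {
                    inFlight.add(record);
                    return true;
                }
                if (wokenUp) {
                    wokenUp = false; // consume the wake-up request
                    return false;
                }
                waitOn = signal;
            }
            waitOn.join(); // block until a request completes or wakeUp() is called
        }
    }

    /** Natural completion: a finished request frees capacity and releases a waiter. */
    synchronized void onRequestCompleted() {
        inFlight.poll();
        completeSignal();
    }

    /** Explicit completion: releases a blocked (or the next blocking) write call. */
    synchronized void wakeUp() {
        wokenUp = true;
        completeSignal();
    }

    private void completeSignal() {
        CompletableFuture<Void> old = signal;
        signal = new CompletableFuture<>();
        old.complete(null);
    }
}
```

In this sketch a `wakeUp()` that arrives while there is still capacity stays pending until the next `write` that would otherwise block. That is a design choice of the sketch, not something the thread settles.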

Best,
Piotrek

Mon, 21 Jun 2021 at 12:15 Arvid Heise <ar...@apache.org> wrote:

> Hi Piotr,
>
> to pick up this discussion thread again:
> - This FLIP is about providing some base implementation for FLIP-143 sinks
> that makes adding new implementations easier, similar to the
> SourceReaderBase.
> - The whole availability topic will most likely be a separate FLIP. The
> basic issue just popped up here because we currently have no way to signal
> backpressure in sinks except by blocking `write`. This feels quite natural
> in sinks with sync communication but quite unnatural in async sinks.
>
> Now we have a couple of options. In all cases, we would have some WIP
> limit on the number of records/requests that can be processed in
> parallel asynchronously (similar to asyncIO).
> 1. We use some blocking queue in `write`, then we need to handle
> interruptions. In the easiest case, we extend `write` to throw the
> `InterruptedException`, which is a small API change.
> 2. We use a blocking queue, but handle interrupts and swallow/translate
> them. No API change.
> Both solutions block the task thread, so any RPC message / unaligned
> checkpoint would be processed only after the backpressure is temporarily
> lifted. That's similar to the discussions that you linked. Cancellation may
> also be a tad harder on 2.
> 3. We could also add some `wakeUp` to the `SinkWriter` similar to
> `SplitFetcher` [1]. Basically, you use a normal queue with a completable
> future on which you block. Wakeup would be a clean way to complete it, in
> addition to the natural completion through finished requests.
> 4. We add availability to the sink. However, this API change also requires
> that we allow operators to be available so it may be a bigger change with
> undesired side-effects. On the other hand, we could also use the same
> mechanism for asyncIO.
>
> For users of FLIP-171, none of the options are exposed. So we could also
> start with a simple solution (add `InterruptedException`) and later try to
> add availability. Option 1+2 would also not require an additional FLIP; we
> could add it as part of this FLIP.
>
> Best,
>
> Arvid
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L258-L258
> On Thu, Jun 10, 2021 at 10:09 AM Hausmann, Steffen
> <shau...@amazon.de.invalid> wrote:
>
>> Hey Piotrek,
>>
>> Thanks for your comments on the FLIP. I'll address your second question
>> first, as I think it's more central to this FLIP. Just looking at the AWS
>> ecosystem, there are several sinks with overlapping functionality. I've
>> chosen AWS sinks here because I'm most familiar with those, but a similar
>> argument applies more generically for destinations that support async ingest.
>>
>> There is, for instance, a sink for Amazon Kinesis Data Streams that is
>> part of Apache Flink [1], a sink for Amazon Kinesis Data Firehose [2], a
>> sink for Amazon DynamoDB [3], and a sink for Amazon Timestream [4]. All
>> these sinks have implemented their own mechanisms for batching, persisting,
>> and retrying events. And I'm not sure if all of them properly participate
>> in checkpointing. [3] even seems to closely mirror [1] as it contains
>> references to the Kinesis Producer Library, which is unrelated to Amazon
>> DynamoDB.
>>
>> These sinks predate FLIP-143. But as batching, persisting, and retrying
>> capabilities do not seem to be part of FLIP-143, I'd argue that we would
>> end up with similar duplication, even if these sinks were rewritten today
>> based on FLIP-143. And that's the idea of FLIP-171: abstract away these
>> commonly required capabilities so that it becomes easy to create support
>> for a wide range of destinations without having to think about batching,
>> retries, checkpointing, etc. I've included an example in the FLIP [5] that
>> shows that it only takes a couple of lines of code to implement a sink with
>> exactly-once semantics. To be fair, the example is lacking robust failure
>> handling and some more advanced capabilities of [1], but I think it still
>> supports this point.
>>
>> Regarding your point on the isAvailable pattern: we need some way for the
>> sink to propagate backpressure, and we would also like to support time-based
>> buffering hints. There are two options I currently see, and I would need
>> additional input on which one is better or more desirable. The
>> first option is to use the non-blocking isAvailable pattern. Internally,
>> the sink persists buffered events in the snapshot state, which avoids having
>> to flush buffered records on a checkpoint. This seems to align well with the
>> non-blocking isAvailable pattern. The second option is to make calls to
>> `write` blocking and leverage an internal thread to trigger flushes based
>> on time-based buffering hints. We've discussed these options with Arvid, and
>> the suggestion was to assume that the `isAvailable` pattern will become
>> available for sinks through an additional FLIP.
>>
>> I think it is an important discussion to have. My understanding of the
>> implications for Flink in general is very naïve, so I'd be happy to get
>> further guidance. However, I don't want to make this discussion part of
>> FLIP-171. For FLIP-171 we'll use whatever is available.
>>
>> Does that make sense?
>>
>> Cheers, Steffen
>>
>>
>> [1]
>> https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kinesis
>> [2] https://github.com/aws/aws-kinesisanalytics-flink-connectors
>> [3] https://github.com/klarna-incubator/flink-connector-dynamodb
>> [4] https://github.com/awslabs/amazon-timestream-tools/
>> [5]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink#FLIP171:AsyncSink-SimplifiedAsyncSinkWriterforKinesisDataStreams
>>
>>
>> On 09.06.21, 19:44, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>>
>>     Hi Steffen,
>>
>>     Thanks for writing down the proposal. Back when the new Sink API was
>>     being discussed, I was proposing to add our usual
>>     `CompletableFuture<Void> isAvailable()` pattern to make sinks
>>     non-blocking. You can see the discussion starting here [1], and
>>     continuing for a couple of more posts until here [2]. Back then, the
>>     outcome was that it would give very little benefit, at the expense of
>>     making the API more complicated. Could you maybe relate your proposal
>>     to that discussion from last year?
>>
>>     I see that your proposal is going much further than just adding the
>>     availability method, could you also motivate this a bit further? Could
>>     you maybe reference/show some sinks that:
>>     1. are already implemented using FLIP-143
>>     2. that have some code duplication...
>>     3. ...this duplication would be solved by FLIP-171
>>
>>     Best,
>>     Piotrek
>>
>>     [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
>>     [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
>>
>>     Wed, 9 Jun 2021 at 09:49 Hausmann, Steffen <shau...@amazon.de.invalid>
>>     wrote:
>>
>>     > Hi there,
>>     >
>>     > We would like to start a discussion thread on "FLIP-171: Async Sink"
>>     > [1], where we propose to create a common abstraction for destinations
>>     > that support async requests. This abstraction will make it easier to
>>     > add destinations to Flink by implementing a lightweight shim, while it
>>     > avoids maintaining dozens of independent sinks.
>>     >
>>     > Looking forward to your feedback.
>>     >
>>     > Cheers, Steffen
>>     >
>>     > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>>     >
>>     >
>>     >
>>     > Amazon Web Services EMEA SARL
>>     > 38 avenue John F. Kennedy, L-1855 Luxembourg
>>     > Sitz der Gesellschaft: L-1855 Luxemburg
>>     > eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
>>     >
>>     > Amazon Web Services EMEA SARL, Niederlassung Deutschland
>>     > Marcel-Breuer-Str. 12, D-80807 Muenchen
>>     > Sitz der Zweigniederlassung: Muenchen
>>     > eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240,
>>     > USt-ID DE317013094
>>     >
>>     >
>>     >
>>     >
>>
