Thanks for the feedback, Piotrek!

Ok, happy to use `getMaxBatchSize`.

> Again, thanks for the explanation. In my proposal the problem would be with 
> `completedRequest(RequestInfo requestInfo);`, which wouldn't easily know if the 
> previously created request has actually already started or not. Alternatively 
> we could squash the `shouldBlock` and `startedRequest` calls into a single 
> `boolean maybeStartRequest(RequestInfo)`, which, if it returns false, we would 
> call over and over again until it returns true, but that might make the 
> interface slightly less flexible for us. All in all, +1 for your current 
> proposal.

Ok, let’s keep them separate for now, since the complexity is encapsulated in the 
AsyncSinkWriter.
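
For reference, roughly the shape the interface converges on in this thread (just 
a sketch; the exact signatures are still to be finalised in the FLIP):

// Sketch only - method names as discussed in this thread, not final FLIP text.
public interface RateLimitingStrategy {

    /** Whether the AsyncSinkWriter should block before starting this request. */
    boolean shouldBlock(RequestInfo requestInfo);

    /** Called once a request has actually been started. */
    void startedRequest(RequestInfo requestInfo);

    /** Called once an asynchronous request has completed. */
    void completedRequest(RequestInfo requestInfo);

    /** Upper bound on the size of the next batch the writer may construct. */
    int getMaxBatchSize();
}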

Regards,
Hong


From: Piotr Nowojski <pnowoj...@apache.org>
Date: Monday, 27 June 2022 at 09:27
To: "Teoh, Hong" <lian...@amazon.co.uk>
Cc: "dev@flink.apache.org" <dev@flink.apache.org>
Subject: RE: [EXTERNAL][DISCUSS] FLIP-242: Introduce configurable 
CongestionControlStrategy for Async Sink


Hi,

> No, they are not the same. What I mean by maxInFlightMessages is the number 
> of messages across all asynchronous requests that are currently in flight. 
> `maxBufferedRequests`, on the other hand, measures how many messages are in 
> the AsyncSinkWriter’s buffer (or queue). We flush when the number of buffered 
> messages > `maxBufferedRequests`.

Oh, ok. I see it now. Thanks for the explanation. Anyway, having a simple 
`int getMaxBatchSize();` should address this issue? Note that in the current 
proposal you have `getNextBatchSize();`, which is probably not entirely 
accurate, as the `AsyncSinkWriter` can still create a smaller batch than the 
value returned from `RateLimitingStrategy`? If so, `getMaxBatchSize` would be a 
better name.
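
To illustrate the naming point, the value from the strategy would only be an 
upper bound, and the writer could still build a smaller batch. A hypothetical 
writer-side helper (`configuredMaxBatchSize` and `bufferedEntries` are just 
stand-ins for the writer's own limits, not actual field names):

// The strategy supplies an upper bound; the actual batch can be smaller if the
// buffer holds fewer entries or the writer's own configured limit is lower.
private int nextBatchSize(RateLimitingStrategy strategy,
                          int configuredMaxBatchSize,
                          int bufferedEntries) {
    return Math.min(strategy.getMaxBatchSize(),
            Math.min(configuredMaxBatchSize, bufferedEntries));
}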

> I’m not sure that would work, since the actions that 
> RateLimitingStrategy.startedRequest() takes are contingent on whether the 
> request will actually be started (i.e. the result of 
> RateLimitingStrategy.shouldBlock()). It needs to know if the request has 
> actually gone ahead or not.
>
> For example, if we want to follow the current scheme of limiting based on 
> in-flight messages, the RateLimitingStrategy will need to know how many 
> messages are currently in flight. The way to do that is to increment the 
> in-flight message count once a request has started, and decrement it after 
> the request has completed. This is trivial if we call `startedRequest()` 
> after `shouldBlock()` (since we know the request must have been started), 
> and is complicated if we call `startedRequest()` BEFORE `shouldBlock()`, 
> since the RateLimitingStrategy would not know if we will go ahead and 
> actually make the request, so we would somehow need to keep state OR pass 
> the RequestInfo into `shouldBlock()` (which I understand is what you are 
> trying to avoid in the first place).
Again, thanks for the explanation. In my proposal the problem would be with 
`completedRequest(RequestInfo requestInfo);`, which wouldn't easily know if the 
previously created request has actually already started or not. Alternatively 
we could squash the `shouldBlock` and `startedRequest` calls into a single 
`boolean maybeStartRequest(RequestInfo)`, which, if it returns false, we would 
call over and over again until it returns true, but that might make the 
interface slightly less flexible for us. All in all, +1 for your current 
proposal.
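
For illustration, that squashed alternative could look roughly like this on the 
writer side (the `maybeStartRequest` signature is the hypothetical one from the 
paragraph above, and `sendAsync` is just a placeholder, not AsyncSinkWriter API):

// Hypothetical combined call: maybeStartRequest(RequestInfo) admits and records
// the request in one step, or returns false so the writer has to ask again.
private void startWhenPermitted(RateLimitingStrategy strategy, RequestInfo requestInfo)
        throws InterruptedException {
    while (!strategy.maybeStartRequest(requestInfo)) {
        Thread.sleep(10L); // back off briefly before asking again
    }
    sendAsync(requestInfo); // placeholder for actually firing the async request
}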

Best,
Piotrek


On Mon, 27 Jun 2022 at 09:37, Teoh, Hong <lian...@amazon.co.uk> wrote:
Hi Piotr,

> If you think having strict limits is important, we could reverse the 
> `startedRequest` and `shouldBlock` calls

I’m not sure that would work, since the actions that 
RateLimitingStrategy.startedRequest() takes are contingent on whether the 
request will actually be started (i.e. the result of 
RateLimitingStrategy.shouldBlock()). It needs to know if the request has 
actually gone ahead or not.

For example, if we want to follow the current scheme of limiting based on 
in-flight messages, the RateLimitingStrategy will need to know how many 
messages are currently in flight. The way to do that is to increment the 
in-flight message count once a request has started, and decrement it after the 
request has completed. This is trivial if we call `startedRequest()` after 
`shouldBlock()` (since we know the request must have been started), and is 
complicated if we call `startedRequest()` BEFORE `shouldBlock()`, since the 
RateLimitingStrategy would not know if we will go ahead and actually make the 
request, so we would somehow need to keep state OR pass the RequestInfo into 
`shouldBlock()` (which I understand is what you are trying to avoid in the 
first place).
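
A minimal sketch of that accounting, assuming the RequestInfo POJO Piotrek 
sketched earlier in the thread (class name and field access are illustrative 
only):

// Core of an in-flight-message-based strategy. startedRequest() must only be
// called for requests that actually go out (i.e. after shouldBlock() has let
// them through), otherwise the counter would drift.
public class InFlightMessageTracker {

    private final int maxInFlightMessages;
    private int inFlightMessages;

    public InFlightMessageTracker(int maxInFlightMessages) {
        this.maxInFlightMessages = maxInFlightMessages;
    }

    public boolean shouldBlock(RequestInfo requestInfo) {
        // Block if admitting this request would exceed the in-flight limit.
        return inFlightMessages + requestInfo.batchSize > maxInFlightMessages;
    }

    public void startedRequest(RequestInfo requestInfo) {
        inFlightMessages += requestInfo.batchSize;
    }

    public void completedRequest(RequestInfo requestInfo) {
        inFlightMessages -= requestInfo.batchSize;
    }
}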

Is there a reason you would prefer not to pass RequestInfo into `shouldBlock()`?

> Isn't `maxInFlightMessages` the same as the currently misnamed 
> `maxBufferedRequests`?

No, they are not the same. What I mean by maxInFlightMessages is the number of 
messages across all asynchronous requests that are currently in flight. 
`maxBufferedRequests`, on the other hand, measures how many messages are in the 
AsyncSinkWriter’s buffer (or queue). We flush when the number of buffered 
messages > `maxBufferedRequests`.
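
To make the distinction concrete (a sketch only; the identifiers stand in for 
the respective writer and strategy state, not the actual field names):

// maxBufferedRequests guards the writer's local buffer and triggers a flush.
private boolean shouldFlush(int bufferedMessages, int maxBufferedRequests) {
    return bufferedMessages >= maxBufferedRequests;
}

// maxInFlightMessages guards messages already sent and awaiting responses,
// which is what the RateLimitingStrategy would limit.
private boolean exceedsInFlightLimit(int inFlightMessages, int newBatchSize,
                                     int maxInFlightMessages) {
    return inFlightMessages + newBatchSize > maxInFlightMessages;
}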


Regards,
Hong

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Monday, 27 June 2022 at 08:22
To: "Teoh, Hong" <lian...@amazon.co.uk<mailto:lian...@amazon.co.uk>>
Cc: "dev@flink.apache.org<mailto:dev@flink.apache.org>" 
<dev@flink.apache.org<mailto:dev@flink.apache.org>>
Subject: RE: [EXTERNAL][DISCUSS] FLIP-242: Introduce configurable 
CongestionControlStrategy for Async Sink


Hi Hong,

> ## Q1: Do we need to pass in the batch of messages / RequestInfo to 
> `shouldBlock()`?

If you think having strict limits is important, we could reverse the 
`startedRequest` and `shouldBlock` calls as I suggested in my last email:

1. AsyncSinkWriter constructs request with batch of messages
  - This can be triggered by one of 3 conditions: a timer trigger, the batch 
byte-size threshold, or the batch message-count threshold.
2. AsyncSinkWriter calls RateLimitingStrategy.startedRequest(RequestInfo)
3. AsyncSinkWriter calls RateLimitingStrategy.shouldBlock()
4. When request completes, AsyncSinkWriter calls 
RateLimitingStrategy.completedRequest(RequestInfo)
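
On the writer side that ordering would look roughly like this (a sketch only; 
`rateLimitingStrategy` and `sendAsync` are placeholders, and `shouldBlock()` 
takes no argument here because the request was already registered in step 2):

// Sketch of the ordering above; the current proposal simply swaps steps 2 and 3
// and passes the RequestInfo into shouldBlock() instead.
private void flushReversed(RequestInfo requestInfo) throws InterruptedException {
    // step 1 (constructing the batch / RequestInfo) happens before this call
    rateLimitingStrategy.startedRequest(requestInfo);    // step 2: register up front
    while (rateLimitingStrategy.shouldBlock()) {         // step 3: block if congested
        Thread.sleep(10L);                               // wait for capacity to free up
    }
    // fire the request; step 4 runs when the asynchronous response comes back
    sendAsync(requestInfo, () -> rateLimitingStrategy.completedRequest(requestInfo));
}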

> ## Q2: Do we need to expose nextBatchSize or something to affect the 
> construction of the next batch?
> If we go with never exceeding the maxInFlightMessages, we can enter a 
> situation where there are no in-flight requests, and the next request's batch 
> size is larger than the maxInFlightMessages.

Isn't `maxInFlightMessages` the same as the currently misnamed 
`maxBufferedRequests`? Or do you maybe mean to limit the size of the next 
batch, the equivalent of the currently existing 
`AIMDRateLimitingStrategy#getRateLimit`? But that's a different limit and +1 
for keeping it.

Best,
Piotrek


On Sat, 25 Jun 2022 at 21:56, Teoh, Hong <lian...@amazon.co.uk> wrote:
Hi Piotr,

Thanks for the further questions.

To answer both your questions, let's consider the different flows for 
interaction between RateLimitingStrategy and AsyncSinkWriter (IMO it is 
important since it affects the interface!):

One potential interaction flow is:
1. AsyncSinkWriter constructs request with batch of messages
  - This can be triggered by one of 3 conditions: a timer trigger, the batch 
byte-size threshold, or the batch message-count threshold.
2. AsyncSinkWriter calls RateLimitingStrategy.shouldBlock()
3. If request starts, AsyncSinkWriter calls 
RateLimitingStrategy.startedRequest(RequestInfo)
4. When request completes, AsyncSinkWriter calls 
RateLimitingStrategy.completedRequest(RequestInfo)

## Q1: Do we need to pass in the batch of messages / RequestInfo to `shouldBlock()`?
My thoughts are that `shouldBlock()` needs to know whether (currentInFlight + 
newRequest > maxInFlight), and reject the request if so. An alternative that 
avoids passing this info in is to make the RateLimitingStrategy only reject 
requests AFTER (currentInFlight > maxInFlight).
However, I don't like the idea that a sink has the ability to exceed its 
`maxInFlightMessages`, since it would not be doing what it says on the box.
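
In other words (illustrative only; `inFlightMessages` and `maxInFlightMessages` 
stand for the strategy's internal count and limit):

// Argument-free variant: it can only react once the limit has already been
// exceeded, so the sink may overshoot maxInFlightMessages by up to one batch
// before blocking kicks in.
public boolean shouldBlock() {
    return inFlightMessages > maxInFlightMessages;
}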

## Q2: Do we need to expose nextBatchSize or something to affect the 
construction of the next batch?
If we go with never exceeding the maxInFlightMessages, we can enter a situation 
where there are no in-flight requests, and the next request's batch size is 
larger than the maxInFlightMessages:
1) We will not update the RateLimitingStrategy's maxInFlightMessages, since it 
is only updated once a request is started / completed.
2) We cannot guarantee the next request will have a smaller batch size, since 
the AsyncSinkWriter constructs the batch independently of the 
maxInFlightMessages currently set in the RateLimitingStrategy.

We can close this gap by exposing a `currentMaxBatchSize()` or `nextBatchSize()` 
method in the RateLimitingStrategy that tells the AsyncSinkWriter the maximum 
size of the next batch.
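
Putting the pieces of the flow above together, the writer-side logic could look 
roughly like this (a sketch only; `bufferedRequestEntries`, `takeFromBuffer`, 
`buildRequestInfo` and `sendAsync` are illustrative names, not the actual 
AsyncSinkWriter internals):

// Hypothetical flush path following the interaction flow above, with the batch
// capped by the strategy's currentMaxBatchSize() so it can always be admitted
// once in-flight requests drain.
private void flush() throws InterruptedException {
    int batchSize = Math.min(bufferedRequestEntries.size(),
            rateLimitingStrategy.currentMaxBatchSize());
    List<?> batch = takeFromBuffer(batchSize);              // 1. construct the batch
    RequestInfo requestInfo = buildRequestInfo(batch);

    while (rateLimitingStrategy.shouldBlock(requestInfo)) { // 2. wait if congested
        Thread.sleep(10L);
    }

    rateLimitingStrategy.startedRequest(requestInfo);       // 3. request actually starts

    // 4. notify the strategy once the asynchronous request completes
    sendAsync(batch, () -> rateLimitingStrategy.completedRequest(requestInfo));
}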

> btw, isn't this variable misnamed? (maxBufferedRequests)

Yes, you are right. It would be more appropriately named maxBufferedMessages or 
maxBufferedRequestEntries. However, these names are internal, so we can rename 
it appropriately.

Regards,
Hong



On 24/06/2022, 17:07, "Piotr Nowojski" <pnowoj...@apache.org> wrote:

    Hi Hong,

    Thanks for your clarifications.

    > We can change it such that AsyncSinkWriter passes a constructed batch of
    messages to the RateLimitingStrategy.shouldBlock(), and that returns a
    boolean, so the RateLimitingStrategy can decide the evaluation logic
    internally.

    Do we need to pass the constructed batch of messages to `shouldBlock()`?
    Can we not call `startRequest()` first, then ask whether we should block?
    We could check `shouldBlock()` either after or before actually starting the
    request that we marked in `startRequest()` - it seems to me this could be
    an implementation detail. Either way, the system would behave more or
    less the same. Does it matter if we block one request later or sooner?

    > but we would also have to expose "currentInFlightMessageCapacity" from the
    RateLimitingStrategy

    Do we have to? Currently this is checked against
    `AsyncSinkWriter#maxBufferedRequests`, can we not keep it like that? And
    btw, isn't this variable misnamed? It suggests checking the max number of
    requests (one async request = one batch?), but from the code it looks like
    it's actually `maxBufferedMessages`?

    Best,
    Piotrek


    On Fri, 24 Jun 2022 at 09:34, Teoh, Hong <lian...@amazon.co.uk> wrote:

    > Hi Piotr,
    >
    > Thanks for your feedback!
    >
    > > As I understand it, this effort is about replacing hardcoded
    >     `AIMDRateLimitingStrategy` with something more flexible?
    >
    > Yes :)
    >
    >
    > > I have one main question about the design. Why are you
    >     trying to split it into three different interfaces?
    >     Can we not have a single public interface `RateLimitingStrategy`
    >
    > You're right about the intention being to separate out the (what), (when)
    > and (how).
    >
    > The intention here was to make the separation of concerns clearer, but I
    > like your idea to reduce the surface area in the interface.
    > We can change it such that AsyncSinkWriter passes a constructed batch of
    > messages to the RateLimitingStrategy.shouldBlock(), and that returns a
    > boolean, so the RateLimitingStrategy can decide the evaluation logic
    > internally. (That was my original idea too :) )
    >
    > 1. RateLimitingStrategy can update its internal logic in the
    > `completeRequest` / `startRequest` methods
    > 2. RateLimitingStrategy can provide a go/no-go decision on `shouldBlock`,
    > given a List of requests.
    >
    > The above works, but we would also have to expose
    > "currentInFlightMessageCapacity" from the RateLimitingStrategy
    > (since it is important that the batch of messages in the proposed request be
    > constructed < currentInFlightMessageCapacity), otherwise we will end up in
    > a situation where requests will never be sent.
    >
    > An alternative is to give RateLimitingStrategy the power to decide the
    > size of the batches, but I think that would be bloating the responsibility of
    > the RateLimitingStrategy a little too much. What do you think?
    >
    >
    > Regards,
    > Hong
    >
    > On 23/06/2022, 10:05, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
    >
    >     Hi Hong,
    >
    >     As I understand it, this effort is about replacing hardcoded
    >     `AIMDRateLimitingStrategy` with something more flexible? +1 for the
    > general
    >     idea.
    >
    >     If I read it correctly, there are basically three issues:
    >     1. (what) `AIMDRateLimitingStrategy` is only able to limit the size of
    > all
    >     in-flight records across all batches, not the number of in-flight
    > batches.
    >     2. (when) Currently `AsyncSinkWriter` decides whether and when to
    > scale up
    >     or down. You would like it to be customisable behaviour.
    >     3. (how) The actual `scaleUp()` and `scaleDown()` behaviours are
    > hardcoded,
    >     and this could be customised as well.
    >
    >     Right? Assuming so, I have one main question about the design. Why are
    > you
    >     trying to split it into three different interfaces?
    >     Can we not have a single public interface `RateLimitingStrategy`
    > instead of
    >     the three that you proposed, that would have methods like:
    >
    >     `bool shouldRateLimit()` / `bool shouldBlock()`
    >     `void startedRequest(RequestInfo requestInfo)`
    >     `void completedRequest(RequestInfo requestInfo)`
    >
    >     where `RequestInfo` is a simple POJO similar to
    > `CongestionControlInfo`
    >     that you suggested
    >
    >     public class RequestInfo {
    >       int failedMessages;
    >       int batchSize;
    >       long requestStartTime;
    >     }
    >
    >     I think it would be more flexible and at the same time a simpler 
public
    >     interface. Also we could provide the same builder that you proposed in
    >     "Example configuring the Congestion Control Strategy using the new
    >     interfaces".
    >     Or am I missing something?
    >
    >     Best, Piotrek
    >
    >     On Mon, 20 Jun 2022 at 09:52, Teoh, Hong <lian...@amazon.co.uk.invalid>
    >     wrote:
    >
    >     > Hi all,
    >     >
    >     > I would like to open a discussion on FLIP-242: Introduce 
configurable
    >     > CongestionControlStrategy for Async Sink.
    >     >
    >     > The Async Sink was introduced as part of FLIP-171 [1], and
    > implements a
    >     > non-configurable congestion control strategy to reduce network
    > congestion
    >     > when the destination rejects or throttles requests. We want to make
    > this
    >     > configurable, to allow the sink implementer to decide the desired
    >     > congestion control behaviour given a specific destination.
    >     >
    >     > This is a step towards reducing the barrier to entry to writing new
    > async
    >     > sinks in Flink!
    >     >
    >     > You can find more details in the FLIP-242 [2]. Looking forward to
    > your
    >     > feedback!
    >     >
    >     > [1]
    >     >
    > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
    >     > [2]
    >     >
    > 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-242%3A+Introduce+configurable+CongestionControlStrategy+for+Async+Sink
    >     >
    >     >
    >     > Regards,
    >     > Hong
    >     >
    >     >
    >     >
    >
    >
