Thanks for the feedback, Piotrek! Ok, happy to use `getMaxBatchSize`.
> Again, thanks for the explanation. In my proposal the problem would be with
> `completedRequest(RequestInfo requestInfo);`, which wouldn't easily know if
> the previously created request has actually already started or not.
> Alternatively, we could squash the `shouldBlock` and `startedRequest` calls
> into a single `boolean maybeStartRequest(RequestInfo)`, which, if it returns
> false, we would call over and over again until it returns true, but that
> might make the interface slightly less flexible for us. All in all, +1 for
> your current proposal.

Ok, let's keep them separate for now, since the complexity is encapsulated in the AsyncSinkWriter.

Regards,
Hong

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Monday, 27 June 2022 at 09:27
To: "Teoh, Hong" <lian...@amazon.co.uk>
Cc: "dev@flink.apache.org" <dev@flink.apache.org>
Subject: RE: [EXTERNAL][DISCUSS] FLIP-242: Introduce configurable CongestionControlStrategy for Async Sink

CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you can confirm the sender and know the content is safe.

Hi,

> No, they are not the same. What I mean by maxInFlightMessages is the number
> of messages currently in flight across all the asynchronous requests
> currently in flight. `maxBufferedRequests`, on the other hand, measures how
> many messages are in the AsyncSinkWriter's buffer (or queue). We flush when
> the number of buffered messages > `maxBufferedRequests`.

Oh, ok. I see it now. Thanks for the explanation.

Anyway, having a simple `int getMaxBatchSize();` should address this issue? Note that in the current proposal you have `getNextBatchSize();`, which is probably not entirely accurate, as the `AsyncSinkWriter` can still create a smaller batch than the value returned from the `RateLimitingStrategy`? If so, `getMaxBatchSize` would be a better name.
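Putting the points above together, the strategy shape being converged on in this thread could look roughly as follows. This is only a sketch using the method names from the emails below; the final FLIP-242 signatures may differ, and the `neverBlock` helper is purely illustrative.

```java
// Sketch of the RateLimitingStrategy shape discussed in this thread.
// Method names follow the emails; the final FLIP-242 API may differ.
public class RateLimitingSketch {

    // Simple POJO describing one async request, as proposed by Piotrek.
    public static class RequestInfo {
        public int failedMessages;
        public int batchSize;
        public long requestStartTime;
    }

    public interface RateLimitingStrategy {
        // Should the writer block instead of starting this request now?
        boolean shouldBlock(RequestInfo requestInfo);

        // Called once a request has actually been started.
        void startedRequest(RequestInfo requestInfo);

        // Called once a request has completed (successfully or not).
        void completedRequest(RequestInfo requestInfo);

        // Upper bound on the next batch the writer may construct. "Max"
        // rather than "next", because the writer may still build a smaller
        // batch (e.g. on a timer flush with few buffered messages).
        int getMaxBatchSize();
    }

    // Illustrative strategy that never blocks, showing the call order:
    // getMaxBatchSize -> shouldBlock -> startedRequest -> completedRequest.
    public static RateLimitingStrategy neverBlock(final int maxBatchSize) {
        return new RateLimitingStrategy() {
            @Override
            public boolean shouldBlock(RequestInfo r) { return false; }
            @Override
            public void startedRequest(RequestInfo r) {}
            @Override
            public void completedRequest(RequestInfo r) {}
            @Override
            public int getMaxBatchSize() { return maxBatchSize; }
        };
    }
}
```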
> I'm not sure that would work, since the actions that
> RateLimitingStrategy.startedRequest() takes are contingent on whether the
> request will actually be started (i.e. the result of
> RateLimitingStrategy.shouldBlock()). It needs to know if the request has
> actually gone ahead or not.
>
> For example, if we want to follow the current scheme of limiting based on
> in-flight-messages, the RateLimitingStrategy will need to know how many
> messages are currently in flight. The way to do that is to increment
> in-flight-messages once a request has started, and decrement it after the
> request has completed. This is trivial if we call `startedRequest()` after
> `shouldBlock()` (since we know the request must have been started), and is
> complicated if we call `startedRequest()` BEFORE `shouldBlock()` (since the
> RateLimitingStrategy would not know if we will go ahead and actually make
> the request, so we would somehow need to keep state OR pass the RequestInfo
> into `shouldBlock()`, which I understand is what you are trying to avoid in
> the first place).

Again, thanks for the explanation. In my proposal the problem would be with `completedRequest(RequestInfo requestInfo);`, which wouldn't easily know if the previously created request has actually already started or not. Alternatively, we could squash the `shouldBlock` and `startedRequest` calls into a single `boolean maybeStartRequest(RequestInfo)`, which, if it returns false, we would call over and over again until it returns true, but that might make the interface slightly less flexible for us. All in all, +1 for your current proposal.
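For illustration, the squashed alternative mentioned above could look something like this. It is a hypothetical sketch (the thread settles on keeping `shouldBlock` and `startedRequest` separate), and the in-flight-message limit used here is a made-up example, not actual Flink code:

```java
// Hypothetical "squashed" variant: shouldBlock + startedRequest in one call.
public class MaybeStartSketch {

    public static class RequestInfo {
        public int batchSize;
    }

    public interface RateLimitingStrategy {
        // Returns true and records the request if it may start now; returns
        // false if the caller should retry later (called over and over
        // until it returns true).
        boolean maybeStartRequest(RequestInfo requestInfo);

        void completedRequest(RequestInfo requestInfo);
    }

    // Example strategy permitting at most maxInFlightMessages in flight.
    public static class InFlightLimited implements RateLimitingStrategy {
        private final int maxInFlightMessages;
        private int inFlightMessages;

        public InFlightLimited(int maxInFlightMessages) {
            this.maxInFlightMessages = maxInFlightMessages;
        }

        @Override
        public boolean maybeStartRequest(RequestInfo requestInfo) {
            if (inFlightMessages + requestInfo.batchSize > maxInFlightMessages) {
                return false; // not started: the caller retries later
            }
            inFlightMessages += requestInfo.batchSize; // started: record it
            return true;
        }

        @Override
        public void completedRequest(RequestInfo requestInfo) {
            inFlightMessages -= requestInfo.batchSize;
        }
    }
}
```

Because the start decision and the bookkeeping happen in one call, the strategy never has to guess whether a previously constructed request actually went ahead, which is the ambiguity discussed above.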
Best,
Piotrek

On Mon, 27 Jun 2022 at 09:37, Teoh, Hong <lian...@amazon.co.uk> wrote:

Hi Piotr,

> If you think having strict limits is important, we could reverse the
> `startedRequest` and `shouldBlock` calls

I'm not sure that would work, since the actions that RateLimitingStrategy.startedRequest() takes are contingent on whether the request will actually be started (i.e. the result of RateLimitingStrategy.shouldBlock()). It needs to know if the request has actually gone ahead or not.

For example, if we want to follow the current scheme of limiting based on in-flight-messages, the RateLimitingStrategy will need to know how many messages are currently in flight. The way to do that is to increment in-flight-messages once a request has started, and decrement it after the request has completed. This is trivial if we call `startedRequest()` after `shouldBlock()` (since we know the request must have been started), and is complicated if we call `startedRequest()` BEFORE `shouldBlock()` (since the RateLimitingStrategy would not know if we will go ahead and actually make the request, so we would somehow need to keep state OR pass the RequestInfo into `shouldBlock()`, which I understand is what you are trying to avoid in the first place).

Is there a reason you would prefer not to pass RequestInfo into `shouldBlock()`?

> Isn't `maxInFlightMessages` the same as the currently misnamed
> `maxBufferedRequests`?

No, they are not the same. What I mean by maxInFlightMessages is the number of messages currently in flight across all the asynchronous requests currently in flight. `maxBufferedRequests`, on the other hand, measures how many messages are in the AsyncSinkWriter's buffer (or queue). We flush when the number of buffered messages > `maxBufferedRequests`.
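The in-flight accounting described above can be sketched as follows (an illustrative example only; class and field names are made up for this sketch). It also shows why `shouldBlock()` wants the proposed request passed in: the strict check is against (currentInFlight + newRequest), and the counter stays exact because `startedRequest()` is only called after `shouldBlock()` returned false:

```java
// Sketch of strict in-flight-message limiting with separate
// shouldBlock/startedRequest/completedRequest calls.
public class InFlightSketch {

    public static class RequestInfo {
        public int batchSize;
    }

    public static class InFlightLimitingStrategy {
        private final int maxInFlightMessages;
        private int inFlightMessages;

        public InFlightLimitingStrategy(int maxInFlightMessages) {
            this.maxInFlightMessages = maxInFlightMessages;
        }

        // Strict check: never let (current + new) exceed the limit. This is
        // why the proposed RequestInfo must be passed in here.
        public boolean shouldBlock(RequestInfo requestInfo) {
            return inFlightMessages + requestInfo.batchSize > maxInFlightMessages;
        }

        // Only called once the request has actually been started, so the
        // counter never over-counts requests that were blocked.
        public void startedRequest(RequestInfo requestInfo) {
            inFlightMessages += requestInfo.batchSize;
        }

        public void completedRequest(RequestInfo requestInfo) {
            inFlightMessages -= requestInfo.batchSize;
        }
    }
}
```

Note that a batch larger than maxInFlightMessages would block forever under this strict check, even with nothing in flight; that is the gap the batch-size bound (`getMaxBatchSize`) discussed in this thread is meant to close.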
Regards,
Hong

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Monday, 27 June 2022 at 08:22
To: "Teoh, Hong" <lian...@amazon.co.uk>
Cc: "dev@flink.apache.org" <dev@flink.apache.org>
Subject: RE: [EXTERNAL][DISCUSS] FLIP-242: Introduce configurable CongestionControlStrategy for Async Sink

Hi Hong,

> ## Q1: Do we need to pass in the batch of messages / RequestInfo to
> `shouldBlock()`?

If you think having strict limits is important, we could reverse the `startedRequest` and `shouldBlock` calls, as I suggested in my last email:

1. AsyncSinkWriter constructs a request with a batch of messages. This can be triggered by one of 3 conditions: timer trigger, batch byte size threshold, or batch message count threshold.
2. AsyncSinkWriter calls RateLimitingStrategy.startedRequest(RequestInfo).
3. AsyncSinkWriter calls RateLimitingStrategy.shouldBlock().
4. When the request completes, AsyncSinkWriter calls RateLimitingStrategy.completedRequest(RequestInfo).

> ## Q2: Do we need to expose nextBatchSize or something to affect the
> construction of the next batch?
> If we go with never exceeding the maxInFlightMessages, we can enter a
> situation where there are no in-flight requests, and the next request's
> batch size is larger than the maxInFlightMessages.

Isn't `maxInFlightMessages` the same as the currently misnamed `maxBufferedRequests`? Or do you maybe mean to limit the size of the next batch, i.e. the equivalent of the currently existing `AIMDRateLimitingStrategy#getRateLimit`? But that's a different limit, and +1 for keeping it.

Best,
Piotrek

On Sat, 25 Jun 2022 at 21:56, Teoh, Hong <lian...@amazon.co.uk> wrote:

Hi Piotr,

Thanks for the further questions.
To answer both your questions, let's consider the different flows for interaction between the RateLimitingStrategy and the AsyncSinkWriter (IMO this is important, since it affects the interface!). One potential interaction flow is:

1. AsyncSinkWriter constructs a request with a batch of messages. This can be triggered by one of 3 conditions: timer trigger, batch byte size threshold, or batch message count threshold.
2. AsyncSinkWriter calls RateLimitingStrategy.shouldBlock().
3. If the request starts, AsyncSinkWriter calls RateLimitingStrategy.startedRequest(RequestInfo).
4. When the request completes, AsyncSinkWriter calls RateLimitingStrategy.completedRequest(RequestInfo).

## Q1: Do we need to pass in the batch of messages / RequestInfo to `shouldBlock()`?

My thoughts are that `shouldBlock()` needs to know whether (currentInFlight + newRequest > maxInFlight), and reject the request if so. An alternative that avoids passing this info in is to make the RateLimitingStrategy only reject requests AFTER (currentInFlight > maxInFlight). However, I don't like the idea that a sink has the ability to exceed its `maxInFlightMessages`, since it would not be doing what it says on the box.

## Q2: Do we need to expose nextBatchSize or something to affect the construction of the next batch?

If we go with never exceeding maxInFlightMessages, we can enter a situation where there are no in-flight requests and the next request's batch size is larger than maxInFlightMessages:

1) We will not update the RateLimitingStrategy's maxInFlightMessages, since that only happens once a request is started / completed.
2) We cannot guarantee the next request will have a smaller batch size, since the AsyncSinkWriter makes this construction independently of the maxInFlightMessages currently set in the RateLimitingStrategy.

We can close this gap by exposing `currentMaxBatchSize()` or `nextBatchSize()` in the RateLimitingStrategy that will tell the AsyncSinkWriter the maximum size of the next batch.

> btw, isn't this variable misnamed?
> (maxBufferedRequests)

Yes, you are right. It would be more appropriately named maxBufferedMessages or maxBufferedRequestEntries. However, these names are internal, so we can rename it appropriately.

Regards,
Hong

On 24/06/2022, 17:07, "Piotr Nowojski" <pnowoj...@apache.org> wrote:

Hi Hong,

Thanks for your clarifications.

> We can change it such that AsyncSinkWriter passes a constructed batch of
> messages to RateLimitingStrategy.shouldBlock(), and that returns a boolean,
> so the RateLimitingStrategy can decide the evaluation logic internally.

Do we need to pass the constructed batch of messages to `shouldBlock()`? Can we not call `startRequest()` first, then ask whether we should block? We could check `shouldBlock()` either after or before actually starting the request that we marked in `startRequest()` - it seems to me this could be an implementation detail. Either way, the system would behave more or less the same. Does it matter if we block one request later or sooner?

> but we also have to expose "currentInFlightMessageCapacity" from the
> RateLimitingStrategy as well

Do we have to? Currently this is checked against `AsyncSinkWriter#maxBufferedRequests`, can we not keep it like that? And btw, isn't this variable misnamed? It suggests checking the max number of requests (one async request = one batch?), but from the code it looks like it's actually `maxBufferedMessages`?

Best,
Piotrek

On Fri, 24 Jun 2022 at 09:34, Teoh, Hong <lian...@amazon.co.uk> wrote:

> Hi Piotr,
>
> Thanks for your feedback!
>
> > As I understand it, this effort is about replacing the hardcoded
> > `AIMDRateLimitingStrategy` with something more flexible?
>
> Yes :)
>
> > I have one main question about the design.
> > Why are you trying to split it into three different interfaces?
> > Can we not have a single public interface `RateLimitingStrategy`?
>
> You're right about the intention being to separate out the (what), (when)
> and (how).
>
> The intention here was to make the separation of concerns clearer, but I
> like your idea to reduce the surface area of the interface.
> We can change it such that the AsyncSinkWriter passes a constructed batch
> of messages to RateLimitingStrategy.shouldBlock(), and that returns a
> boolean, so the RateLimitingStrategy can decide the evaluation logic
> internally. (That was my original idea too :) )
>
> 1. RateLimitingStrategy can update its internal logic in the
>    `completedRequest` and `startedRequest` methods.
> 2. RateLimitingStrategy can provide a go/no-go decision in `shouldBlock`,
>    given a list of requests.
>
> The above works, but we also have to expose
> "currentInFlightMessageCapacity" from the RateLimitingStrategy as well
> (since it is important that the batch of messages in the proposed request
> be constructed < currentInFlightMessageCapacity), otherwise we will end up
> in a situation where requests will never be sent.
>
> An alternative is to give the RateLimitingStrategy the power to construct
> the size of the batches, but I think that would be bloating the
> responsibility of the RateLimitingStrategy a little too much. What do you
> think?
>
> Regards,
> Hong
>
> On 23/06/2022, 10:05, "Piotr Nowojski" <pnowoj...@apache.org> wrote:
>
> Hi Hong,
>
> As I understand it, this effort is about replacing the hardcoded
> `AIMDRateLimitingStrategy` with something more flexible? +1 for the
> general idea.
>
> If I read it correctly, there are basically three issues:
> 1. (what) `AIMDRateLimitingStrategy` is only able to limit the size of all
>    in-flight records across all batches, not the number of in-flight
>    batches.
> 2. (when) Currently the `AsyncSinkWriter` decides whether and when to
>    scale up or down. You would like this to be customisable behaviour.
> 3. (how) The actual `scaleUp()` and `scaleDown()` behaviours are
>    hardcoded, and this could be customised as well.
>
> Right? Assuming so, I have one main question about the design. Why are you
> trying to split it into three different interfaces?
> Can we not have a single public interface `RateLimitingStrategy`, instead
> of the three that you proposed, that would have methods like:
>
> `bool shouldRateLimit()` / `bool shouldBlock()`
> `void startedRequest(RequestInfo requestInfo)`
> `void completedRequest(RequestInfo requestInfo)`
>
> where `RequestInfo` is a simple POJO similar to the `CongestionControlInfo`
> that you suggested:
>
> public class RequestInfo {
>     int failedMessages;
>     int batchSize;
>     long requestStartTime;
> }
>
> I think it would be more flexible and at the same time a simpler public
> interface. Also, we could provide the same builder that you proposed in
> "Example configuring the Congestion Control Strategy using the new
> interfaces". Or am I missing something?
>
> Best,
> Piotrek
>
> On Mon, 20 Jun 2022 at 09:52, Teoh, Hong <lian...@amazon.co.uk.invalid> wrote:
>
> > Hi all,
> >
> > I would like to open a discussion on FLIP-242: Introduce configurable
> > CongestionControlStrategy for Async Sink.
> >
> > The Async Sink was introduced as part of FLIP-171 [1], and implements a
> > non-configurable congestion control strategy to reduce network
> > congestion when the destination rejects or throttles requests. We want
> > to make this configurable, to allow the sink implementer to decide the
> > desired congestion control behaviour given a specific destination.
> > This is a step towards reducing the barrier to entry to writing new
> > async sinks in Flink!
> >
> > You can find more details in FLIP-242 [2]. Looking forward to your
> > feedback!
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-242%3A+Introduce+configurable+CongestionControlStrategy+for+Async+Sink
> >
> > Regards,
> > Hong
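For context on the behaviour this thread makes configurable: the strategy being generalised follows the additive-increase/multiplicative-decrease (AIMD) pattern, scaling the rate limit up gradually on success and cutting it sharply on rejection or throttling. The following is only a rough sketch of that pattern; the constants and method names are illustrative, not the actual AIMDRateLimitingStrategy implementation in Flink:

```java
// Illustrative AIMD rate limit: grow additively on success, halve on
// failure. All constants here are made up for the example.
public class AimdSketch {

    public static class AimdLimit {
        private static final int INCREASE = 10;          // additive step
        private static final double DECREASE_FACTOR = 0.5; // multiplicative cut
        private final int maxRateLimit;
        private int rateLimit;

        public AimdLimit(int initialRateLimit, int maxRateLimit) {
            this.rateLimit = initialRateLimit;
            this.maxRateLimit = maxRateLimit;
        }

        // Called after a fully successful request: increase additively,
        // capped at the configured maximum.
        public void scaleUp() {
            rateLimit = Math.min(maxRateLimit, rateLimit + INCREASE);
        }

        // Called after a rejected/throttled request: back off sharply,
        // but never below 1.
        public void scaleDown() {
            rateLimit = Math.max(1, (int) (rateLimit * DECREASE_FACTOR));
        }

        public int getRateLimit() {
            return rateLimit;
        }
    }
}
```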