Hi, I have been implementing KIP-219. I discussed the interface changes with Becket Qin and Dong Lin, and we decided to bump the protocol version of ApiVersionsRequest and ApiVersionsResponse only, instead of bumping every request/response that may be throttled, to indicate to clients whether or not brokers perform throttling before sending out responses. We think this is sufficient given that the network client exchanges ApiVersionsRequest/Response with each broker when establishing a connection to it, so it can detect the broker's throttling behavior just by examining the ApiVersionsResponse version.
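In code, the detection described above amounts to a single version comparison during connection setup. This is an illustrative sketch, not the actual client code; the cutoff constant is an assumption, since the real value depends on where the version bump lands.

```java
// Sketch of the version check: the client already receives an
// ApiVersionsResponse from every broker during connection setup, so it
// can infer the broker's throttling behavior from the advertised version.
public class ThrottleBehaviorCheck {
    // Hypothetical cutoff: first ApiVersions version at which the broker
    // sends the response before throttling, so the client must apply the
    // back-off itself instead of waiting out the throttle again.
    static final short RESPONSE_BEFORE_THROTTLING_VERSION = 2;

    static boolean brokerRespondsBeforeThrottling(short apiVersionsVersion) {
        return apiVersionsVersion >= RESPONSE_BEFORE_THROTTLING_VERSION;
    }

    public static void main(String[] args) {
        System.out.println(brokerRespondsBeforeThrottling((short) 1)); // old broker
        System.out.println(brokerRespondsBeforeThrottling((short) 2)); // new broker
    }
}
```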
Please respond to this e-mail if you have any questions or concerns.

Thanks,
Jon Lee

On Thu, Apr 5, 2018 at 2:29 PM, Becket Qin <becket....@gmail.com> wrote:

On Thu, Nov 16, 2017 at 3:49 PM, Becket Qin <becket....@gmail.com> wrote:

Thanks Rajini,

I updated the KIP wiki to clarify the scope of the KIP. To summarize, the current quota has a few caveats:

1. The brokers only throttle the NEXT request, even if the current request already violates the quota. So the broker will always process at least one request from the client.
2. The brokers cannot know the client id until they have fully read the request out of the socket, even if that client is being throttled.
3. The brokers do not communicate with the clients promptly, so the clients have to wait blindly and sometimes time out unnecessarily.

This KIP only tries to address 3, but not 1 and 2, because A) those two issues are somewhat orthogonal to 3, and B) the solutions to 1 and 2 are much more complicated and worth a separate discussion.

I'll wait till tomorrow and start a voting thread if no further concerns are raised about the KIP.

Thanks,

Jiangjie (Becket) Qin

On Thu, Nov 16, 2017 at 11:47 AM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:

Hi Becket,

The current user quota doesn't solve the problem. But I was thinking that if we could ensure we don't read more from the network than the quota allows, we may be able to fix the issue in a different way (throttling all connections, each for a limited time prior to reading large requests). But it would be more complex (and even messier for client-id quotas), so I can understand why the solution you proposed in the KIP makes sense for the scenario that you described.
Regards,

Rajini

On Tue, Nov 14, 2017 at 11:30 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Rajini,

We are using SSL, so we could use user quotas. But I am not sure that would solve the problem. The key issue in our case is that each broker can only handle ~300 MB/s of incoming bytes, but the MapReduce job is trying to push 1-2 GB/s; unless we can throttle the clients to 300 MB/s, the broker cannot sustain the load. To do that, we need to be able to throttle requests for longer than the request timeout, potentially a few minutes. It seems neither user quotas nor a limited throttle time can achieve this.

Thanks,

Jiangjie (Becket) Qin

On Tue, Nov 14, 2017 at 7:44 AM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:

Hi Becket,

For the specific scenario that you described, would it be possible to use user quotas rather than client-id quotas? With user quotas, perhaps we can throttle more easily before reading requests as well (as you mentioned, the difficulty with client-id quotas is that we have to read partial requests and handle client ids at the network layer, making that a much bigger change). If your clients are using SASL/SSL, I was wondering whether a solution that improves user quotas and limits throttle time would work for you.

Regards,

Rajini

On Thu, Nov 9, 2017 at 12:59 AM, Becket Qin <becket....@gmail.com> wrote:

Since we will bump up the wire request version, another option is: for clients sending old request versions, the broker can just keep the current behavior.
For clients sending the new request versions, the broker can respond and then mute the channel as described in the KIP wiki. In this case, muting the channel is mostly for protection purposes. A correctly implemented client should back off for the throttle time before sending the next request. The downside is that the broker needs to keep both pieces of logic, and it does not seem to gain much benefit. So personally I prefer to just mute the channel. But I am open to different opinions.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 6, 2017 at 7:28 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jun,

Hmm, even if a connection is closed by the client while the channel is muted, after the channel is unmuted it seems Selector.select() will detect this and close the socket. It is true that before the channel is unmuted the socket will be in a CLOSE_WAIT state, though. So having an arbitrarily long muted duration may indeed cause problems.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 6, 2017 at 7:22 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Rajini,

Thanks for the detailed explanation. Please see the reply below:

2. Limiting the throttle time to connection.max.idle.ms on the broker side is probably fine. However, clients may have a different configuration of connection.max.idle.ms and still reconnect before the throttle time (which is the server-side connection.max.idle.ms) has passed. It seems another back door for the quota.

3. I agree we could just mute the server socket until connection.max.idle.ms if the massive CLOSE_WAIT is a big issue. This helps guarantee that only connection_rate * connection.max.idle.ms sockets will be in CLOSE_WAIT state. For cooperative clients, unmuting the socket will not have a negative impact.

4. My concern with capping the throttle time to metrics.window.ms is that we will not be able to enforce the quota effectively. It might be useful to explain this with a real example we are trying to solve. We have a MapReduce job pushing data to a Kafka cluster. The MapReduce job has hundreds of producers, and each of them sends a normal-sized ProduceRequest (~2 MB) to each of the brokers in the cluster. Apparently the client id will run out of its bytes quota pretty quickly, and the broker starts to throttle the producers. The throttle time can actually be pretty long (e.g. a few minutes). At that point, request queue time on the brokers was around 30 seconds. After that, a bunch of producers hit request.timeout.ms, reconnected, and sent the next request again, which caused another spike and a longer queue.

In the above case, unless we set the quota window to be pretty big, we will not be able to enforce the quota. And if we set the window size to a large value, the request might be throttled for longer than connection.max.idle.ms.

> We need a solution to improve flow control for well-behaved clients, which currently rely entirely on the broker's throttling. The KIP addresses this using co-operative clients that sleep for an unbounded throttle time. I feel this is not ideal since the result is traffic with a lot of spikes. Feedback from brokers to enable flow control in the client is a good idea, but clients with excessive throttle times should really have been configured with smaller batch sizes.

This is not really about a single producer with a large batch size; it is a lot of small producers talking to the broker at the same time. Reducing the batch size does not help much here. Also note that after the spike in traffic at the very beginning, the throttle times of the ProduceRequests processed later are actually going to be increasing (for example, the first throttled request will be throttled for 1 second, the second throttled request for 10 seconds, etc.). Due to this throttle time variation, if every producer honors the throttle time, there will not be another spike after the first produce.

> We need a solution to enforce smaller quotas to protect the broker from misbehaving clients. The KIP addresses this by muting channels for an unbounded time. This introduces problems of channels in CLOSE_WAIT. And it doesn't really solve all issues with misbehaving clients, since new connections can be created to bypass quotas.
Our current quota only protects against cooperating clients, because we really throttle the NEXT request after processing a request, even if that request itself has already violated the quota. Misbehaving clients are not protected against at all by the current quota mechanism. Like you mentioned, a connection quota is required. We have been discussing this at LinkedIn for some time. Doing it right requires some major changes, such as partially reading a request to identify the client id at the network level and disconnecting misbehaving clients.

While handling misbehaving clients is important, we are not trying to address that in this KIP. This KIP is trying to improve the communication with good clients. Muting the channel is more of a migration plan, so that we don't regress on the old, innocent (but good) clients.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 6, 2017 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Jiangjie,

3. If a client closes a socket connection, typically the server only finds this out by reading from the channel and getting a negative size during the read. So, if a channel is muted by the server, the server won't be able to detect the closing of the connection by the client until after the socket is unmuted. That's probably what Rajini wants to convey.
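Jun's point can be demonstrated with plain loopback sockets: the server only learns of the peer's close when it actually performs a read, and while it abstains from reading (the "muted" state) the socket sits in CLOSE_WAIT. This is a minimal standalone demonstration, not Kafka code.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// While the server is not reading from a connection, a client close goes
// unnoticed: the FIN has arrived, but nothing tells the server until it
// reads. That window is exactly when the socket is in CLOSE_WAIT.
public class MutedChannelDemo {
    /** Returns what the server's first read() sees after the client closed. */
    static int readAfterClientClose() throws IOException {
        try (ServerSocket server = new ServerSocket(0)) {
            Socket client = new Socket("localhost", server.getLocalPort());
            try (Socket accepted = server.accept()) {
                client.close(); // client disconnects while server is "muted"
                // Only when the server finally reads does it learn of the
                // close: read() returns -1 for a peer-closed connection.
                return accepted.getInputStream().read();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAfterClientClose()); // -1: peer closed
    }
}
```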
Thanks,

Jun

On Fri, Nov 3, 2017 at 8:11 PM, Becket Qin <becket....@gmail.com> wrote:

Thanks Rajini.

1. Good point. We do need to bump up the protocol version so that the new clients do not wait for another throttle time when they are talking to old brokers. I'll update the KIP.

2. That is true. But the client was not supposed to send requests to the broker during that period anyway. So detecting the broker failure later seems fine?

3. Wouldn't the number of CLOSE_WAIT handles be the same as in the current state? Currently the broker still mutes the socket until it sends the response back. If the clients disconnect while they are being throttled, the closed socket will not be detected until the throttle time has passed.

Jun also suggested in the ticket to bound the time by metric.sample.window.ms. I am not sure about a bound on throttle time. It seems a little difficult to come up with a good bound. If the bound is too large, it does not really help solve the various timeout issues we may face. If the bound is too low, the quota is essentially not honored. We could potentially treat different requests differently, but that seems too complicated and error prone.
IMO, the key improvement we want to make is to tell the clients how long they will be throttled, so the clients know what happened and can act accordingly instead of waiting naively. Muting the socket on the broker side is just in case of non-cooperating clients. For the existing clients, it seems this does not have much impact compared with what we have now.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 3, 2017 at 3:09 PM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:

Hi Becket,

Thank you for the KIP. A few comments:

1. The KIP says: "No public interface changes are needed. We only propose behavior change on the broker side."

But from the proposed changes, it sounds like clients will be updated to wait for the throttle time before sending the next request, and also to not handle idle disconnections during that time. Doesn't that mean that clients need to know that the broker has sent the response before throttling, requiring a protocol/version change?

2. At the moment, broker failures are detected by clients (and vice versa) within connections.max.idle.ms. By removing this check for an unlimited throttle time, failure detection could be delayed.
3. The KIP says: "Since this subsequent request is not actually handled until the broker unmutes the channel, the client can hit request.timeout.ms and reconnect. However, this is no worse than the current state."

I think this could be worse than the current state, because the broker doesn't detect and close the channel for an unlimited throttle time, while new connections will get accepted. As a result, a lot of connections could be in CLOSE_WAIT state when the throttle time is high.

Perhaps it is better to combine this KIP with a bound on throttle time?

Regards,

Rajini

On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin <becket....@gmail.com> wrote:

Thanks for the comment, Jun.

1. Yes, you are right. This could also happen with the current quota mechanism, because we are essentially muting the socket during the throttle time. There might be two ways to solve this:
A) use another socket to send heartbeats;
B) let the GroupCoordinator know that the client will not heartbeat for some time.
It seems the first solution is cleaner.

2. For the consumer, it seems returning an empty response is the better option. In the producer case, if there is a spike in traffic, the brokers will see queued-up requests, but that is hard to avoid unless we have a connection-level quota, which is a bigger change and may be easier to discuss in a separate KIP.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Jiangjie,

Thanks for bringing this up. A couple of quick thoughts.

1. If the throttle time is large, what can happen is that a consumer won't be able to heartbeat to the group coordinator frequently enough. In that case, even with this KIP, it seems there could be frequent consumer group rebalances.

2. If we return a response immediately, for the consumer, do we return the requested data or an empty response? If we do the former, it may not protect against the case when there are multiple consumer instances associated with the same user/client id.
Jun

On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <becket....@gmail.com> wrote:

Hi,

We would like to start the discussion on KIP-219.

The KIP tries to improve quota throttling time communication between brokers and clients, to avoid client timeouts in case of long throttling times.

The KIP link is:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication

Comments are welcome.

Thanks,

Jiangjie (Becket) Qin
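The mechanism discussed throughout the thread (send the response immediately, carrying the throttle time, then keep the channel muted until the time elapses) can be sketched roughly as follows. Class and method names here are illustrative placeholders, not Kafka's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the broker-side behavior the KIP proposes: respond right away
// so cooperative clients learn the throttle time, but keep the channel
// muted as protection against clients that ignore it. Illustrative only.
public class ChannelMuteSketch {
    private final Map<String, Long> mutedUntil = new HashMap<>();

    /** Called when a response is sent for a quota-violating request. */
    void sendResponseAndMute(String channelId, long throttleTimeMs, long nowMs) {
        // (the response, carrying throttleTimeMs, would be enqueued here)
        if (throttleTimeMs > 0)
            mutedUntil.put(channelId, nowMs + throttleTimeMs);
    }

    /** The network layer skips reading from muted channels. */
    boolean mayRead(String channelId, long nowMs) {
        Long until = mutedUntil.get(channelId);
        if (until == null) return true;
        if (nowMs >= until) {
            mutedUntil.remove(channelId); // unmute once the time has elapsed
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ChannelMuteSketch s = new ChannelMuteSketch();
        s.sendResponseAndMute("conn-1", 3_000, 10_000);
        System.out.println(s.mayRead("conn-1", 11_000)); // false: still muted
        System.out.println(s.mayRead("conn-1", 13_000)); // true: unmuted
    }
}
```

A well-behaved client never hits the mute, because it backs off on its own for the throttle time reported in the response; the mute only bites clients that keep sending anyway.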