Thanks Rajini, I updated the KIP wiki to clarify the scope of the KIP. To summarize, the current quota mechanism has a few caveats:

1. The brokers only throttle the NEXT request, even if the current request already violates the quota. So the broker will always process at least one request from the client.
2. The brokers cannot know the client id until they have fully read the request out of the socket, even if that client is being throttled.
3. The brokers do not communicate the throttling to the clients promptly, so the clients have to wait blindly and sometimes time out unnecessarily.
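(For illustration, a minimal sketch of what the fix for caveat 3 looks like from a cooperative client's point of view, assuming the broker now returns the response immediately with the throttle time set. The class and method names below are hypothetical, not the actual Kafka client code.)

    // Hypothetical sketch, not actual Kafka client code: a cooperative client
    // backs off for the throttle time carried in the response instead of
    // blindly waiting on a muted connection and hitting request.timeout.ms.
    class ThrottleAwareSender {
        private long backoffUntilMs = 0;

        // Called when a response arrives; throttleTimeMs is the value the
        // broker reports in the response.
        void onResponse(long throttleTimeMs, long nowMs) {
            if (throttleTimeMs > 0) {
                backoffUntilMs = nowMs + throttleTimeMs;
            }
        }

        // Checked before sending the next request to this broker.
        boolean readyToSend(long nowMs) {
            return nowMs >= backoffUntilMs;
        }
    }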
This KIP only tries to address 3 but not 1 and 2, because A) those two issues are sort of orthogonal to 3, and B) the solutions to 1 and 2 are much more complicated and worth a separate discussion. I'll wait till tomorrow and start a voting thread if no further concerns are raised about the KIP.

Thanks,

Jiangjie (Becket) Qin

On Thu, Nov 16, 2017 at 11:47 AM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:

> Hi Becket,
>
> The current user quota doesn't solve the problem. But I was thinking that if we could ensure we don't read more from the network than the quota allows, we may be able to fix the issue in a different way (throttling all connections, each for a limited time, prior to reading large requests). But it would be more complex (and even messier for client-id quotas), so I can understand why the solution you proposed in the KIP makes sense for the scenario that you described.
>
> Regards,
>
> Rajini
>
> On Tue, Nov 14, 2017 at 11:30 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Rajini,
> >
> > We are using SSL, so we could use user quotas. But I am not sure that would solve the problem. The key issue in our case is that each broker can only handle ~300 MB/s of incoming bytes, but the MapReduce job is trying to push 1-2 GB/s; unless we can throttle the clients to 300 MB/s, the broker cannot sustain the load. To do that, we need to be able to throttle requests for longer than the request timeout, potentially a few minutes. It seems neither user quotas nor a limited throttle time can achieve this.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Nov 14, 2017 at 7:44 AM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:
> >
> > > Hi Becket,
> > >
> > > For the specific scenario that you described, would it be possible to use user quotas rather than client-id quotas? With user quotas, perhaps we can throttle more easily before reading requests as well (as you mentioned, the difficulty with client-id quotas is that we have to read partial requests and handle client-ids at the network layer, making that a much bigger change). If your clients are using SASL/SSL, I was wondering whether a solution that improves user quotas and limits throttle time would work for you.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > > On Thu, Nov 9, 2017 at 12:59 AM, Becket Qin <becket....@gmail.com> wrote:
> > >
> > > > Since we will bump up the wire request version, another option is: for clients that are sending old request versions, the broker can just keep the current behavior; for clients sending the new request versions, the broker can respond and then mute the channel as described in the KIP wiki. In this case, muting the channel is mostly for protection purposes. A correctly implemented client should back off for the throttle time before sending the next request. The downside is that the broker needs to keep both code paths, and that seems to gain little benefit. So personally I prefer to just mute the channel. But I am open to different opinions.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Mon, Nov 6, 2017 at 7:28 PM, Becket Qin <becket....@gmail.com> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Hmm, even if a connection is closed by the client while the channel is muted, it seems Selector.select() will detect this and close the socket after the channel is unmuted. It is true that before the channel is unmuted the socket will be in the CLOSE_WAIT state, though. So having an arbitrarily long muted duration may indeed cause problems.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Mon, Nov 6, 2017 at 7:22 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > >
> > > > >> Hi Rajini,
> > > > >>
> > > > >> Thanks for the detailed explanation. Please see the replies below:
> > > > >>
> > > > >> 2. Limiting the throttle time to connection.max.idle.ms on the broker side is probably fine. However, clients may have a different configuration of connection.max.idle.ms and still reconnect before the throttle time (which is the server-side connection.max.idle.ms) has passed. That seems to be another back door around the quota.
> > > > >>
> > > > >> 3. I agree we could just mute the server socket until connection.max.idle.ms if the massive number of CLOSE_WAIT sockets is a big issue. This helps guarantee that only connection_rate * connection.max.idle.ms sockets will be in the CLOSE_WAIT state. For cooperative clients, unmuting the socket will not have a negative impact.
> > > > >>
> > > > >> 4. My concern with capping the throttle time to metrics.window.ms is that we will not be able to enforce the quota effectively. It might be useful to explain this with a real example we are trying to solve. We have a MapReduce job pushing data to a Kafka cluster. The MapReduce job has hundreds of producers, and each of them sends a normal-sized ProduceRequest (~2 MB) to each of the brokers in the cluster. The client id runs out of its bytes quota pretty quickly, and the brokers start to throttle the producers. The throttle time can actually be pretty long (e.g. a few minutes). At that point, the request queue time on the brokers was around 30 seconds. After that, a bunch of producers hit request.timeout.ms, reconnected and sent the next request again, which caused another spike and a longer queue.
> > > > >>
> > > > >> In the above case, unless we set the quota window to be pretty big, we will not be able to enforce the quota. And if we set the window size to a large value, requests might be throttled for longer than connection.max.idle.ms.
> > > > >>
> > > > >> > We need a solution to improve flow control for well-behaved clients, which currently rely entirely on the broker's throttling. The KIP addresses this using co-operative clients that sleep for an unbounded throttle time. I feel this is not ideal since the result is traffic with a lot of spikes. Feedback from brokers to enable flow control in the client is a good idea, but clients with excessive throttle times should really have been configured with smaller batch sizes.
> > > > >>
> > > > >> This is not really about a single producer with a large batch size; it is a lot of small producers talking to the broker at the same time.
> > > > >> Reducing the batch size does not help much here. Also note that after the traffic spike at the very beginning, the throttle times of the ProduceRequests processed later are actually going to increase (for example, the first throttled request will be throttled for 1 second, the second throttled request for 10 seconds, etc.). Due to this throttle time variation, if every producer honors the throttle time, there will not be another spike after the first produce.
> > > > >>
> > > > >> > We need a solution to enforce smaller quotas to protect the broker from misbehaving clients. The KIP addresses this by muting channels for an unbounded time. This introduces problems of channels in CLOSE_WAIT. And it doesn't really solve all issues with misbehaving clients since new connections can be created to bypass quotas.
> > > > >>
> > > > >> Our current quota only works against cooperating clients, because it really throttles the NEXT request after processing a request, even if that request itself has already violated the quota. The broker is not protected at all from misbehaving clients under the current quota mechanism. Like you mentioned, a connection quota is required. We have been discussing this at LinkedIn for some time. Doing it right requires some major changes, such as partially reading a request to identify the client id at the network level and disconnecting misbehaving clients.
> > > > >>
> > > > >> While handling misbehaving clients is important, we are not trying to address that in this KIP. This KIP is trying to improve the communication with good clients. Muting the channel is more of a migration plan so that we don't regress on the old innocent (but good) clients.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jiangjie (Becket) Qin
> > > > >>
> > > > >> On Mon, Nov 6, 2017 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >>
> > > > >>> Hi, Jiangjie,
> > > > >>>
> > > > >>> 3. If a client closes a socket connection, typically the server only finds this out by reading from the channel and getting a negative size during the read. So, if a channel is muted by the server, the server won't be able to detect the closing of the connection by the client until after the socket is unmuted. That's probably what Rajini wants to convey.
> > > > >>>
> > > > >>> Thanks,
> > > > >>>
> > > > >>> Jun
> > > > >>>
> > > > >>> On Fri, Nov 3, 2017 at 8:11 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > >>>
> > > > >>> > Thanks Rajini.
> > > > >>> >
> > > > >>> > 1. Good point. We do need to bump up the protocol version so that the new clients do not wait for another throttle time when they are talking to old brokers. I'll update the KIP.
> > > > >>> >
> > > > >>> > 2. That is true. But the client was not supposed to send requests to the broker during that period anyway. So detecting the broker failure later seems fine?
> > > > >>> >
> > > > >>> > 3. Wouldn't the number of CLOSE_WAIT handlers be the same as in the current state? Currently the broker still mutes the socket until it sends the response back. If the clients disconnect while they are being throttled, the closed socket will not be detected until the throttle time has passed.
> > > > >>> >
> > > > >>> > Jun also suggested bounding the time by metric.sample.window.ms in the ticket. I am not sure about a bound on throttle time. It seems a little difficult to come up with a good bound. If the bound is too large, it does not really help solve the various timeout issues we may face. If the bound is too low, the quota is essentially not honored. We could potentially treat different requests differently, but that seems too complicated and error prone.
> > > > >>> >
> > > > >>> > IMO, the key improvement we want to make is to tell the clients how long they will be throttled, so the clients know what happened and can act accordingly instead of waiting naively. Muting the socket on the broker side is just in case of non-cooperating clients. For the existing clients, it seems this does not have much impact compared with what we have now.
> > > > >>> >
> > > > >>> > Thanks,
> > > > >>> >
> > > > >>> > Jiangjie (Becket) Qin
> > > > >>> >
> > > > >>> > On Fri, Nov 3, 2017 at 3:09 PM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:
> > > > >>> >
> > > > >>> > > Hi Becket,
> > > > >>> > >
> > > > >>> > > Thank you for the KIP. A few comments:
> > > > >>> > >
> > > > >>> > > 1. The KIP says: "*No public interface changes are needed. We only propose behavior change on the broker side.*"
> > > > >>> > >
> > > > >>> > > But from the proposed changes, it sounds like clients will be updated to wait for the throttle time before sending the next request, and also to not handle idle disconnections during that time. Doesn't that mean that clients need to know that the broker has sent the response before throttling, requiring a protocol/version change?
> > > > >>> > >
> > > > >>> > > 2. At the moment, broker failures are detected by clients (and vice versa) within connections.max.idle.ms. By removing this check for an unlimited throttle time, failure detection could be delayed.
> > > > >>> > >
However, this is > no > > > > worse > > > > >>> > than > > > > >>> > > the current state.*" > > > > >>> > > > > > > >>> > > I think this could be worse than the current state because > > broker > > > > >>> doesn't > > > > >>> > > detect and close the channel for an unlimited throttle time, > > > while > > > > >>> new > > > > >>> > > connections will get accepted. As a result, lot of > connections > > > > could > > > > >>> be > > > > >>> > in > > > > >>> > > CLOSE_WAIT state when throttle time is high. > > > > >>> > > > > > > >>> > > > > > > >>> > > Perhaps it is better to combine this KIP with a bound on > > throttle > > > > >>> time? > > > > >>> > > > > > > >>> > > > > > > >>> > > Regards, > > > > >>> > > > > > > >>> > > > > > > >>> > > Rajini > > > > >>> > > > > > > >>> > > > > > > >>> > > On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin < > > becket....@gmail.com > > > > > > > > >>> wrote: > > > > >>> > > > > > > >>> > > > Thanks for the comment, Jun, > > > > >>> > > > > > > > >>> > > > 1. Yes, you are right. This could also happen with the > > current > > > > >>> quota > > > > >>> > > > mechanism because we are essentially muting the socket > during > > > > >>> throttle > > > > >>> > > > time. There might be two ways to solve this. > > > > >>> > > > A) use another socket to send heartbeat. > > > > >>> > > > B) let the GroupCoordinator know that the client will not > > > > >>> heartbeat for > > > > >>> > > > some time. > > > > >>> > > > It seems the first solution is cleaner. > > > > >>> > > > > > > > >>> > > > 2. For consumer it seems returning an empty response is a > > > better > > > > >>> > option. > > > > >>> > > In > > > > >>> > > > the producer case, if there is a spike in traffic. The > > brokers > > > > >>> will see > > > > >>> > > > queued up requests, but that is hard to avoid unless we > have > > > > >>> connection > > > > >>> > > > level quota, which is a bigger change and may be easier to > > > > discuss > > > > >>> it > > > > >>> > in > > > > >>> > > a > > > > >>> > > > separate KIP. > > > > >>> > > > > > > > >>> > > > Thanks, > > > > >>> > > > > > > > >>> > > > Jiangjie (Becket) Qin > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <j...@confluent.io > > > > > > wrote: > > > > >>> > > > > > > > >>> > > > > Hi, Jiangjie, > > > > >>> > > > > > > > > >>> > > > > Thanks for bringing this up. A couple of quick thoughts. > > > > >>> > > > > > > > > >>> > > > > 1. If the throttle time is large, what can happen is > that a > > > > >>> consumer > > > > >>> > > > won't > > > > >>> > > > > be able to heart beat to the group coordinator frequent > > > enough. > > > > >>> In > > > > >>> > that > > > > >>> > > > > case, even with this KIP, it seems there could be > frequent > > > > >>> consumer > > > > >>> > > group > > > > >>> > > > > rebalances. > > > > >>> > > > > > > > > >>> > > > > 2. If we return a response immediately, for the consumer, > > do > > > we > > > > >>> > return > > > > >>> > > > the > > > > >>> > > > > requested data or an empty response? If we do the former, > > it > > > > may > > > > >>> not > > > > >>> > > > > protect against the case when there are multiple consumer > > > > >>> instances > > > > >>> > > > > associated with the same user/clientid. 
> > > > >>> > > > >
> > > > >>> > > > > Jun
> > > > >>> > > > >
> > > > >>> > > > > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <becket....@gmail.com> wrote:
> > > > >>> > > > >
> > > > >>> > > > > > Hi,
> > > > >>> > > > > >
> > > > >>> > > > > > We would like to start the discussion on KIP-219.
> > > > >>> > > > > >
> > > > >>> > > > > > The KIP tries to improve the communication of quota throttling time between brokers and clients, to avoid client timeouts in case of long throttling times.
> > > > >>> > > > > >
> > > > >>> > > > > > The KIP link is the following:
> > > > >>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication
> > > > >>> > > > > >
> > > > >>> > > > > > Comments are welcome.
> > > > >>> > > > > >
> > > > >>> > > > > > Thanks,
> > > > >>> > > > > >
> > > > >>> > > > > > Jiangjie (Becket) Qin
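(To make the window/throttle-time trade-off discussed up-thread concrete, here is a rough sketch of the quota math, simplified and with numbers borrowed from the MapReduce scenario; the class is made up for illustration, not the broker code.)

    // Rough sketch of the quota math (simplified; numbers are taken from the
    // MapReduce scenario described in the thread, names are made up).
    public class ThrottleTimeExample {
        public static void main(String[] args) {
            double quotaBytesPerSec = 300.0 * 1024 * 1024;         // broker sustains ~300 MB/s
            double observedBytesPerSec = 1.5 * 1024 * 1024 * 1024; // job pushes ~1.5 GB/s
            long windowMs = 30_000;                                // total quota sample window
            // Delay that brings the average rate over the window back to the quota.
            long throttleMs = (long) ((observedBytesPerSec - quotaBytesPerSec)
                    / quotaBytesPerSec * windowMs);
            // Prints ~123600 ms, i.e. about two minutes -- far beyond a typical
            // request.timeout.ms, which is why the client must be told the value
            // rather than left waiting on a muted connection.
            System.out.println("throttle for " + throttleMs + " ms");
        }
    }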
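(Similarly, a minimal sketch of the version-gated handling Becket describes up-thread, under the assumption that the response carries the throttle time from the first bumped request version onward. All names are illustrative, not the actual broker code.)

    // Illustrative sketch only (not the actual broker code): old clients keep
    // today's delayed-response behavior; new clients get the response right
    // away and the channel is muted purely as protection.
    class QuotaResponsePolicy {
        interface Channel {
            void send(Object response);
            void mute(long durationMs);
        }

        void onThrottledRequest(short requestVersion, short firstThrottleAwareVersion,
                                Object response, long throttleMs, Channel channel) {
            if (requestVersion >= firstThrottleAwareVersion) {
                // New clients: respond immediately (throttle time is carried in
                // the response), then mute against non-cooperative senders.
                channel.send(response);
                channel.mute(throttleMs);
            } else {
                // Old clients: hold the response for the full throttle time.
                channel.mute(throttleMs);
                channel.send(response); // delivered once the mute expires
            }
        }
    }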