Hi Becket,

For the specific scenario that you described, would it be possible to use user quotas rather than client-id quotas? With user quotas, perhaps we can also throttle more easily before reading requests (as you mentioned, the difficulty with client-id quotas is that we would have to read partial requests and handle client-ids at the network layer, making that a much bigger change). If your clients are using SASL/SSL, I was wondering whether a solution that improves user quotas and limits the throttle time would work for you.

Regards,

Rajini
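To illustrate why user quotas could be enforced before a request is read while client-id quotas cannot, here is a minimal sketch in plain Java. It is not Kafka broker code: the class, the principal and client names, and the 30-second throttle are assumptions for illustration; only the ordering (principal known when the connection authenticates, client-id known only after the header is parsed) reflects the point above.

import java.util.HashMap;
import java.util.Map;

// Minimal sketch, not Kafka broker code. For a SASL/SSL connection the user
// principal is a property of the authenticated session, so a user-level throttle
// can be checked before the broker reads the next request. A client-id, by
// contrast, is only known after the request header has been parsed, which is why
// throttling on client-id before the read would need network-layer changes.
// All names and times below are illustrative assumptions.
public final class PreReadThrottleSketch {

    // user principal -> time (ms) until which that user is throttled
    private final Map<String, Long> userThrottledUntilMs = new HashMap<>();

    void throttleUser(String principal, long untilMs) {
        userThrottledUntilMs.put(principal, untilMs);
    }

    // This decision only needs the session's authenticated principal, so it can be
    // made before reading any bytes: keep the channel muted while the user is throttled.
    boolean shouldReadNextRequest(String authenticatedPrincipal, long nowMs) {
        return nowMs >= userThrottledUntilMs.getOrDefault(authenticatedPrincipal, 0L);
    }

    public static void main(String[] args) {
        PreReadThrottleSketch broker = new PreReadThrottleSketch();
        long now = System.currentTimeMillis();
        broker.throttleUser("mapreduce-ingest", now + 30_000);                     // hypothetical user, 30 s throttle
        System.out.println(broker.shouldReadNextRequest("mapreduce-ingest", now)); // false -> stay muted
        System.out.println(broker.shouldReadNextRequest("other-user", now));       // true  -> read as usual
    }
}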
On Thu, Nov 9, 2017 at 12:59 AM, Becket Qin <becket....@gmail.com> wrote:

> Since we will bump up the wire request version, another option is for the broker to simply keep the current behavior for clients that are sending old request versions. For clients sending the new request versions, the broker can respond and then mute the channel as described in the KIP wiki. In this case, muting the channel is mostly for protection purposes. A correctly implemented client should back off for the throttle time before sending the next request. The downside is that the broker needs to keep both pieces of logic, and that does not seem to gain much benefit. So personally I prefer to just mute the channel, but I am open to different opinions.

> Thanks,

> Jiangjie (Becket) Qin

> On Mon, Nov 6, 2017 at 7:28 PM, Becket Qin <becket....@gmail.com> wrote:

> > Hi Jun,

> > Hmm, even if a connection is closed by the client while the channel is muted, it seems Selector.select() will detect this and close the socket once the channel is unmuted. It is true that the socket will be in a CLOSE_WAIT state until the channel is unmuted, though, so having an arbitrarily long muted duration may indeed cause problems.

> > Thanks,

> > Jiangjie (Becket) Qin

> > On Mon, Nov 6, 2017 at 7:22 PM, Becket Qin <becket....@gmail.com> wrote:

> >> Hi Rajini,

> >> Thanks for the detailed explanation. Please see the replies below:

> >> 2. Limiting the throttle time to connection.max.idle.ms on the broker side is probably fine. However, clients may have a different configuration of connection.max.idle.ms and could still reconnect before the throttle time (which is based on the server-side connection.max.idle.ms) has passed. It seems like another back door for the quota.

> >> 3. I agree we could just mute the server socket until connection.max.idle.ms if the massive number of CLOSE_WAIT sockets is a big issue. This helps guarantee that only connection_rate * connection.max.idle.ms sockets will be in the CLOSE_WAIT state. For cooperative clients, unmuting the socket will not have a negative impact.

> >> 4. My concern with capping the throttle time at metrics.window.ms is that we will not be able to enforce the quota effectively. It might be useful to explain this with a real example we are trying to solve. We have a MapReduce job pushing data to a Kafka cluster. The MapReduce job has hundreds of producers, and each of them sends a normally sized ProduceRequest (~2 MB) to each of the brokers in the cluster. The client id ran out of its byte quota pretty quickly, and the broker started to throttle the producers. The throttle time could actually be pretty long (e.g. a few minutes). At that point, the request queue time on the brokers was around 30 seconds. After that, a bunch of producers hit request.timeout.ms, reconnected and sent the next request again, which caused another spike and an even longer queue.

> >> In the above case, unless we set the quota window to be pretty big, we will not be able to enforce the quota. And if we set the window size to a large value, the request might be throttled for longer than connection.max.idle.ms.
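A rough sketch of the arithmetic behind the MapReduce example above. The formula (delay = (observed rate - quota) / quota * window) mirrors the general idea of deferring the client until its observed rate falls back to the quota; the actual broker code differs, and the quota, window and burst sizes below are illustrative assumptions rather than measurements from the incident described.

// Rough sketch of the throttle-time arithmetic; all numbers are illustrative
// assumptions, and the real quota manager implementation differs in detail.
public final class ThrottleTimeSketch {

    // Delay the client long enough that the rate, averaged over the quota window,
    // falls back to the quota: delay = (observedRate - quota) / quota * window.
    static long throttleTimeMs(double observedBytesPerSec, double quotaBytesPerSec, long windowMs) {
        if (observedBytesPerSec <= quotaBytesPerSec) {
            return 0L;
        }
        return (long) ((observedBytesPerSec - quotaBytesPerSec) / quotaBytesPerSec * windowMs);
    }

    public static void main(String[] args) {
        long windowMs = 10_000L;                                  // e.g. 10 samples of 1 s each
        double quota = 10.0 * 1024 * 1024;                        // 10 MB/s byte-rate quota
        double burstBytes = 200 * 2.0 * 1024 * 1024;              // 200 producers x ~2 MB ProduceRequest
        double observedRate = burstBytes / (windowMs / 1000.0);   // burst averaged over the window

        long delayMs = throttleTimeMs(observedRate, quota, windowMs);
        System.out.printf("computed throttle time: %d ms (window: %d ms)%n", delayMs, windowMs);

        // The computed delay (30 s here) already exceeds the 10 s window. Capping the
        // delay at the window therefore under-enforces the quota, while growing the
        // window to avoid that allows delays longer than connection.max.idle.ms,
        // which is the trade-off being debated in this thread.
    }
}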
> >> > We need a solution to improve flow control for well-behaved clients which currently rely entirely on the broker's throttling. The KIP addresses this using co-operative clients that sleep for an unbounded throttle time. I feel this is not ideal since the result is traffic with a lot of spikes. Feedback from brokers to enable flow control in the client is a good idea, but clients with excessive throttle times should really have been configured with smaller batch sizes.

> >> This is not really about a single producer with a large batch size; it is a lot of small producers talking to the cluster at the same time. Reducing the batch size does not help much here. Also note that after the traffic spike at the very beginning, the throttle times of the ProduceRequests processed later are actually going to keep increasing (for example, the first throttled request will be throttled for 1 second, the second throttled request for 10 seconds, etc.). Due to this variation in throttle time, if every producer honors the throttle time, there will not be another spike after the first wave of produce requests.

> >> > We need a solution to enforce smaller quotas to protect the broker from misbehaving clients. The KIP addresses this by muting channels for an unbounded time. This introduces problems of channels in CLOSE_WAIT, and doesn't really solve all issues with misbehaving clients, since new connections can be created to bypass quotas.

> >> Our current quota mechanism is only effective for cooperating clients, because it really throttles the NEXT request after processing a request, even if that request itself has already violated the quota. The broker is not protected from misbehaving clients at all with the current mechanism. Like you mentioned, a connection quota would be required. We have been discussing this at LinkedIn for some time. Doing it right requires some major changes, such as partially reading a request to identify the client id at the network level and disconnecting misbehaving clients.

> >> While handling misbehaving clients is important, we are not trying to address that in this KIP. This KIP is trying to improve the communication with good clients. Muting the channel is more of a migration plan so that we don't cause a regression for the old, naive (but good) clients.

> >> Thanks,

> >> Jiangjie (Becket) Qin

> >> On Mon, Nov 6, 2017 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:

> >>> Hi, Jiangjie,

> >>> 3. If a client closes a socket connection, typically the server only finds this out by reading from the channel and getting a negative size during the read. So, if a channel is muted by the server, the server won't be able to detect the closing of the connection by the client until the socket is unmuted. That's probably what Rajini wants to convey.

> >>> Thanks,

> >>> Jun
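To make the mechanism Jun describes concrete, here is a small standalone java.nio demo (not Kafka code; the buffer size and sleep are arbitrary). The accepting side only learns that the peer has closed the connection when a read() finally returns -1, which is exactly the read that a muted channel postpones; until then the socket sits in CLOSE_WAIT.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Standalone demo, not Kafka code: a server only notices a client-initiated close
// when a read() returns -1. While the channel is "muted" (simply not read, as a
// throttled channel would be), the closed socket lingers in CLOSE_WAIT.
public final class CloseDetectionDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        try (ServerSocketChannel server = ServerSocketChannel.open()
                .bind(new InetSocketAddress("localhost", 0))) {

            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            SocketChannel accepted = server.accept();

            client.close();        // the peer goes away...
            Thread.sleep(500);     // ...while the server side is not reading ("muted")

            // Until this read happens, the server side has no idea the peer is gone;
            // netstat would show the accepted socket in CLOSE_WAIT during the sleep.
            int n = accepted.read(ByteBuffer.allocate(16));
            System.out.println("read returned " + n);   // -1: end of stream, peer closed
            accepted.close();
        }
    }
}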
> >>> On Fri, Nov 3, 2017 at 8:11 PM, Becket Qin <becket....@gmail.com> wrote:

> >>> > Thanks Rajini.

> >>> > 1. Good point. We do need to bump up the protocol version so that the new clients do not wait for another throttle time when they are talking to old brokers. I'll update the KIP.

> >>> > 2. That is true. But the client was not supposed to send requests to the broker during that period anyway, so detecting the broker failure later seems fine?

> >>> > 3. Wouldn't the number of sockets in CLOSE_WAIT be the same as in the current state? Currently the broker still mutes the socket until it sends the response back. If clients disconnect while they are being throttled, the closed socket will not be detected until the throttle time has passed.

> >>> > Jun also suggested in the ticket to bound the time by metric.sample.window.ms. I am not sure about a bound on the throttle time; it seems a little difficult to come up with a good one. If the bound is too large, it does not really help solve the various timeout issues we may face. If the bound is too low, the quota is essentially not honored. We could potentially treat different requests differently, but that seems too complicated and error prone.

> >>> > IMO, the key improvement we want to make is to tell the clients how long they will be throttled, so the clients know what happened and can act accordingly instead of waiting naively. Muting the socket on the broker side is just a safeguard against non-cooperating clients. For the existing clients, it seems this does not have much impact compared with what we have now.

> >>> > Thanks,

> >>> > Jiangjie (Becket) Qin

> >>> > On Fri, Nov 3, 2017 at 3:09 PM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:

> >>> > > Hi Becket,

> >>> > > Thank you for the KIP. A few comments:

> >>> > > 1. The KIP says: "*No public interface changes are needed. We only propose behavior change on the broker side.*"

> >>> > > But from the proposed changes, it sounds like clients will be updated to wait for the throttle time before sending the next request, and also to not treat the connection as idle during that time. Doesn't that mean that clients need to know that the broker has sent the response before throttling, requiring a protocol/version change?

> >>> > > 2. At the moment, broker failures are detected by clients (and vice versa) within connections.max.idle.ms. By removing this check for an unlimited throttle time, failure detection could be delayed.

> >>> > > 3. The KIP says: "*Since this subsequent request is not actually handled until the broker unmutes the channel, the client can hit request.timeout.ms and reconnect. However, this is no worse than the current state.*"

> >>> > > I think this could be worse than the current state, because the broker doesn't detect and close the channel for an unlimited throttle time, while new connections still get accepted. As a result, a lot of connections could be in CLOSE_WAIT state when the throttle time is high.

> >>> > > Perhaps it is better to combine this KIP with a bound on throttle time?

> >>> > > Regards,

> >>> > > Rajini
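A simplified sketch of the client-side behaviour the KIP expects from cooperative clients once the broker returns the response right away: read the reported throttle time and back off before sending the next request. The throttle_time_ms field does exist in produce and fetch responses, but everything else here (the send() helper, the request names, the 5-second value) is an illustrative stand-in rather than the real Java client's logic.

import java.util.List;

// Simplified, illustrative sender loop, not the real NetworkClient: honor the
// throttle time reported by the broker instead of blindly sending the next
// request and relying on the broker to mute the channel for protection.
public final class CooperativeSender {

    // Stand-in for a network send; returns the throttle_time_ms reported by the broker.
    static int send(String request) {
        System.out.println("sent " + request);
        return "produce-2".equals(request) ? 5_000 : 0;   // pretend the 2nd request got throttled
    }

    public static void main(String[] args) throws InterruptedException {
        for (String request : List.of("produce-1", "produce-2", "produce-3")) {
            int throttleTimeMs = send(request);
            if (throttleTimeMs > 0) {
                // Back off for the advertised throttle time before the next send.
                System.out.println("backing off for " + throttleTimeMs + " ms");
                Thread.sleep(throttleTimeMs);
            }
        }
    }
}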
> >>> > > On Fri, Nov 3, 2017 at 8:09 PM, Becket Qin <becket....@gmail.com> wrote:

> >>> > > > Thanks for the comment, Jun.

> >>> > > > 1. Yes, you are right. This could also happen with the current quota mechanism, because we are essentially muting the socket during the throttle time. There might be two ways to solve this:
> >>> > > > A) use another socket to send heartbeats;
> >>> > > > B) let the GroupCoordinator know that the client will not heartbeat for some time.
> >>> > > > It seems the first solution is cleaner.

> >>> > > > 2. For the consumer, it seems returning an empty response is the better option. In the producer case, if there is a spike in traffic, the brokers will see queued-up requests, but that is hard to avoid unless we have a connection-level quota, which is a bigger change and may be easier to discuss in a separate KIP.

> >>> > > > Thanks,

> >>> > > > Jiangjie (Becket) Qin

> >>> > > > On Fri, Nov 3, 2017 at 10:28 AM, Jun Rao <j...@confluent.io> wrote:

> >>> > > > > Hi, Jiangjie,

> >>> > > > > Thanks for bringing this up. A couple of quick thoughts.

> >>> > > > > 1. If the throttle time is large, what can happen is that a consumer won't be able to heartbeat to the group coordinator frequently enough. In that case, even with this KIP, it seems there could be frequent consumer group rebalances.

> >>> > > > > 2. If we return a response immediately, for the consumer, do we return the requested data or an empty response? If we do the former, it may not protect against the case where there are multiple consumer instances associated with the same user/client-id.

> >>> > > > > Jun

> >>> > > > > On Wed, Nov 1, 2017 at 9:53 AM, Becket Qin <becket....@gmail.com> wrote:

> >>> > > > > > Hi,

> >>> > > > > > We would like to start the discussion on KIP-219.

> >>> > > > > > The KIP tries to improve the communication of quota throttle times between brokers and clients, so that clients do not time out when the throttle time is long.

> >>> > > > > > The KIP link is: https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication

> >>> > > > > > Comments are welcome.

> >>> > > > > > Thanks,

> >>> > > > > > Jiangjie (Becket) Qin
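For context on the rebalance concern in Jun's first point, the snippet below lists the consumer timeout settings that a multi-minute throttle would run up against. The configuration keys are real consumer configs; the values are only examples, not recommendations.

import java.util.Properties;

// Consumer timeouts that interact with a long throttle: if heartbeats cannot get
// through (or the poll loop stalls) for longer than these allow, the coordinator
// triggers a rebalance. Keys are real consumer configs; values are examples only.
public final class ConsumerTimeoutsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "10000");      // how long missing heartbeats are tolerated
        props.setProperty("heartbeat.interval.ms", "3000");     // how often heartbeats are sent
        props.setProperty("max.poll.interval.ms", "300000");    // how long between poll() calls is tolerated
        props.setProperty("request.timeout.ms", "30000");       // per-request timeout mentioned in the thread
        // A multi-minute throttle, as in the MapReduce example earlier in the thread,
        // exceeds several of these, which is why the discussion turns to heartbeating
        // on a separate socket or bounding the throttle time.
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}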