Hi Jason,

Yes, we can potentially time out before even sending pending commits, once the request timeout expires (the default is more than 5 minutes, so this should only happen when there are real issues or when brokers are shut down). I have updated the KIP to use a default timeout of 30 seconds for the existing close() method.
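To make the arithmetic concrete, here is a toy Python simulation of how the two bounded phases discussed in the quoted thread below could add up. It is a sketch under assumptions, not the PR's code. It assumes that the synchronous auto-commit and the final poll() loop each get min(remaining close budget, request timeout), and that the consumer's request.timeout.ms defaults to 305000 ms. `simulate_close`, `UNRESPONSIVE` and the other constants are hypothetical names.

```python
import math

# Assumed values in milliseconds, for illustration only (not read from Kafka).
LONG_MAX_VALUE = 2**63 - 1          # Java's Long.MAX_VALUE, i.e. "no timeout"
DEFAULT_CLOSE_TIMEOUT_MS = 30_000   # default for close() in the updated KIP
REQUEST_TIMEOUT_MS = 305_000        # assumed consumer request.timeout.ms default
UNRESPONSIVE = math.inf             # a phase that would never finish on its own


def simulate_close(close_timeout_ms, request_timeout_ms,
                   commit_needs_ms, poll_needs_ms):
    """Return the simulated time (ms) spent in a hypothetical close().

    Two phases run in order: the synchronous auto-commit
    (maybeAutoCommitOffsetsSync) and the final poll() loop. Each phase is
    capped at min(remaining close budget, request timeout); *_needs_ms is
    how long the phase would take if it were not capped.
    """
    if close_timeout_ms < 0:
        raise ValueError("close timeout must be non-negative")
    remaining = close_timeout_ms
    elapsed = 0
    for needs_ms in (commit_needs_ms, poll_needs_ms):
        bound = min(remaining, request_timeout_ms)  # per-phase cap (assumption)
        spent = min(needs_ms, bound)                # phase may finish early
        elapsed += spent
        remaining -= spent                          # shared close deadline
    return elapsed
```

With both phases unresponsive, `simulate_close(LONG_MAX_VALUE, REQUEST_TIMEOUT_MS, UNRESPONSIVE, UNRESPONSIVE)` returns 610000 (about 10 minutes, the 2 * requestTimeout ceiling), while the new 30-second default returns 30000. Sharing one deadline across both phases is a choice made for this sketch so that close() never exceeds the caller's timeout. Capping each phase independently at min(closeTimeout, requestTimeout) would allow up to twice the close timeout.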
Since the code changes are limited to the close() code path, can we include this in 0.10.2.0? If so, I can initiate the vote tomorrow.

Thank you...

On Wed, Jan 4, 2017 at 5:35 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hi Rajini,
>
> Thanks for the clarification. I looked again at the patch and I see what you're saying now. I was confused because I assumed the request timeout was being enforced on the requests themselves, but it is more that the request timeout bounds the attempt to send them in addition to the time to receive a response, right? So it is possible that we time out before even getting a chance to send the OffsetCommit (for example).
>
> I think I'd still prefer timing out quicker by default if possible. The one case where it might be worthwhile waiting longer is when there are pending offset commits sent through commitSync() or commitAsync(). But if we're not actually doing retries or coordinator rediscovery, I'm not sure the additional time helps that much.
>
> -Jason
>
> On Wed, Jan 4, 2017 at 8:27 AM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:
>
> > Hi Jason,
> >
> > Thank you for the review.
> >
> > During close(), if there is a rebalance and the coordinator has to be rediscovered, close terminates without trying to find the coordinator. The poll() loop within close terminates if the coordinator is not known (as it does now) or if the timeout expires. At the moment, that timeout is a hard-coded 5-second timeout. The PR changes that to min(closeTimeout, requestTimeout). So even if there are pending commits, the maximum wait will be requestTimeout in the final poll() loop of close().
> >
> > In addition to this, before the poll loop, there is a call to maybeAutoCommitOffsetsSync(). At the moment, this does not have a timeout and can wait indefinitely. The PR introduces a timeout for this commit invoked from close(). The timeout is min(closeTimeout, requestTimeout).
> > Hence the maximum timeout of (2 * requestTimeout) for any close. Have I missed something?
> >
> > I had chosen Long.MAX_VALUE as the default close timeout to be consistent with Producer. But perhaps a lower timeout of 30 seconds is more meaningful for Consumer, since the consumer typically has less to do. Even with (2 * requestTimeout), the default would be 20 minutes, which is perhaps too high anyway. I will update the KIP.
> >
> > On Wed, Jan 4, 2017 at 3:16 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > Hey Rajini,
> > >
> > > Thanks for the KIP. I had a quick look at the patch and the impact doesn't seem too bad. Just wanted to clarify one point. This is from the KIP:
> > >
> > > > The existing close() method without a timeout will attempt to close the consumer gracefully with a timeout of Long.MAX_VALUE. Since commit and leave group requests are timed out after the request timeout, the upper bound will be approximately 2*request.timeout.ms (around 10 minutes by default).
> > >
> > > I don't think this is quite right. There could be one or more pending OffsetCommit requests (sent using commitAsync) that we may have to await. We could also be in the middle of a group rebalance. The other complication is what happens in the event of a request timeout. Usually the consumer will rediscover the coordinator. Would we do that as well in close() and retry any failed requests if there is time remaining, or would we just fail the remaining requests and return? In any case, it may not be so easy to set an upper bound on the default timeout.
> > >
> > > With that in mind, I'm wondering whether waiting indefinitely should be the default. In the case of the OffsetCommit before closing (when autocommit is enabled) or the LeaveGroup, it's more or less OK if these requests fail.
> > > Maybe we should consider them best effort (as is currently done) and wait a reasonable amount of time (say 30 seconds) for their completion. I'd rather have "nice" behavior out of the box and let users who want indefinite blocking use Long.MAX_VALUE themselves. What do you think?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Dec 21, 2016 at 4:39 AM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:
> > >
> > > > I have added some more detail to the "Proposed Changes" section. I have also created a preliminary PR for the JIRA (https://github.com/apache/kafka/pull/2285).
> > > >
> > > > I am using request.timeout.ms to bound individual requests during close (the KIP does not address timeouts in any other code path) to ensure that close() always completes within a bounded time even when a timeout is not specified. This is similar to the producer, where requests are aborted after request.timeout.ms. The PR contains unit and integration tests for all the close scenarios I could think of (but there could be more).
> > > >
> > > > On Mon, Dec 19, 2016 at 10:32 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > +1 on this idea as well.
> > > > >
> > > > > Streams has also added a similar feature itself, partly because the consumer does not support it directly (the other part of the reason is that, like brokers, Streams also has some exception-handling logic that could lead to deadlock with a careless System.exit). For the consumer itself, I think the trickiness lies in the prefetching calls as well as cleaning up commit/heartbeat (HB) requests with the timeout, and I agree with Ewen that it's better to merge this early in the release cycle than at the last minute.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Dec 19, 2016 at 4:18 AM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:
> > > > >
> > > > > > Thank you for the reviews.
> > > > > >
> > > > > > @Becket @Ewen, agreed that making all blocking calls have a timeout will be trickier, and hence the scope of this KIP is limited to close().
> > > > > >
> > > > > > @Jay Yes, this should definitely go into the release notes; I will make sure it is added. I will add some integration tests with broker failures to test the timeout, but they cannot completely eliminate the risk of a hang. Over time, system tests will hopefully help catch most issues.
> > > > > >
> > > > > > On Sat, Dec 17, 2016 at 1:15 AM, Jay Kreps <j...@confluent.io> wrote:
> > > > > >
> > > > > > > I think this is great. Sounds like one implication is that existing code that called close() and hit the timeout would now hang indefinitely. We saw this kind of thing a lot in automated testing scenarios where people don't correctly sequence their shutdown of client and server. I think this is okay, but it might be good to include in the release notes.
> > > > > > >
> > > > > > > -jay
> > > > > > >
> > > > > > > On Thu, Dec 15, 2016 at 5:32 AM, Rajini Sivaram <rsiva...@pivotal.io> wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I have just created KIP-102 to add a new close method for consumers with a timeout parameter, making Consumer consistent with Producer:
> > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> > > > > > > >
> > > > > > > > Comments and suggestions are welcome.
> > > > > > > >
> > > > > > > > Thank you...
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > >
> > > > > > > > Rajini
> > > > > >
> > > > > > --
> > > > > > Regards,
> > > > > >
> > > > > > Rajini
> > > > >
> > > > > --
> > > > > -- Guozhang
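Jason's proposal in the thread above, which treats the OffsetCommit sent before closing and the LeaveGroup as best effort and waits a bounded time (say 30 seconds) for them, can be illustrated with Python's concurrent.futures. This is only a sketch: the in-flight requests are modelled as Future objects, and `close_best_effort` is a hypothetical helper, not part of any Kafka client.

```python
from concurrent.futures import Future, wait


def close_best_effort(pending, timeout_s=30.0):
    """Wait up to timeout_s for in-flight requests, then stop waiting.

    Hypothetical helper: `pending` holds futures standing in for requests
    such as the final OffsetCommit or LeaveGroup. Requests still outstanding
    at the deadline are treated as best effort. They are cancelled where
    possible and returned, instead of raising or blocking any longer.
    """
    if timeout_s < 0:
        raise ValueError("timeout must be non-negative")
    done, not_done = wait(pending, timeout=timeout_s)
    for fut in not_done:
        fut.cancel()  # returns False (no effect) if the future is already running
    return done, not_done


if __name__ == "__main__":
    commit = Future()       # stands in for an OffsetCommit that completes
    leave_group = Future()  # stands in for a LeaveGroup that never completes
    commit.set_result("committed")
    done, abandoned = close_best_effort([commit, leave_group], timeout_s=0.1)
    print(len(done), len(abandoned))  # 1 1
```

`concurrent.futures.wait` returns the completed and still-pending futures as soon as everything finishes or the timeout elapses. An unresponsive coordinator therefore delays the caller by at most `timeout_s` instead of blocking indefinitely.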