Hi Rajini,

Thanks for the clarification. I looked again at the patch and I see what
you're saying now. I was confused because I assumed the request timeout was
being enforced on the requests themselves, but it is more that the request
timeout bounds the attempt to send them in addition to the time to receive
a response, right? So it is possible that we time out before even getting a
chance to send the OffsetCommit (for example).

I think I'd still prefer timing out quicker by default if possible. The one
case where it might be worthwhile waiting longer is when there are pending
offset commits sent through commitSync() or commitAsync(). But if we're not
actually doing retries or coordinator rediscovery, I'm not sure the
additional time helps that much.

-Jason
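
The best-effort behaviour discussed in this message (wait a bounded time for
pending commits, then give up without retries) can be sketched in plain Java.
This is an illustration only, not the consumer's implementation: the class and
method names are hypothetical, and CompletableFuture stands in for pending
OffsetCommit requests.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BestEffortClose {
    // Hypothetical helper: waits for each pending future until one shared
    // deadline and returns how many completed successfully. Failures and
    // timeouts are ignored (best effort, no retries). Assumes a finite
    // timeoutMs; Long.MAX_VALUE would overflow the deadline computation.
    static int awaitPending(List<CompletableFuture<Void>> pending, long timeoutMs)
            throws InterruptedException {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        int completed = 0;
        for (CompletableFuture<Void> future : pending) {
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            try {
                future.get(remainingNanos, TimeUnit.NANOSECONDS);
                completed++;
            } catch (TimeoutException | ExecutionException e) {
                // Best effort: give up on this request and move on.
            }
        }
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> stuck = new CompletableFuture<>(); // never completes
        int ok = awaitPending(Arrays.asList(done, stuck), 100);
        System.out.println(ok + " of 2 pending requests completed before the deadline");
    }
}
```

Sharing one deadline across all pending requests keeps the total wait bounded
by the timeout regardless of how many requests are outstanding.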

On Wed, Jan 4, 2017 at 8:27 AM, Rajini Sivaram <rajinisiva...@gmail.com>
wrote:

> Hi Jason,
>
> Thank you for the review.
>
> During close(), if there is a rebalance and the coordinator has to be
> rediscovered, close terminates without trying to find the coordinator. The
> poll() loop within close terminates if the coordinator is not known (as it
> does now) or if the timeout expires. At the moment, that timeout is a
> hard-coded 5 second timeout. The PR changes that to min(closeTimeout,
> requestTimeout). So even if there are pending commits, the maximum wait
> will be requestTimeout in the final poll() loop of close().
>
> In addition to this, before the poll loop, there is a
> maybeAutoCommitOffsetsSync(). At the moment, this does not have a timeout
> and can wait indefinitely. The PR introduces a timeout for this commit
> invoked from close(). The timeout is min(closeTimeout, requestTimeout).
> Hence the maximum timeout of (2 * requestTimeout) for any close. Have I
> missed something?
>
> I had chosen Long.MAX_VALUE as default close timeout to be consistent with
> Producer. But perhaps a lower timeout of 30 seconds is more meaningful for
> Consumer since consumer typically has less to do. Even with (2 *
> requestTimeout), the default would be 10 minutes, which is perhaps too high
> anyway. I will update the KIP.
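
A minimal sketch of the arithmetic in the message above, not the code in the
PR. It assumes two sequential phases in close() (the synchronous auto-commit,
then the final poll loop), each bounded by min(closeTimeout, requestTimeout),
and a consumer request.timeout.ms of 305000 ms, which is an assumption chosen
to match the thread's "around 10 minutes" figure. The class and method names
are hypothetical.

```java
public class CloseTimeoutBound {
    // Wait allowed for a single phase, as described in the thread.
    static long phaseTimeoutMs(long closeTimeoutMs, long requestTimeoutMs) {
        return Math.min(closeTimeoutMs, requestTimeoutMs);
    }

    // Worst case when no close timeout is given (closeTimeout = Long.MAX_VALUE):
    // both phases run to their full per-phase bound.
    static long worstCaseDefaultCloseMs(long requestTimeoutMs) {
        return 2 * phaseTimeoutMs(Long.MAX_VALUE, requestTimeoutMs);
    }

    public static void main(String[] args) {
        long requestTimeoutMs = 305_000L; // assumed default, see lead-in
        long boundMs = worstCaseDefaultCloseMs(requestTimeoutMs);
        System.out.println(boundMs + " ms"); // prints "610000 ms", just over 10 minutes
    }
}
```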
>
>
> On Wed, Jan 4, 2017 at 3:16 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Rajini,
> >
> > Thanks for the KIP. I had a quick look at the patch and the impact doesn't
> > seem too bad. Just wanted to clarify one point. This is from the KIP:
> >
> > > The existing close() method without a timeout will attempt to close the
> > > consumer gracefully with a timeout of Long.MAX_VALUE. Since commit and
> > > leave group requests are timed out after the request timeout, the upper
> > > bound will be approximately 2*request.timeout.ms (around 10 minutes by
> > > default).
> >
> >
> > I don't think this is quite right. There could be one or more pending
> > OffsetCommit requests (sent using commitAsync) that we may have to await.
> > We could also be in the middle of a group rebalance. The other complication
> > is what happens in the event of a request timeout. Usually the consumer
> > will rediscover the coordinator. Would we do that as well in close() and
> > retry any failed requests if there is time remaining, or would we just fail
> > the remaining requests and return? In any case, it may not be so easy to
> > set an upper bound on the default timeout.
> >
> > With that in mind, I'm wondering whether waiting indefinitely should be the
> > default. In the case of the OffsetCommit before closing (when autocommit is
> > enabled) or the LeaveGroup, it's more or less OK if these requests fail.
> > Maybe we should consider them best effort (as is currently done) and wait a
> > reasonable amount of time (say 30 seconds) for their completion. I'd rather
> > have "nice" behavior out of the box and let users who want indefinite
> > blocking use Long.MAX_VALUE themselves. What do you think?
> >
> > Thanks,
> > Jason
> >
> > On Wed, Dec 21, 2016 at 4:39 AM, Rajini Sivaram <rajinisiva...@gmail.com>
> > wrote:
> >
> > > I have added some more detail to the "Proposed Changes" section. Also
> > > created a preliminary PR for the JIRA (
> > > https://github.com/apache/kafka/pull/2285).
> > >
> > > I am using *request.timeout.ms* to bound
> > > individual requests during close (the KIP does not address timeouts in any
> > > other code path) to ensure that *close()* always completes within a bounded
> > > time even when timeout is not specified. This is similar to the producer
> > > where requests are aborted after *request.timeout.ms*. The PR contains unit
> > > and integration tests for all the close scenarios I could think of (but
> > > there could be more).
> > >
> > >
> > > On Mon, Dec 19, 2016 at 10:32 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > +1 on this idea as well.
> > > >
> > > > Streams has also added a similar feature itself partly because consumer
> > > > does not support it directly (other part of the reason is that like
> > > > brokers, streams also have some exception handling logic which could lead
> > > > to deadlock with careless System.exit). For consumer itself I think the
> > > > trickiness lies in the prefetching calls as well as commit / HB requests
> > > > cleanup with the timeout, and I agree with Ewen that it's better to be
> > > > merged in the early release cycle than a last minute merge.
> > > >
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Dec 19, 2016 at 4:18 AM, Rajini Sivaram <rajinisiva...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you for the reviews.
> > > > >
> > > > > @Becket @Ewen, Agree that making all blocking calls have a timeout will be
> > > > > trickier and hence the scope of this KIP is limited to close().
> > > > >
> > > > > @Jay Yes, this should definitely go into release notes, will make sure it
> > > > > is added. I will add some integration tests with broker failures for
> > > > > testing the timeout, but they cannot completely eliminate the risk of a
> > > > > hang. Over time, hopefully system tests will help catch most issues.
> > > > >
> > > > >
> > > > > On Sat, Dec 17, 2016 at 1:15 AM, Jay Kreps <j...@confluent.io> wrote:
> > > > >
> > > > > > I think this is great. Sounds like one implication is that existing code
> > > > > > that called close() and hit the timeout would now hang indefinitely. We saw
> > > > > > this kind of thing a lot in automated testing scenarios where people don't
> > > > > > correctly sequence their shutdown of client and server. I think this is
> > > > > > okay, but might be good to include in the release notes.
> > > > > >
> > > > > > -jay
> > > > > >
> > > > > > On Thu, Dec 15, 2016 at 5:32 AM, Rajini Sivaram <rsiva...@pivotal.io>
> > > > > > wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I have just created KIP-102 to add a new close method for consumers with a
> > > > > > timeout parameter, making Consumer consistent with Producer:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> > > > > >
> > > > > > Comments and suggestions are welcome.
> > > > > >
> > > > > > Thank you...
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Rajini
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Regards,
> > > > >
> > > > > Rajini
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
