So the alternative to consider would be to instead have
   max.block.ms (or something)
   request.timeout
   replication.timeout

I think this better captures what the user cares about. Here is how it
would work.

*max.send.block.ms* is the bound on the maximum
time the producer.send() call can block.
This subsumes the existing metadata timeout use case but not the proposed
use for the time in the accumulator. It *also* acts as a bound on the time
you can block on BufferPool allocation (we'd have to add this but that
should be easy).
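
To make that concrete, here is roughly the shape I have in mind inside
send() -- just a sketch, and the names (maxSendBlockMs, waitOnMetadata,
bufferPool) are placeholders rather than actual trunk code:

    long deadlineMs = time.milliseconds() + maxSendBlockMs;
    waitOnMetadata(record.topic(), maxSendBlockMs);              // existing metadata wait, now bounded
    long remainingMs = deadlineMs - time.milliseconds();
    if (remainingMs <= 0)
        throw new TimeoutException("send() exceeded max.send.block.ms");
    ByteBuffer buffer = bufferPool.allocate(size, remainingMs);  // new: bound the allocation wait too

That way both blocking points draw from a single budget, so the caller sees
one bound on how long send() can take.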

*request.timeout* is the bound on the time after send() completes until you
get an acknowledgement. This covers the connection timeout and the time in
the accumulator. So to implement this, the time we set in the request sent
via NetworkClient would have already subtracted off the time spent in the
accumulator, and if the request is retried we would include both the time in
the accumulator and the time taken for the first request, etc. In other
words this is the upper bound on the time to the Future being satisfied.
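
Roughly the bookkeeping I am imagining when a batch is drained (again just
a sketch; the field and method names are made up, and NetworkClient.send()
does not actually take a timeout argument today):

    long elapsedMs = time.milliseconds() - batch.createdMs;   // accumulator wait + earlier attempts
    long remainingMs = requestTimeoutMs - elapsedMs;
    if (remainingMs <= 0)
        batch.done(new TimeoutException("expired before acknowledgement"));
    else
        networkClient.send(request, remainingMs);              // per-request deadline is whatever is left

So whether the batch sat in the accumulator, was retried, or both, the
Future completes (or fails) within request.timeout of send() returning.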

*replication.timeout* will default to something reasonable but maybe you
can override it if you want?
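
Put together, configuring a producer under this scheme would look roughly
like the following (the config names are the strawmen above and the values
are purely illustrative):

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("max.send.block.ms", "60000");    // bound on how long send() may block
    props.put("request.timeout", "30000");      // bound from send() returning to the Future completing
    props.put("replication.timeout", "10000");  // server-side wait for acks
    Producer<byte[], byte[]> producer = new KafkaProducer<>(props);

Defaults would presumably cover most users, with overrides only for the
unusual cases.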

Thoughts?

-Jay

On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> So what I understand is that we would have 3 timeouts:
> 1) replication timeout
> 2) request timeout
> 3) metadata timeout (existing)
>
> The request timeout has to be greater than the replication timeout.
> The request timeout is for messages already sent to Kafka and the producer is
> waiting for them.
>
> Thanks,
>
> Mayuresh
>
> On Tue, May 19, 2015 at 11:12 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > I think this looks good. What I think is missing is an overview of the
> > timeouts from the user's perspective.
> >
> > My worry is that it is quite complicated to reason about the current set
> of
> > timeouts. Currently we have
> >    timeout.ms
> >    metadata.fetch.timeout.ms
> >
> > The proposed settings I think are:
> >   batch.expiration.ms
> > request.timeout.ms
> > replication.timeout.ms
> >
> > I think maybe we can skip the batch.expiration.ms. Instead maybe we can
> > somehow combine these into a single request timeout so that we subtract
> the
> > time you spent waiting from the request timeout and/or replication
> timeout
> > somehow? I don't have an explicit proposal but my suspicion is that from
> > the user's point of view there is just one timeout related to the request
> > after which they don't care, and we can split that up between the batch
> > time and the request time. Thoughts?
> >
> > How are we handling connection timeouts? If a machine hard fails in the
> > middle of connection establishment there will be no outstanding
> requests. I
> > think this may be okay because connections are established when we want
> to
> > send a request and presumably we will begin the timer then?
> >
> > To that end I suggest we do two things:
> > 1. Include KAFKA-1788. I know that technically these two things are
> > different but from the user's point of view they aren't.
> > 2. Include in the KIP the explanation to the user of the full set of
> > timeouts, what they mean, how we will default them, and when to override
> > which.
> >
> > I know this is a hassle but I think the end experience will be a lot
> better
> > if we go through this thought process.
> >
> > -Jay
> >
> > On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin <j...@linkedin.com.invalid
> >
> > wrote:
> >
> > > I modified the WIKI page to incorporate the feedbacks from mailing list
> > > and KIP hangout.
> > >
> > > - Added the deprecation plan for TIMEOUT_CONFIG
> > > - Added the actions to take after request timeout
> > >
> > > I finally chose to create a new connection if requests timeout. The
> > reason
> > > is:
> > > 1. In most cases, if a broker is just slow, as long as we set request
> > > timeout to be a reasonable value, we should not see many new
> connections
> > > get created.
> > > 2. If a broker is down, hopefully metadata refresh will find the new
> > > broker and we will not try to reconnect to the broker anymore.
> > >
> > > Comments are welcome!
> > >
> > > Thanks.
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On 5/12/15, 2:59 PM, "Mayuresh Gharat" <gharatmayures...@gmail.com>
> > wrote:
> > >
> > > >+1 Becket. That would give enough time for clients to move. We should
> > make
> > > >this change very clear.
> > > >
> > > >Thanks,
> > > >
> > > >Mayuresh
> > > >
> > > >On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin
> <j...@linkedin.com.invalid
> > >
> > > >wrote:
> > > >
> > > >> Hey Ewen,
> > > >>
> > > >> Very good summary about the compatibility. What you proposed makes
> > > >>sense.
> > > >> So basically we can do the following:
> > > >>
> > > >> In next release, i.e. 0.8.3:
> > > >> 1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”)
> > > >> 2. Mark TIMEOUT_CONFIG as deprecated
> > > >> 3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is
> > > >> defined and give a warning about deprecation.
> > > >> In the release after 0.8.3, we remove TIMEOUT_CONFIG.
> > > >>
> > > >> This should give enough buffer for this change.
> > > >>
> > > >> Request timeout is a completely new thing we add to fix a bug; I’m
> > > >> with you that it does not make sense to have it maintain the old
> > > >> buggy behavior. So we can set it to a reasonable value instead of
> > > >> infinite.
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >> On 5/12/15, 12:03 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
> > > wrote:
> > > >>
> > > >> >I think my confusion is coming from this:
> > > >> >
> > > >> >> So in this KIP, we only address (3). The only public interface
> > change
> > > >> >>is a
> > > >> >> new configuration of request timeout (and maybe change the
> > > >>configuration
> > > >> >> name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
> > > >> >
> > > >> >There are 3 possible compatibility issues I see here:
> > > >> >
> > > >> >* I assumed this meant the constants also change, so "timeout.ms"
> > > >>becomes
> > > >> >"
> > > >> >replication.timeout.ms". This breaks config files that worked on
> the
> > > >> >previous version and the only warning would be in release notes. We
> > do
> > > >> >warn
> > > >> >about unused configs so they might notice the problem.
> > > >> >
> > > >> >* Binary and source compatibility if someone configures their
> client
> > in
> > > >> >code and uses the TIMEOUT_CONFIG variable. Renaming it will cause
> > > >>existing
> > > >> >jars to break if you try to run against an updated client (which
> > seems
> > > >>not
> > > >> >very significant since I doubt people upgrade these without
> > recompiling
> > > >> >but
> > > >> >maybe I'm wrong about that). And it breaks builds without having
> > > >> >deprecated that field first, which again is probably not the biggest issue
> but
> > is
> > > >> >annoying for users and when we accidentally changed the API we
> > > >>received a
> > > >> >complaint about breaking builds.
> > > >> >
> > > >> >* Behavior compatibility as Jay mentioned on the call -- setting
> the
> > > >> >config
> > > >> >(even if the name changed) doesn't have the same effect it used to.
> > > >> >
> > > >> >One solution, which admittedly is more painful to implement and
> > > >>maintain,
> > > >> >would be to maintain the timeout.ms config, have it override the
> > > others
> > > >> if
> > > >> >it is specified (including an infinite request timeout I guess?),
> and
> > > >>if
> > > >> >it
> > > >> >isn't specified, we can just use the new config variables. Given a
> > real
> > > >> >deprecation schedule, users would have better warning of changes
> and
> > a
> > > >> >window to make the changes.
> > > >> >
> > > >> >I actually think it might not be necessary to maintain the old
> > behavior
> > > >> >precisely, although maybe for some code it is an issue if they
> start
> > > >> >seeing
> > > >> >timeout exceptions that they wouldn't have seen before?
> > > >> >
> > > >> >-Ewen
> > > >> >
> > > >> >On Wed, May 6, 2015 at 6:06 PM, Jun Rao <j...@confluent.io> wrote:
> > > >> >
> > > >> >> Jiangjie,
> > > >> >>
> > > >> >> Yes, I think using metadata timeout to expire batches in the
> record
> > > >> >> accumulator makes sense.
> > > >> >>
> > > >> >> Thanks,
> > > >> >>
> > > >> >> Jun
> > > >> >>
> > > >> >> On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin
> > > >> >><j...@linkedin.com.invalid>
> > > >> >> wrote:
> > > >> >>
> > > >> >> > I incorporated Ewen and Guozhang’s comments in the KIP page. I want
> > > >> >> > to speed this KIP up because currently mirror maker is very likely
> > > >> >> > to hang when a broker is down.
> > > >> >> >
> > > >> >> > I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used
> > > >>metadata
> > > >> >> > timeout to expire the batches which are sitting in accumulator
> > > >>without
> > > >> >> > leader info. I did that because the situation there is
> > essentially
> > > >> >> missing
> > > >> >> > metadata.
> > > >> >> >
> > > >> >> > As a summary of what I am thinking about the timeout in new
> > > >>Producer:
> > > >> >> >
> > > >> >> > 1. Metadata timeout:
> > > >> >> >   - used in send(), blocking
> > > >> >> >   - used in accumulator to expire batches with timeout
> exception.
> > > >> >> > 2. Linger.ms
> > > >> >> >   - Used in accumulator to ready the batch for drain
> > > >> >> > 3. Request timeout
> > > >> >> >   - Used in NetworkClient to expire a batch and retry if no
> > > >>response
> > > >> >>is
> > > >> >> > received for a request before timeout.
> > > >> >> >
> > > >> >> > So in this KIP, we only address (3). The only public interface
> > > >>change
> > > >> >>is
> > > >> >> a
> > > >> >> > new configuration of request timeout (and maybe change the
> > > >> >>configuration
> > > >> >> > name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
> > > >> >> >
> > > >> >> > Would like to see what people think of above approach?
> > > >> >> >
> > > >> >> > Jiangjie (Becket) Qin
> > > >> >> >
> > > >> >> > On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> > > >> >> >
> > > >> >> > >Jun,
> > > >> >> > >
> > > >> >> > >I thought a little bit differently on this.
> > > >> >> > >Intuitively, I am thinking that if a partition is offline, the
> > > >> >>metadata
> > > >> >> > >for that partition should be considered not ready because we
> > don’t
> > > >> >>know
> > > >> >> > >which broker we should send the message to. So those sends
> need
> > > >>to be
> > > >> >> > >blocked on metadata timeout.
> > > >> >> > >Another thing I’m wondering is in which scenario an offline
> > > >>partition
> > > >> >> will
> > > >> >> > >become online again in a short period of time and how likely
> it
> > > >>will
> > > >> >> > >occur. My understanding is that the batch timeout for batches
> > > >> >>sitting in
> > > >> >> > >accumulator should be larger than linger.ms but should not be
> > too
> > > >> >>long
> > > >> >> > >(e.g. less than 60 seconds). Otherwise it will exhaust the
> > shared
> > > >> >>buffer
> > > >> >> > >with batches to be aborted.
> > > >> >> > >
> > > >> >> > >That said, I do agree it is reasonable to buffer the message
> for
> > > >>some
> > > >> >> time
> > > >> >> > >so messages to other partitions can still get sent. But adding
> > > >> >>another
> > > >> >> > >expiration in addition to linger.ms - which is essentially a
> > > >>timeout
> > > >> >>-
> > > >> >> > >sounds a little bit confusing. Maybe we can do this, let the
> > batch
> > > >> >>sit
> > > >> >> in
> > > >> >> > >accumulator up to linger.ms, then fail it if necessary.
> > > >> >> > >
> > > >> >> > >What do you think?
> > > >> >> > >
> > > >> >> > >Thanks,
> > > >> >> > >
> > > >> >> > >Jiangjie (Becket) Qin
> > > >> >> > >
> > > >> >> > >On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
> > > >> >> > >
> > > >> >> > >>Jiangjie,
> > > >> >> > >>
> > > >> >> > >>Allowing messages to be accumulated in an offline partition
> > > >>could be
> > > >> >> > >>useful
> > > >> >> > >>since the partition may become available before the request
> > > >>timeout
> > > >> >>or
> > > >> >> > >>linger time is reached. Now that we are planning to add a new
> > > >> >>timeout,
> > > >> >> it
> > > >> >> > >>would be useful to think through whether/how that applies to
> > > >> >>messages
> > > >> >> in
> > > >> >> > >>the accumulator too.
> > > >> >> > >>
> > > >> >> > >>Thanks,
> > > >> >> > >>
> > > >> >> > >>Jun
> > > >> >> > >>
> > > >> >> > >>
> > > >> >> > >>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin
> > > >> >> <j...@linkedin.com.invalid
> > > >> >> > >
> > > >> >> > >>wrote:
> > > >> >> > >>
> > > >> >> > >>> Hi Harsha,
> > > >> >> > >>>
> > > >> >> > >>> Took a quick look at the patch. I think it is still a
> little
> > > >>bit
> > > >> >> > >>> different. KAFKA-1788 only handles the case where a batch sits
> > > >> >> > >>> in the accumulator for too long. The KIP is trying to solve the
> > issue
> > > >> >>where
> > > >> >> a
> > > >> >> > >>> batch has already been drained from accumulator and sent to
> > > >> >>broker.
> > > >> >> > >>> We might be able to apply timeout on batch level to merge
> > those
> > > >> >>two
> > > >> >> > >>>cases
> > > >> >> > >>> as Ewen suggested. But I’m not sure if it is a good idea to
> > > >>allow
> > > >> >> > >>>messages
> > > >> >> > >>> whose target partition is offline to sit in accumulator in
> > the
> > > >> >>first
> > > >> >> > >>>place.
> > > >> >> > >>>
> > > >> >> > >>> Jiangjie (Becket) Qin
> > > >> >> > >>>
> > > >> >> > >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani"
> > > >><ka...@harsha.io>
> > > >> >> > wrote:
> > > >> >> > >>>
> > > >> >> > >>> >Guozhang and Jiangjie,
> > > >> >> > >>> >                 Isn’t this work being covered in
> > > >> >> > >>> >https://issues.apache.org/jira/browse/KAFKA-1788. Can you
> > > >> >> > >>> >please review the patch there?
> > > >> >> > >>> >Thanks,
> > > >> >> > >>> >Harsha
> > > >> >> > >>> >
> > > >> >> > >>> >
> > > >> >> > >>> >On April 15, 2015 at 10:39:40 PM, Guozhang Wang
> > > >> >>(wangg...@gmail.com
> > > >> >> )
> > > >> >> > >>> >wrote:
> > > >> >> > >>> >
> > > >> >> > >>> >Thanks for the update Jiangjie,
> > > >> >> > >>> >
> > > >> >> > >>> >I think it is actually NOT expected that hardware
> > > >>disconnection
> > > >> >>will
> > > >> >> > >>>be
> > > >> >> > >>> >detected by the selector, but rather will only be revealed
> > > >>upon
> > > >> >>TCP
> > > >> >> > >>> >timeout, which could be hours.
> > > >> >> > >>> >
> > > >> >> > >>> >A couple of comments on the wiki:
> > > >> >> > >>> >
> > > >> >> > >>> >1. "For KafkaProducer.close() and KafkaProducer.flush() we
> > > >>need
> > > >> >>the
> > > >> >> > >>> >request
> > > >> >> > >>> >timeout as implict timeout." I am not very clear on what this
> > > >> >> > >>> >means.
> > > >> >> > >>> >
> > > >> >> > >>> >2. Currently the producer already has a "TIMEOUT_CONFIG"
> > which
> > > >> >> should
> > > >> >> > >>> >really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to
> > > >>add "
> > > >> >> > >>> >REQUEST_TIMEOUT_CONFIG", I suggest we also make this
> > renaming:
> > > >> >> > >>>admittedly
> > > >> >> > >>> >
> > > >> >> > >>> >it will change the config names but will reduce confusion
> > > >>moving
> > > >> >> > >>> >forward.
> > > >> >> > >>> >
> > > >> >> > >>> >
> > > >> >> > >>> >Guozhang
> > > >> >> > >>> >
> > > >> >> > >>> >
> > > >> >> > >>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
> > > >> >> > >>><j...@linkedin.com.invalid>
> > > >> >> > >>> >
> > > >> >> > >>> >wrote:
> > > >> >> > >>> >
> > > >> >> > >>> >> Checked the code again. It seems that the disconnected
> > > >>channel
> > > >> >>is
> > > >> >> > >>>not
> > > >> >> > >>> >> detected by selector as expected.
> > > >> >> > >>> >>
> > > >> >> > >>> >> Currently we are depending on the
> > > >> >> > >>> >> o.a.k.common.network.Selector.disconnected set to see if
> > we
> > > >> >>need
> > > >> >> to
> > > >> >> > >>>do
> > > >> >> > >>> >> something for a disconnected channel.
> > > >> >> > >>> >> However Selector.disconnected set is only updated when:
> > > >> >> > >>> >> 1. A write/read/connect to channel failed.
> > > >> >> > >>> >> 2. A Key is canceled
> > > >> >> > >>> >> However when a broker is down before it sends back the
> > > >> >>response,
> > > >> >> the
> > > >> >> > >>> >> client does not seem to be able to detect this failure.
> > > >> >> > >>> >>
> > > >> >> > >>> >> I did a simple test below:
> > > >> >> > >>> >> 1. Run a selector on one machine and an echo server on
> > > >>another
> > > >> >> > >>>machine.
> > > >> >> > >>> >>
> > > >> >> > >>> >> Connect a selector to an echo server
> > > >> >> > >>> >> 2. Send a message to echo server using selector, then
> let
> > > >>the
> > > >> >> > >>>selector
> > > >> >> > >>> >> poll() every 10 seconds.
> > > >> >> > >>> >> 3. After the server received the message, unplug the cable on
> > the
> > > >> >>echo
> > > >> >> > >>> >>server.
> > > >> >> > >>> >> 4. After waiting for 45 min, the selector still did not
> > > >> >> > >>> >> detect the network failure.
> > > >> >> > >>> >> Lsof on selector machine shows that the TCP connection
> is
> > > >>still
> > > >> >> > >>> >>considered
> > > >> >> > >>> >> ESTABLISHED.
> > > >> >> > >>> >>
> > > >> >> > >>> >> I’m not sure in this case what we should expect from the
> > > >> >> > >>> >> java.nio.channels.Selector. According to the documentation,
> > > >> >> > >>> >> the selector does not verify the status of the associated
> > > >> >> > >>> >> channel. In my test case it looks even worse: the OS did not
> > > >> >> > >>> >> consider the socket to be disconnected.
> > > >> >> > >>> >>
> > > >> >> > >>> >> Anyway. It seems adding the client side request timeout
> is
> > > >> >> > >>>necessary.
> > > >> >> > >>> >>I’ve
> > > >> >> > >>> >> updated the KIP page to clarify the problem we want to
> > solve
> > > >> >> > >>>according
> > > >> >> > >>> >>to
> > > >> >> > >>> >> Ewen’s comments.
> > > >> >> > >>> >>
> > > >> >> > >>> >> Thanks.
> > > >> >> > >>> >>
> > > >> >> > >>> >> Jiangjie (Becket) Qin
> > > >> >> > >>> >>
> > > >> >> > >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava"
> > > >> >><e...@confluent.io>
> > > >> >> > >>>wrote:
> > > >> >> > >>> >>
> > > >> >> > >>> >>
> > > >> >> > >>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
> > > >> >> > >>> >><j...@linkedin.com.invalid>
> > > >> >> > >>> >> >wrote:
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >> Hi Ewen, thanks for the comments. Very good points!
> > > >>Please
> > > >> >>see
> > > >> >> > >>> >>replies
> > > >> >> > >>> >> >> inline.
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <
> > > >> >> e...@confluent.io
> > > >> >> > >
> > > >> >> > >>> >> wrote:
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >> >Jiangjie,
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >Great start. I have a couple of comments.
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >Under the motivation section, is it really true that
> > the
> > > >> >> request
> > > >> >> > >>> >>will
> > > >> >> > >>> >> >> >never
> > > >> >> > >>> >> >> >be completed? Presumably if the broker goes down the
> > > >> >> connection
> > > >> >> > >>> >>will be
> > > >> >> > >>> >> >> >severed, at worst by a TCP timeout, which should
> clean
> > > >>up
> > > >> >>the
> > > >> >> > >>> >> >>connection
> > > >> >> > >>> >> >> >and any outstanding requests, right? I think the
> real
> > > >> >>reason
> > > >> >> we
> > > >> >> > >>> >>need a
> > > >> >> > >>> >> >> >different timeout is that the default TCP timeouts
> are
> > > >> >> > >>>ridiculously
> > > >> >> > >>> >>
> > > >> >> > >>> >> >>long
> > > >> >> > >>> >> >> >in
> > > >> >> > >>> >> >> >this context.
> > > >> >> > >>> >> >> Yes, when broker is completely down the request
> should
> > be
> > > >> >> cleared
> > > >> >> > >>>as
> > > >> >> > >>> >>you
> > > >> >> > >>> >> >> said. The case we encountered looks like the broker
> was
> > > >>just
> > > >> >> not
> > > >> >> > >>> >> >> responding but TCP connection was still alive though.
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >Ok, that makes sense.
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >My second question is about whether this is the
> right
> > > >> >>level to
> > > >> >> > >>> >>tackle
> > > >> >> > >>> >> >>the
> > > >> >> > >>> >> >> >issue/what user-facing changes need to be made. A
> > > >>related
> > > >> >> > >>>problem
> > > >> >> > >>> >>came
> > > >> >> > >>> >> >>up
> > > >> >> > >>> >> >> >in https://issues.apache.org/jira/browse/KAFKA-1788
> > > >>where
> > > >> >> > >>>producer
> > > >> >> > >>> >> >> records
> > > >> >> > >>> >> >> >get stuck indefinitely because there's no
> client-side
> > > >> >>timeout.
> > > >> >> > >>>This
> > > >> >> > >>> >>KIP
> > > >> >> > >>> >> >> >wouldn't fix that problem or any problems caused by
> > > >>lack of
> > > >> >> > >>> >> >>connectivity
> > > >> >> > >>> >> >> >since this would only apply to in flight requests,
> > > >>which by
> > > >> >> > >>> >>definition
> > > >> >> > >>> >> >> >must
> > > >> >> > >>> >> >> >have been sent on an active connection.
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >I suspect both types of problems probably need to be
> > > >> >>addressed
> > > >> >> > >>> >> >>separately
> > > >> >> > >>> >> >> >by introducing explicit timeouts. However, because
> the
> > > >> >> settings
> > > >> >> > >>> >> >>introduced
> > > >> >> > >>> >> >> >here are very much about the internal
> implementations
> > of
> > > >> >>the
> > > >> >> > >>> >>clients,
> > > >> >> > >>> >> >>I'm
> > > >> >> > >>> >> >> >wondering if this even needs to be a user-facing
> > > >>setting,
> > > >> >> > >>> >>especially
> > > >> >> > >>> >> >>if we
> > > >> >> > >>> >> >> >have to add other timeouts anyway. For example,
> would
> > a
> > > >> >>fixed,
> > > >> >> > >>> >>generous
> > > >> >> > >>> >> >> >value that's still much shorter than a TCP timeout,
> > say
> > > >> >>15s,
> > > >> >> be
> > > >> >> > >>> >>good
> > > >> >> > >>> >> >> >enough? If other timeouts would allow, for example,
> > the
> > > >> >> clients
> > > >> >> > >>>to
> > > >> >> > >>> >> >> >properly
> > > >> >> > >>> >> >> >exit even if requests have not hit their timeout,
> then
> > > >> >>what's
> > > >> >> > >>>the
> > > >> >> > >>> >> >>benefit
> > > >> >> > >>> >> >> >of being able to configure the request-level
> timeout?
> > > >> >> > >>> >> >> That is a very good point. We have three places that
> we
> > > >> >>might
> > > >> >> be
> > > >> >> > >>> >>able to
> > > >> >> > >>> >> >> enforce timeout for a message send:
> > > >> >> > >>> >> >> 1. Before append to accumulator - handled by metadata
> > > >> >>timeout
> > > >> >> on
> > > >> >> > >>>per
> > > >> >> > >>> >>
> > > >> >> > >>> >> >> message level.
> > > >> >> > >>> >> >> 2. Batch of messages inside accumulator - no timeout
> > > >> >>mechanism
> > > >> >> > >>>now.
> > > >> >> > >>> >> >> 3. Request of batches after messages leave the
> > > >>accumulator
> > > >> >>- we
> > > >> >> > >>>have
> > > >> >> > >>> >>a
> > > >> >> > >>> >> >> broker side timeout but no client side timeout for
> now.
> > > >> >> > >>> >> >> My current proposal only address (3) but not (2).
> > > >> >> > >>> >> >> Honestly I do not have a very clear idea about what
> > > >>should
> > > >> >>we
> > > >> >> do
> > > >> >> > >>> >>with
> > > >> >> > >>> >> >>(2)
> > > >> >> > >>> >> >> right now. But I am with you that we should not
> expose
> > > >>too
> > > >> >>many
> > > >> >> > >>> >> >> configurations to users. What I am thinking now to
> > handle
> > > >> >>(2)
> > > >> >> is
> > > >> >> > >>> >> >> when the user calls send, if we know that a partition is
> > > >> >> > >>> >> >> offline, we should throw an exception immediately instead
> > > >> >> > >>> >> >> of putting the message into the accumulator. This would
> > > >> >> > >>> >> >> prevent further memory consumption. We might also want
> want
> > to
> > > >> >>fail
> > > >> >> > >>>all
> > > >> >> > >>> >>the
> > > >> >> > >>> >> >> batches in the dequeue once we found a partition is
> > > >>offline.
> > > >> >> That
> > > >> >> > >>> >> >>said, I
> > > >> >> > >>> >> >> feel timeout might not be quite applicable to (2).
> > > >> >> > >>> >> >> Do you have any suggestion on this?
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >Right, I didn't actually mean to solve 2 here, but was
> > > >>trying
> > > >> >>to
> > > >> >> > >>> >>figure
> > > >> >> > >>> >> >out
> > > >> >> > >>> >> >if a solution to 2 would reduce what we needed to do to
> > > >> >>address
> > > >> >> 3.
> > > >> >> > >>> >>(And
> > > >> >> > >>> >> >depending on how they are implemented, fixing 1 might
> > also
> > > >> >> address
> > > >> >> > >>>2).
> > > >> >> > >>> >>It
> > > >> >> > >>> >> >sounds like you hit a hang that I wasn't really
> expecting.
> > > >>This
> > > >> >> > >>>probably
> > > >> >> > >>> >>
> > > >> >> > >>> >> >just
> > > >> >> > >>> >> >means the KIP motivation needs to be a bit clearer
> about
> > > >>what
> > > >> >> type
> > > >> >> > >>>of
> > > >> >> > >>> >> >situation this addresses. The cause of the hang may
> also
> > be
> > > >> >> > >>>relevant
> > > >> >> > >>> >>-- if
> > > >> >> > >>> >> >it was something like a deadlock then that's something
> > that
> > > >> >> should
> > > >> >> > >>> >>just be
> > > >> >> > >>> >> >fixed, but if it's something outside our control then a
> > > >> >>timeout
> > > >> >> > >>>makes
> > > >> >> > >>> >>a
> > > >> >> > >>> >> >lot
> > > >> >> > >>> >> >more sense.
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >I know we have a similar setting,
> > > >> >> > >>> >> >>max.in.flights.requests.per.connection,
> > > >> >> > >>> >> >> >exposed publicly (which I just discovered is missing
> > > >>from
> > > >> >>the
> > > >> >> > >>>new
> > > >> >> > >>> >> >>producer
> > > >> >> > >>> >> >> >configs documentation). But it looks like the new
> > > >>consumer
> > > >> >>is
> > > >> >> > >>>not
> > > >> >> > >>> >> >>exposing
> > > >> >> > >>> >> >> >that option, using a fixed value instead. I think we
> > > >>should
> > > >> >> > >>>default
> > > >> >> > >>> >>to
> > > >> >> > >>> >> >> >hiding these implementation values unless there's a
> > > >>strong
> > > >> >> case
> > > >> >> > >>>for
> > > >> >> > >>> >>a
> > > >> >> > >>> >> >> >scenario that requires customization.
> > > >> >> > >>> >> >> For producer, max.in.flight.requests.per.connection
> > > >>really
> > > >> >> > >>>matters.
> > > >> >> > >>> >>If
> > > >> >> > >>> >> >> people do not want messages to be reordered, they have to
> > > >> >> > >>> >> >> use
> > > >> >> > >>> >> >> max.in.flight.requests.per.connection=1. On the other
> > > >>hand,
> > > >> >>if
> > > >> >> > >>> >> >>throughput
> > > >> >> > >>> >> >> is more of a concern, it could be set higher. For
> > the
> > > >>new
> > > >> >> > >>> >>consumer, I
> > > >> >> > >>> >> >> checked the value and I am not sure if the hard coded
> > > >> >> > >>> >> >> max.in.flight.requests.per.connection=100 is the
> right
> > > >> >>value.
> > > >> >> > >>> >>Without
> > > >> >> > >>> >> >>the
> > > >> >> > >>> >> >> response to the previous request, what offsets should
> > be
> > > >>put
> > > >> >> into
> > > >> >> > >>> >>the
> > > >> >> > >>> >> >>next
> > > >> >> > >>> >> >> fetch request? It seems to me the value will be one
> > > >>natively
> > > >> >> > >>> >>regardless
> > > >> >> > >>> >> >>of
> > > >> >> > >>> >> >> the setting unless we are sending fetch request to
> > > >>different
> > > >> >> > >>> >>partitions,
> > > >> >> > >>> >> >> which does not look like the case.
> > > >> >> > >>> >> >> Anyway, it looks to be a separate issue orthogonal to
> > the
> > > >> >> request
> > > >> >> > >>> >> >>timeout.
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >> >In other words, since the only user-facing change
> was
> > > >>the
> > > >> >> > >>>addition
> > > >> >> > >>> >>of
> > > >> >> > >>> >> >>the
> > > >> >> > >>> >> >> >setting, I'm wondering if we can avoid the KIP
> > > >>altogether
> > > >> >>by
> > > >> >> > >>>just
> > > >> >> > >>> >> >>choosing
> > > >> >> > >>> >> >> >a good default value for the timeout.
> > > >> >> > >>> >> >> The problem is that we have a server side request
> > timeout
> > > >> >> exposed
> > > >> >> > >>>as
> > > >> >> > >>> >>a
> > > >> >> > >>> >> >> public configuration. We cannot set the client
> timeout
> > > >> >>smaller
> > > >> >> > >>>than
> > > >> >> > >>> >>that
> > > >> >> > >>> >> >> value, so a hard coded value probably won’t work
> here.
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >That makes sense, although it's worth keeping in mind
> > that
> > > >> >>even
> > > >> >> if
> > > >> >> > >>>you
> > > >> >> > >>> >>use
> > > >> >> > >>> >> >"correct" values, they could still be violated due to,
> > > >>e.g.,
> > > >> >>a GC
> > > >> >> > >>> >>pause
> > > >> >> > >>> >> >that causes the broker to process a request after it is
> > > >> >>supposed
> > > >> >> to
> > > >> >> > >>> >>have
> > > >> >> > >>> >> >expired.
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >-Ewen
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >-Ewen
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin
> > > >> >> > >>> >> >><j...@linkedin.com.invalid>
> > > >> >> > >>> >> >> >wrote:
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >> Hi,
> > > >> >> > >>> >> >> >>
> > > >> >> > >>> >> >> >> I just created a KIP to add a request timeout to
> > > >> >> NetworkClient
> > > >> >> > >>> >>for
> > > >> >> > >>> >> >>new
> > > >> >> > >>> >> >> >> Kafka clients.
> > > >> >> > >>> >> >> >>
> > > >> >> > >>> >> >> >>
> > > >> >> > >>> >> >> >>
> > > >> >> > >>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> > > >> >> > >>> >> >> >>
> > > >> >> > >>> >> >> >> Comments and suggestions are welcome!
> > > >> >> > >>> >> >> >>
> > > >> >> > >>> >> >> >> Thanks.
> > > >> >> > >>> >> >> >>
> > > >> >> > >>> >> >> >> Jiangjie (Becket) Qin
> > > >> >> > >>> >> >> >>
> > > >> >> > >>> >> >> >>
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >
> > > >> >> > >>> >> >> >--
> > > >> >> > >>> >> >> >Thanks,
> > > >> >> > >>> >> >> >Ewen
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >>
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >
> > > >> >> > >>> >> >--
> > > >> >> > >>> >> >Thanks,
> > > >> >> > >>> >> >Ewen
> > > >> >> > >>> >>
> > > >> >> > >>> >>
> > > >> >> > >>> >
> > > >> >> > >>> >
> > > >> >> > >>> >--
> > > >> >> > >>> >-- Guozhang
> > > >> >> > >>>
> > > >> >> > >>>
> > > >> >> > >
> > > >> >> >
> > > >> >> >
> > > >> >>
> > > >> >
> > > >> >
> > > >> >
> > > >> >--
> > > >> >Thanks,
> > > >> >Ewen
> > > >>
> > > >>
> > > >
> > > >
> > > >--
> > > >-Regards,
> > > >Mayuresh R. Gharat
> > > >(862) 250-7125
> > >
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>
