Yes, that is the plan.

On 5/5/15, 8:23 PM, "Mayuresh Gharat" <gharatmayures...@gmail.com> wrote:

>Just a quick question: can we handle a request timeout as a disconnection,
>do a fresh MetadataRequest, and retry instead of failing the request?
>
>
>Thanks,
>
>Mayuresh
>
>
>On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>> I incorporated Ewen and Guozhang’s comments in the KIP page. I want to
>> speed up on this KIP because currently mirror-maker very likely hangs
>> when a broker is down.
>>
>> I also took a shot at solving KAFKA-1788 in KAFKA-2142. I used the
>> metadata timeout to expire batches that are sitting in the accumulator
>> without leader info. I did that because the situation there is
>> essentially missing metadata.
>>
>> As a summary of what I am thinking about the timeouts in the new
>> producer:
>>
>> 1. Metadata timeout:
>>   - used in send(), blocking
>>   - used in the accumulator to expire batches with a timeout exception
>> 2. Linger.ms:
>>   - used in the accumulator to ready a batch for drain
>> 3. Request timeout:
>>   - used in NetworkClient to expire a batch and retry if no response is
>>     received for a request before the timeout
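[Editorial illustration, not part of the original thread.] To make the taxonomy above concrete, here is a minimal sketch of how the three knobs might appear as producer configs. The config names for (1) and (3) are assumptions based on this discussion, not confirmed Kafka keys:

```java
import java.util.Properties;

public class ProducerTimeoutSketch {
    // Illustrative only: "metadata.fetch.timeout.ms" and
    // "request.timeout.ms" are assumed names based on the discussion.
    public static Properties configs() {
        Properties props = new Properties();
        // (1) metadata timeout: bounds blocking in send() and expires
        //     accumulated batches that never learned a leader
        props.put("metadata.fetch.timeout.ms", "60000");
        // (2) linger.ms: how long a batch may wait before being drained
        props.put("linger.ms", "5");
        // (3) proposed client-side request timeout in NetworkClient
        props.put("request.timeout.ms", "30000");
        return props;
    }
}
```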
>>
>> So in this KIP, we only address (3). The only public interface change
>> is a new configuration for the request timeout (and maybe renaming
>> TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
>>
>> I would like to see what people think of the above approach.
>>
>> Jiangjie (Becket) Qin
>>
>> On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>>
>> >Jun,
>> >
>> >I thought about this a little differently.
>> >Intuitively, if a partition is offline, the metadata for that partition
>> >should be considered not ready because we don’t know which broker we
>> >should send the message to. So those sends need to be blocked on the
>> >metadata timeout.
>> >Another thing I’m wondering is in which scenario an offline partition
>> >will become online again within a short period of time, and how likely
>> >that is to occur. My understanding is that the timeout for batches
>> >sitting in the accumulator should be larger than linger.ms but should
>> >not be too long (e.g. less than 60 seconds). Otherwise it will exhaust
>> >the shared buffer with batches to be aborted.
>> >
>> >That said, I do agree it is reasonable to buffer the messages for some
>> >time so messages to other partitions can still get sent. But adding
>> >another expiration in addition to linger.ms - which is essentially a
>> >timeout - sounds a little bit confusing. Maybe we can do this: let the
>> >batch sit in the accumulator up to linger.ms, then fail it if
>> >necessary.
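[Editorial illustration, not part of the original thread.] The suggested accumulator-side expiry amounts to a simple age check; the extra slack parameter below is an assumption, not something specified in the thread:

```java
public class BatchExpirySketch {
    // Sketch of the suggestion above: fail a batch once it has sat in the
    // accumulator past linger.ms plus some slack. slackMs is an assumed
    // parameter, not a config from the thread.
    public static boolean shouldExpire(long nowMs, long batchCreatedMs,
                                       long lingerMs, long slackMs) {
        return nowMs - batchCreatedMs > lingerMs + slackMs;
    }
}
```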
>> >
>> >What do you think?
>> >
>> >Thanks,
>> >
>> >Jiangjie (Becket) Qin
>> >
>> >On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
>> >
>> >>Jiangjie,
>> >>
>> >>Allowing messages to be accumulated in an offline partition could be
>> >>useful, since the partition may become available before the request
>> >>timeout or linger time is reached. Now that we are planning to add a
>> >>new timeout, it would be useful to think through whether/how that
>> >>applies to messages in the accumulator too.
>> >>
>> >>Thanks,
>> >>
>> >>Jun
>> >>
>> >>
>> >>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin
>> >><j...@linkedin.com.invalid> wrote:
>> >>
>> >>> Hi Harsha,
>> >>>
>> >>> Took a quick look at the patch. I think it is still a little bit
>> >>> different. KAFKA-1788 only handles the case where a batch sits in
>> >>> the accumulator for too long. The KIP is trying to solve the issue
>> >>> where a batch has already been drained from the accumulator and sent
>> >>> to the broker.
>> >>> We might be able to apply a timeout at the batch level to merge
>> >>> those two cases, as Ewen suggested. But I’m not sure if it is a good
>> >>> idea to allow messages whose target partition is offline to sit in
>> >>> the accumulator in the first place.
>> >>>
>> >>> Jiangjie (Becket) Qin
>> >>>
>> >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" <ka...@harsha.io>
>> >>> wrote:
>> >>>
>> >>> >Guozhang and Jiangjie,
>> >>> >                 Isn’t this work being covered in
>> >>> >https://issues.apache.org/jira/browse/KAFKA-1788? Can you please
>> >>> >review the patch there?
>> >>> >Thanks,
>> >>> >Harsha
>> >>> >
>> >>> >
>> >>> >On April 15, 2015 at 10:39:40 PM, Guozhang Wang
>> >>> >(wangg...@gmail.com) wrote:
>> >>> >
>> >>> >Thanks for the update Jiangjie,
>> >>> >
>> >>> >I think it is actually NOT expected that a hardware disconnection
>> >>> >will be detected by the selector; rather it will only be revealed
>> >>> >upon TCP timeout, which could be hours.
>> >>> >
>> >>> >A couple of comments on the wiki:
>> >>> >
>> >>> >1. "For KafkaProducer.close() and KafkaProducer.flush() we need the
>> >>> >request timeout as implicit timeout." I am not very clear on what
>> >>> >this means.
>> >>> >
>> >>> >2. Currently the producer already has a "TIMEOUT_CONFIG", which
>> >>> >should really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to
>> >>> >add "REQUEST_TIMEOUT_CONFIG", I suggest we also make this renaming:
>> >>> >admittedly it will change the config names, but it will reduce
>> >>> >confusion moving forward.
>> >>> >
>> >>> >
>> >>> >Guozhang
>> >>> >
>> >>> >
>> >>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
>> >>> ><j...@linkedin.com.invalid> wrote:
>> >>> >
>> >>> >> Checked the code again. It seems that a disconnected channel is
>> >>> >> not detected by the selector as expected.
>> >>> >>
>> >>> >> Currently we are depending on the
>> >>> >> o.a.k.common.network.Selector.disconnected set to see if we need
>> >>> >> to do something for a disconnected channel.
>> >>> >> However, the Selector.disconnected set is only updated when:
>> >>> >> 1. A write/read/connect to the channel fails.
>> >>> >> 2. A key is canceled.
>> >>> >> So when a broker goes down before it sends back the response, the
>> >>> >> client seems unable to detect the failure.
>> >>> >>
>> >>> >> I did a simple test below:
>> >>> >> 1. Run a selector on one machine and an echo server on another
>> >>> >> machine. Connect the selector to the echo server.
>> >>> >> 2. Send a message to the echo server using the selector, then let
>> >>> >> the selector poll() every 10 seconds.
>> >>> >> 3. After the server received the message, unplug the cable on the
>> >>> >> echo server.
>> >>> >> 4. After waiting for 45 minutes, the selector still had not
>> >>> >> detected the network failure.
>> >>> >> lsof on the selector machine shows that the TCP connection is
>> >>> >> still considered ESTABLISHED.
>> >>> >>
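[Editorial illustration, not the original test code.] A self-contained variant of this experiment can be run on loopback. It cannot reproduce an unplugged cable, but it shows the underlying point: a java.nio Selector reports only readiness events and says nothing about a peer that is merely silent:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class SilentPeerSketch {
    // A Selector registered for OP_READ reports zero ready keys while the
    // peer is alive but silent: it reports readiness, never liveness.
    public static int readyKeysAfterPoll() throws IOException {
        ServerSocket server = new ServerSocket(0);
        Thread peer = new Thread(() -> {
            try {
                Socket s = server.accept(); // accept, then stay silent
                Thread.sleep(2000);
                s.close();
            } catch (Exception ignored) { }
        });
        peer.setDaemon(true);
        peer.start();

        SocketChannel ch = SocketChannel.open(
                new InetSocketAddress("127.0.0.1", server.getLocalPort()));
        ch.configureBlocking(false);
        Selector selector = Selector.open();
        ch.register(selector, SelectionKey.OP_READ);

        int ready = selector.select(500); // poll: silent peer => 0 keys
        selector.close();
        ch.close();
        server.close();
        return ready;
    }
}
```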
>> >>> >> I’m not sure what we should expect from the
>> >>> >> java.nio.channels.Selector in this case. According to the
>> >>> >> documentation, the selector does not verify the status of the
>> >>> >> associated channel. In my test case it looks even worse: the OS
>> >>> >> did not consider the socket disconnected either.
>> >>> >>
>> >>> >> Anyway, it seems that adding a client-side request timeout is
>> >>> >> necessary. I’ve updated the KIP page to clarify the problem we
>> >>> >> want to solve, per Ewen’s comments.
>> >>> >>
>> >>> >> Thanks.
>> >>> >>
>> >>> >> Jiangjie (Becket) Qin
>> >>> >>
>> >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io>
>> >>> >> wrote:
>> >>> >>
>> >>> >>
>> >>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
>> >>> >> ><j...@linkedin.com.invalid> wrote:
>> >>> >> >
>> >>> >> >> Hi Ewen, thanks for the comments. Very good points! Please see
>> >>> >>replies
>> >>> >> >> inline.
>> >>> >> >>
>> >>> >> >>
>> >>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava"
>> >>> >> >> <e...@confluent.io> wrote:
>> >>> >> >>
>> >>> >> >> >Jiangjie,
>> >>> >> >> >
>> >>> >> >> >Great start. I have a couple of comments.
>> >>> >> >> >
>> >>> >> >> >Under the motivation section, is it really true that the
>> >>> >> >> >request will never be completed? Presumably if the broker
>> >>> >> >> >goes down the connection will be severed, at worst by a TCP
>> >>> >> >> >timeout, which should clean up the connection and any
>> >>> >> >> >outstanding requests, right? I think the real reason we need
>> >>> >> >> >a different timeout is that the default TCP timeouts are
>> >>> >> >> >ridiculously long in this context.
>> >>> >> >> Yes, when the broker is completely down the request should be
>> >>> >> >> cleared as you said. In the case we encountered, though, it
>> >>> >> >> looks like the broker was just not responding while the TCP
>> >>> >> >> connection was still alive.
>> >>> >> >>
>> >>> >> >
>> >>> >> >Ok, that makes sense.
>> >>> >> >
>> >>> >> >
>> >>> >> >>
>> >>> >> >> >
>> >>> >> >> >My second question is about whether this is the right level
>> >>> >> >> >at which to tackle the issue, and what user-facing changes
>> >>> >> >> >need to be made. A related problem came up in
>> >>> >> >> >https://issues.apache.org/jira/browse/KAFKA-1788 where
>> >>> >> >> >producer records get stuck indefinitely because there's no
>> >>> >> >> >client-side timeout. This KIP wouldn't fix that problem or
>> >>> >> >> >any problems caused by lack of connectivity, since it would
>> >>> >> >> >only apply to in-flight requests, which by definition must
>> >>> >> >> >have been sent on an active connection.
>> >>> >> >> >
>> >>> >> >> >I suspect both types of problems probably need to be
>> >>> >> >> >addressed separately by introducing explicit timeouts.
>> >>> >> >> >However, because the settings introduced here are very much
>> >>> >> >> >about the internal implementations of the clients, I'm
>> >>> >> >> >wondering if this even needs to be a user-facing setting,
>> >>> >> >> >especially if we have to add other timeouts anyway. For
>> >>> >> >> >example, would a fixed, generous value that's still much
>> >>> >> >> >shorter than a TCP timeout, say 15s, be good enough? If other
>> >>> >> >> >timeouts would allow, for example, the clients to properly
>> >>> >> >> >exit even if requests have not hit their timeout, then what's
>> >>> >> >> >the benefit of being able to configure the request-level
>> >>> >> >> >timeout?
>> >>> >> >> That is a very good point. There are three places where we
>> >>> >> >> might be able to enforce a timeout for a message send:
>> >>> >> >> 1. Before append to the accumulator - handled by the metadata
>> >>> >> >> timeout on a per-message level.
>> >>> >> >> 2. A batch of messages inside the accumulator - no timeout
>> >>> >> >> mechanism now.
>> >>> >> >> 3. A request of batches after messages leave the accumulator -
>> >>> >> >> we have a broker-side timeout but no client-side timeout for
>> >>> >> >> now.
>> >>> >> >> My current proposal only addresses (3) but not (2).
>> >>> >> >> Honestly I do not have a very clear idea about what we should
>> >>> >> >> do with (2) right now. But I agree with you that we should not
>> >>> >> >> expose too many configurations to users. What I am thinking
>> >>> >> >> now to handle (2) is: when a user calls send, if we know that
>> >>> >> >> a partition is offline, we should throw an exception
>> >>> >> >> immediately instead of putting the message into the
>> >>> >> >> accumulator. This would prevent further memory consumption. We
>> >>> >> >> might also want to fail all the batches in the deque once we
>> >>> >> >> find a partition is offline. That said, I feel a timeout might
>> >>> >> >> not be quite applicable to (2).
>> >>> >> >> Do you have any suggestion on this?
>> >>> >> >>
>> >>> >> >
>> >>> >> >Right, I didn't actually mean to solve 2 here, but was trying
>> >>> >> >to figure out if a solution to 2 would reduce what we needed to
>> >>> >> >do to address 3. (And depending on how they are implemented,
>> >>> >> >fixing 1 might also address 2.) It sounds like you hit a hang
>> >>> >> >that I wasn't really expecting. This probably just means the KIP
>> >>> >> >motivation needs to be a bit clearer about what type of
>> >>> >> >situation this addresses. The cause of the hang may also be
>> >>> >> >relevant -- if it was something like a deadlock then that's
>> >>> >> >something that should just be fixed, but if it's something
>> >>> >> >outside our control then a timeout makes a lot more sense.
>> >>> >> >
>> >>> >> >
>> >>> >> >> >
>> >>> >> >> >I know we have a similar setting,
>> >>> >> >> >max.in.flight.requests.per.connection, exposed publicly
>> >>> >> >> >(which I just discovered is missing from the new producer
>> >>> >> >> >configs documentation). But it looks like the new consumer is
>> >>> >> >> >not exposing that option, using a fixed value instead. I
>> >>> >> >> >think we should default to hiding these implementation values
>> >>> >> >> >unless there's a strong case for a scenario that requires
>> >>> >> >> >customization.
>> >>> >> >> For the producer, max.in.flight.requests.per.connection really
>> >>> >> >> matters. If people do not want reordering of messages, they
>> >>> >> >> have to use max.in.flight.requests.per.connection=1. On the
>> >>> >> >> other hand, if throughput is more of a concern, it can be set
>> >>> >> >> higher. For the new consumer, I checked the value and I am not
>> >>> >> >> sure if the hard-coded
>> >>> >> >> max.in.flight.requests.per.connection=100 is the right value.
>> >>> >> >> Without the response to the previous request, what offsets
>> >>> >> >> should be put into the next fetch request? It seems to me the
>> >>> >> >> value will effectively be one regardless of the setting,
>> >>> >> >> unless we are sending fetch requests to different partitions,
>> >>> >> >> which does not look like the case.
>> >>> >> >> Anyway, it looks to be a separate issue, orthogonal to the
>> >>> >> >> request timeout.
>> >>> >> >>
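[Editorial illustration, not part of the original thread.] The ordering trade-off described above maps to a single producer setting; the retries value here is an arbitrary assumption:

```java
import java.util.Properties;

public class OrderingSketch {
    // Capping in-flight requests at 1 avoids reordering when a retried
    // batch would otherwise overtake a later one. "retries" is an
    // arbitrary assumed value for illustration.
    public static Properties orderedProducerProps() {
        Properties props = new Properties();
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("retries", "3");
        return props;
    }
}
```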
>> >>> >> >
>> >>> >> >
>> >>> >> >>
>> >>> >> >> >In other words, since the only user-facing change was the
>> >>> >> >> >addition of the setting, I'm wondering if we can avoid the
>> >>> >> >> >KIP altogether by just choosing a good default value for the
>> >>> >> >> >timeout.
>> >>> >> >> The problem is that we have a server-side request timeout
>> >>> >> >> exposed as a public configuration. We cannot set the client
>> >>> >> >> timeout smaller than that value, so a hard-coded value
>> >>> >> >> probably won't work here.
>> >>> >> >>
>> >>> >> >
>> >>> >> >That makes sense, although it's worth keeping in mind that even
>> >>> >> >if you use "correct" values, they could still be violated due
>> >>> >> >to, e.g., a GC pause that causes the broker to process a request
>> >>> >> >after it is supposed to have expired.
>> >>> >> >
>> >>> >> >-Ewen
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> >> >
>> >>> >> >> >-Ewen
>> >>> >> >> >
>> >>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin
>> >>> >> >> ><j...@linkedin.com.invalid> wrote:
>> >>> >> >> >
>> >>> >> >> >> Hi,
>> >>> >> >> >>
>> >>> >> >> >> I just created a KIP to add a request timeout to
>> >>> >> >> >> NetworkClient for new Kafka clients.
>> >>> >> >> >>
>> >>> >> >> >>
>> >>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
>> >>> >> >> >>
>> >>> >> >> >> Comments and suggestions are welcome!
>> >>> >> >> >>
>> >>> >> >> >> Thanks.
>> >>> >> >> >>
>> >>> >> >> >> Jiangjie (Becket) Qin
>> >>> >> >> >>
>> >>> >> >> >>
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> >--
>> >>> >> >> >Thanks,
>> >>> >> >> >Ewen
>> >>> >> >>
>> >>> >> >>
>> >>> >> >
>> >>> >> >
>> >>> >> >--
>> >>> >> >Thanks,
>> >>> >> >Ewen
>> >>> >>
>> >>> >>
>> >>> >
>> >>> >
>> >>> >--
>> >>> >-- Guozhang
>> >>>
>> >>>
>> >
>>
>>
>
>
>-- 
>-Regards,
>Mayuresh R. Gharat
>(862) 250-7125
