I incorporated Ewen's and Guozhang's comments into the KIP page. I want to speed this KIP up because currently mirror maker is very likely to hang when a broker is down.
I also took a shot at solving KAFKA-1788 in KAFKA-2142. I used the metadata timeout to expire batches that are sitting in the accumulator without leader info. I did that because the situation there is essentially missing metadata.

To summarize what I am thinking about the timeouts in the new producer:
1. Metadata timeout:
   - used in send(), blocking
   - used in the accumulator to expire batches with a timeout exception.
2. linger.ms:
   - used in the accumulator to mark a batch ready for drain.
3. Request timeout:
   - used in NetworkClient to expire a batch and retry if no response is received for a request before the timeout.

So in this KIP we only address (3). The only public interface change is a new request timeout configuration (and maybe renaming TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG). (A configuration sketch illustrating these settings appears further down the thread.)

I would like to see what people think of the above approach.

Jiangjie (Becket) Qin

On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:

>Jun,
>
>I thought about this a little bit differently. Intuitively, I am thinking that if a partition is offline, the metadata for that partition should be considered not ready because we don't know which broker we should send the message to. So those sends need to be blocked on the metadata timeout.
>Another thing I'm wondering is in which scenario an offline partition will come back online in a short period of time, and how likely that is to occur. My understanding is that the batch timeout for batches sitting in the accumulator should be larger than linger.ms but should not be too long (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer with batches that are going to be aborted.
>
>That said, I do agree it is reasonable to buffer the message for some time so messages to other partitions can still get sent. But adding another expiration in addition to linger.ms - which is essentially a timeout - sounds a little bit confusing. Maybe we can do this: let the batch sit in the accumulator up to linger.ms, then fail it if necessary.
>
>What do you think?
>
>Thanks,
>
>Jiangjie (Becket) Qin
>
>On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
>
>>Jiangjie,
>>
>>Allowing messages to be accumulated for an offline partition could be useful, since the partition may become available before the request timeout or linger time is reached. Now that we are planning to add a new timeout, it would be useful to think through whether/how that applies to messages in the accumulator too.
>>
>>Thanks,
>>
>>Jun
>>
>>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>
>>> Hi Harsha,
>>>
>>> Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sits in the accumulator for too long. This KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker.
>>> We might be able to apply the timeout at the batch level to merge those two cases, as Ewen suggested. But I'm not sure it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place.
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" <ka...@harsha.io> wrote:
>>>
>>> >Guozhang and Jiangjie,
>>> > Isn't this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788? Can you please review the patch there?
>>> >
>>> >Thanks,
>>> >Harsha
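To make the configuration changes being discussed concrete, here is a minimal sketch of a new-producer configuration covering the three timeouts summarized at the top of the thread. The key request.timeout.ms is only an assumed name for the proposed client-side request timeout (the KIP has not fixed a name), and the comments describe the proposal rather than current behavior; the other keys (metadata.fetch.timeout.ms, linger.ms, and the existing timeout.ms behind TIMEOUT_CONFIG) are existing new-producer settings.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class TimeoutConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
            // (1) Metadata timeout: bounds how long send() may block waiting for metadata;
            //     per the proposal it would also expire accumulator batches that never get leader info.
            props.put("metadata.fetch.timeout.ms", "60000");
            // (2) Linger time: how long a batch may sit in the accumulator before it is ready to drain.
            props.put("linger.ms", "5");
            // (3) Proposed client-side request timeout used by NetworkClient to expire an
            //     in-flight request and retry; "request.timeout.ms" is an assumed name.
            props.put("request.timeout.ms", "30000");
            // Existing TIMEOUT_CONFIG ("timeout.ms"): how long the broker waits for replica acks.
            // The thread suggests renaming it to REPLICATION_TIMEOUT_CONFIG to avoid confusion.
            props.put("timeout.ms", "30000");
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            producer.close();
        }
    }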
>>> >
>>> >On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote:
>>> >
>>> >Thanks for the update Jiangjie,
>>> >
>>> >I think it is actually NOT expected that a hardware disconnection will be detected by the selector; rather, it will only be revealed upon TCP timeout, which could be hours.
>>> >
>>> >A couple of comments on the wiki:
>>> >
>>> >1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout." I am not very clear on what this means.
>>> >
>>> >2. Currently the producer already has a "TIMEOUT_CONFIG" which should really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add "REQUEST_TIMEOUT_CONFIG", I suggest we also make this renaming: admittedly it will change the config names but will reduce confusion moving forward.
>>> >
>>> >Guozhang
>>> >
>>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>> >
>>> >> Checked the code again. It seems that the disconnected channel is not detected by the selector as expected.
>>> >>
>>> >> Currently we are depending on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel.
>>> >> However, the Selector.disconnected set is only updated when:
>>> >> 1. A write/read/connect to the channel failed.
>>> >> 2. A key is canceled.
>>> >> However, when a broker goes down before it sends back the response, the client does not seem able to detect this failure.
>>> >>
>>> >> I did a simple test:
>>> >> 1. Run a selector on one machine and an echo server on another machine. Connect the selector to the echo server.
>>> >> 2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds.
>>> >> 3. After the server received the message, unplug the cable on the echo server.
>>> >> 4. After waiting for 45 minutes, the selector still had not detected the network failure. lsof on the selector machine shows that the TCP connection is still considered ESTABLISHED.
>>> >>
>>> >> I'm not sure what we should expect from java.nio.channels.Selector in this case. According to the documentation, the selector does not verify the status of the associated channel. In my test it looks even worse: the OS did not consider the socket disconnected either.
>>> >>
>>> >> Anyway, it seems adding a client-side request timeout is necessary. I've updated the KIP page to clarify the problem we want to solve according to Ewen's comments.
>>> >>
>>> >> Thanks.
>>> >>
>>> >> Jiangjie (Becket) Qin
>>> >>
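The selector behavior described in the test above can be reproduced with a bare java.nio client along the following lines. This is only a sketch of the experiment (the echo server address is a placeholder), not Kafka code: once the peer disappears without closing the socket, select() simply keeps timing out and the selector never reports a disconnect on its own.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;

    public class SilentPeerDeathTest {
        public static void main(String[] args) throws IOException {
            // Connect (non-blocking) to an echo server; the address is a placeholder.
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress("echo-server.example.com", 7));

            Selector selector = Selector.open();
            channel.register(selector, SelectionKey.OP_CONNECT);

            while (true) {
                // Poll every 10 seconds, as in the test described above.
                int ready = selector.select(10000);
                for (SelectionKey key : selector.selectedKeys()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    if (key.isConnectable() && ch.finishConnect()) {
                        // Once connected, send one message and switch to waiting for reads.
                        ch.write(ByteBuffer.wrap("ping".getBytes()));
                        key.interestOps(SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        // read() only returns -1 (or throws) if the peer closes or resets the
                        // connection. If the peer machine silently disappears after this point,
                        // no key is ever selected again: the connection stays ESTABLISHED and
                        // the selector never marks the channel as disconnected.
                        if (ch.read(buffer) < 0) {
                            System.out.println("Peer closed the connection cleanly");
                            return;
                        }
                    }
                }
                selector.selectedKeys().clear();
                System.out.println("select() returned " + ready + " ready key(s)");
            }
        }
    }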
>>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>>> >>
>>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>> >> >
>>> >> >> Hi Ewen, thanks for the comments. Very good points! Please see replies inline.
>>> >> >>
>>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>>> >> >>
>>> >> >> >Jiangjie,
>>> >> >> >
>>> >> >> >Great start. I have a couple of comments.
>>> >> >> >
>>> >> >> >Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context.
>>> >> >> Yes, when the broker is completely down the request should be cleared as you said. The case we encountered looks like the broker was just not responding but the TCP connection was still alive though.
>>> >> >>
>>> >> >Ok, that makes sense.
>>> >> >
>>> >> >> >My second question is about whether this is the right level to tackle the issue / what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem or any problems caused by lack of connectivity, since this would only apply to in-flight requests, which by definition must have been sent on an active connection.
>>> >> >> >
>>> >> >> >I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout?
>>> >> >> That is a very good point. We have three places where we might be able to enforce a timeout for a message send:
>>> >> >> 1. Before append to the accumulator - handled by the metadata timeout at the per-message level.
>>> >> >> 2. Batch of messages inside the accumulator - no timeout mechanism now.
>>> >> >> 3. Request of batches after messages leave the accumulator - we have a broker-side timeout but no client-side timeout for now.
>>> >> >> My current proposal only addresses (3) but not (2).
>>> >> >> Honestly I do not have a very clear idea about what we should do with (2) right now, but I am with you that we should not expose too many configurations to users. What I am thinking now to handle (2) is that when the user calls send(), if we know that a partition is offline, we should throw an exception immediately instead of putting the record into the accumulator. This would prevent further memory consumption. We might also want to fail all the batches in the deque once we find a partition is offline. That said, I feel a timeout might not be quite applicable to (2).
>>> >> >> Do you have any suggestion on this?
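As a rough illustration of where each of the three timeouts above would surface to a caller of the new producer. The broker address and topic are placeholders, and the failure paths for (2) and (3) are an assumption about how the proposal would behave, not a description of current behavior.

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class TimeoutSurfaceSketch {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

            // (1) send() itself may block up to the metadata timeout while fetching metadata
            //     for the topic, and fails the record if metadata never becomes available.
            Future<RecordMetadata> future =
                producer.send(new ProducerRecord<byte[], byte[]>("test-topic", "hello".getBytes()));

            try {
                // (2) and (3) would surface asynchronously: if the batch expires in the
                //     accumulator, or the in-flight request hits the proposed client-side
                //     request timeout with retries exhausted, the returned future completes
                //     exceptionally instead of hanging forever.
                RecordMetadata metadata = future.get();
                System.out.println("Acked at offset " + metadata.offset());
            } catch (ExecutionException e) {
                System.err.println("Send failed: " + e.getCause());
            } finally {
                producer.close();
            }
        }
    }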
>>> >> >
>>> >> >Right, I didn't actually mean to solve 2 here, but was trying to figure out if a solution to 2 would reduce what we needed to do to address 3. (And depending on how they are implemented, fixing 1 might also address 2.) It sounds like you hit a hang that I wasn't really expecting. This probably just means the KIP motivation needs to be a bit clearer about what type of situation this addresses. The cause of the hang may also be relevant -- if it was something like a deadlock then that's something that should just be fixed, but if it's something outside our control then a timeout makes a lot more sense.
>>> >> >
>>> >> >> >I know we have a similar setting, max.in.flight.requests.per.connection, exposed publicly (which I just discovered is missing from the new producer configs documentation). But it looks like the new consumer is not exposing that option, using a fixed value instead. I think we should default to hiding these implementation values unless there's a strong case for a scenario that requires customization.
>>> >> >> For the producer, max.in.flight.requests.per.connection really matters. If people do not want messages to be reordered, they have to use max.in.flight.requests.per.connection=1. On the other hand, if throughput is more of a concern, it could be set higher. For the new consumer, I checked the value and I am not sure if the hard-coded max.in.flight.requests.per.connection=100 is the right value. Without the response to the previous request, what offsets should be put into the next fetch request? It seems to me the value will effectively be one regardless of the setting, unless we are sending fetch requests to different partitions, which does not look like the case.
>>> >> >> Anyway, it looks to be a separate issue orthogonal to the request timeout.
>>> >> >>
>>> >> >> >In other words, since the only user-facing change was the addition of the setting, I'm wondering if we can avoid the KIP altogether by just choosing a good default value for the timeout.
>>> >> >> The problem is that we have a server-side request timeout exposed as a public configuration. We cannot set the client timeout smaller than that value, so a hard-coded value probably won't work here.
>>> >> >>
>>> >> >That makes sense, although it's worth keeping in mind that even if you use "correct" values, they could still be violated due to, e.g., a GC pause that causes the broker to process a request after it is supposed to have expired.
>>> >> >
>>> >> >-Ewen
>>> >> >
>>> >> >> >
>>> >> >> >-Ewen
>>> >> >> >
>>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>> >> >> >
>>> >> >> >> Hi,
>>> >> >> >>
>>> >> >> >> I just created a KIP to add a request timeout to NetworkClient for the new Kafka clients.
>>> >> >> >>
>>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
>>> >> >> >>
>>> >> >> >> Comments and suggestions are welcome!
>>> >> >> >>
>>> >> >> >> Thanks.
>>> >> >> >>
>>> >> >> >> Jiangjie (Becket) Qin
>>> >> >> >>
>>> >> >> >
>>> >> >> >--
>>> >> >> >Thanks,
>>> >> >> >Ewen
>>> >> >>
>>> >> >
>>> >> >--
>>> >> >Thanks,
>>> >> >Ewen
>>> >>
>>> >
>>> >--
>>> >-- Guozhang
>>>
>