Yes, that is the plan.

On 5/5/15, 8:23 PM, "Mayuresh Gharat" <gharatmayures...@gmail.com> wrote:

> Just a quick question: can we handle a REQUEST TIMEOUT as a disconnection,
> do a fresh MetadataRequest, and retry instead of failing the request?
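>
> In spirit, something like this (a hypothetical, self-contained sketch of
> the behavior I mean, not actual NetworkClient code; every name in it is
> made up for illustration):
>
>   import java.util.concurrent.*;
>
>   public class TimeoutAsDisconnect {
>     // Hypothetical sketch: run a request with a client-side timeout; on
>     // timeout, treat the broker as disconnected, refresh metadata, and
>     // retry instead of failing the request outright.
>     static <T> T callWithRetry(Callable<T> request, Runnable refreshMetadata,
>                                long timeoutMs, int maxRetries) throws Exception {
>       ExecutorService executor = Executors.newSingleThreadExecutor();
>       try {
>         for (int attempt = 0; ; attempt++) {
>           Future<T> f = executor.submit(request);
>           try {
>             return f.get(timeoutMs, TimeUnit.MILLISECONDS);
>           } catch (TimeoutException e) {
>             f.cancel(true);            // abandon this attempt ("disconnect")
>             if (attempt >= maxRetries) throw e;
>             refreshMetadata.run();     // fresh MetadataRequest before retrying
>           }
>         }
>       } finally {
>         executor.shutdownNow();
>       }
>     }
>   }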
>
> Thanks,
>
> Mayuresh
>
> On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>
>> I incorporated Ewen's and Guozhang's comments on the KIP page. I want to
>> speed this KIP up, because currently the mirror maker is very likely to
>> hang when a broker is down.
>>
>> I also took a shot at solving KAFKA-1788 in KAFKA-2142. I used the
>> metadata timeout to expire batches that sit in the accumulator without
>> leader info, because that situation is essentially missing metadata.
>>
>> As a summary of what I am thinking about the timeouts in the new producer:
>>
>> 1. Metadata timeout:
>>    - used in send(), blocking
>>    - used in the accumulator to expire batches with a timeout exception.
>> 2. Linger.ms:
>>    - used in the accumulator to ready a batch for drain.
>> 3. Request timeout:
>>    - used in NetworkClient to expire a batch and retry if no response is
>>      received for a request before the timeout.
>>
>> So in this KIP we only address (3). The only public interface change is a
>> new configuration for the request timeout (and maybe renaming
>> TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
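>>
>> To make that concrete, here is a rough sketch of where the three timeouts
>> would surface in a producer configuration (request.timeout.ms is only the
>> proposed name; metadata.fetch.timeout.ms and linger.ms are the existing
>> producer configs, and the values below are purely illustrative):
>>
>>   import java.util.Properties;
>>
>>   Properties props = new Properties();
>>   // (1) Metadata timeout: bounds how long send() may block waiting for
>>   //     leader info, and expires batches stuck without metadata.
>>   props.put("metadata.fetch.timeout.ms", "60000");
>>   // (2) Linger time: how long a batch waits before it is ready to drain.
>>   props.put("linger.ms", "5");
>>   // (3) Proposed request timeout: how long NetworkClient waits for a
>>   //     response to an in-flight request before expiring it and retrying.
>>   props.put("request.timeout.ms", "30000");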
>>
>> I would like to hear what people think of the above approach.
>>
>> Jiangjie (Becket) Qin
>>
>> On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>>
>>> Jun,
>>>
>>> I thought a little bit differently on this. Intuitively, if a partition
>>> is offline, the metadata for that partition should be considered not
>>> ready, because we don't know which broker we should send the message to.
>>> So those sends need to be blocked on the metadata timeout.
>>> Another thing I'm wondering is in which scenario an offline partition
>>> would come back online within a short period of time, and how likely
>>> that is to occur. My understanding is that the batch timeout for batches
>>> sitting in the accumulator should be larger than linger.ms but should
>>> not be too long (e.g., less than 60 seconds); otherwise batches waiting
>>> to be aborted will exhaust the shared buffer.
>>>
>>> That said, I do agree it is reasonable to buffer the messages for some
>>> time so that messages to other partitions can still be sent. But adding
>>> another expiration in addition to linger.ms - which is essentially a
>>> timeout - sounds a little confusing. Maybe we can do this: let the batch
>>> sit in the accumulator up to linger.ms, then fail it if necessary.
>>>
>>> What do you think?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
>>>
>>>> Jiangjie,
>>>>
>>>> Allowing messages to accumulate for an offline partition could be
>>>> useful, since the partition may become available again before the
>>>> request timeout or linger time is reached. Now that we are planning to
>>>> add a new timeout, it would be useful to think through whether/how it
>>>> applies to messages in the accumulator too.
>>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>> On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>>>
>>>>> Hi Harsha,
>>>>>
>>>>> Took a quick look at the patch. I think it is still a little bit
>>>>> different: KAFKA-1788 only handles the case where a batch sits in the
>>>>> accumulator for too long, while this KIP is trying to solve the issue
>>>>> where a batch has already been drained from the accumulator and sent
>>>>> to the broker. We might be able to apply a timeout at the batch level
>>>>> to merge those two cases, as Ewen suggested. But I'm not sure it is a
>>>>> good idea to let messages whose target partition is offline sit in
>>>>> the accumulator in the first place.
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" <ka...@harsha.io> wrote:
>>>>>
>>>>>> Guozhang and Jiangjie,
>>>>>> Isn't this work covered by
>>>>>> https://issues.apache.org/jira/browse/KAFKA-1788? Can you please
>>>>>> review the patch there?
>>>>>> Thanks,
>>>>>> Harsha
>>>>>>
>>>>>> On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote:
>>>>>>
>>>>>> Thanks for the update Jiangjie,
>>>>>>
>>>>>> I think it is actually NOT expected that a hardware disconnection
>>>>>> will be detected by the selector; rather, it will only be revealed
>>>>>> upon TCP timeout, which could be hours.
>>>>>>
>>>>>> A couple of comments on the wiki:
>>>>>>
>>>>>> 1. "For KafkaProducer.close() and KafkaProducer.flush() we need the
>>>>>> request timeout as implicit timeout." I am not very clear on what
>>>>>> this means.
>>>>>>
>>>>>> 2. The producer currently has a "TIMEOUT_CONFIG" which should really
>>>>>> be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add
>>>>>> "REQUEST_TIMEOUT_CONFIG", I suggest we also do this renaming:
>>>>>> admittedly it will change the config names, but it will reduce
>>>>>> confusion moving forward.
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>>>>>
>>>>>>> Checked the code again. It seems that a disconnected channel is not
>>>>>>> detected by the selector as expected.
>>>>>>>
>>>>>>> Currently we depend on the
>>>>>>> o.a.k.common.network.Selector.disconnected set to see whether we
>>>>>>> need to do something for a disconnected channel. However, the
>>>>>>> Selector.disconnected set is only updated when:
>>>>>>> 1. a write/read/connect on the channel fails, or
>>>>>>> 2. a key is cancelled.
>>>>>>> So when a broker goes down before it sends back the response, the
>>>>>>> client seems unable to detect the failure.
>>>>>>>
>>>>>>> I did a simple test:
>>>>>>> 1. Run a selector on one machine and an echo server on another
>>>>>>> machine, and connect the selector to the echo server.
>>>>>>> 2. Send a message to the echo server using the selector, then let
>>>>>>> the selector poll() every 10 seconds.
>>>>>>> 3. After the server has received the message, unplug the network
>>>>>>> cable on the echo server machine.
>>>>>>> 4. Even after waiting 45 minutes, the selector still had not
>>>>>>> detected the network failure; lsof on the selector machine showed
>>>>>>> the TCP connection still in ESTABLISHED state.
>>>>>>>
>>>>>>> I'm not sure what we should expect from java.nio.channels.Selector
>>>>>>> in this case. According to its documentation, the selector does not
>>>>>>> verify the status of the associated channel. In my test it looks
>>>>>>> even worse: the OS did not consider the socket disconnected either.
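>>>>>>>
>>>>>>> The client side of the test was roughly the following poll loop (a
>>>>>>> minimal sketch, not the exact code; the host and port are
>>>>>>> placeholders):
>>>>>>>
>>>>>>>   import java.net.InetSocketAddress;
>>>>>>>   import java.nio.ByteBuffer;
>>>>>>>   import java.nio.channels.SelectionKey;
>>>>>>>   import java.nio.channels.Selector;
>>>>>>>   import java.nio.channels.SocketChannel;
>>>>>>>
>>>>>>>   public class SelectorHangTest {
>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>       Selector selector = Selector.open();
>>>>>>>       SocketChannel channel = SocketChannel.open();
>>>>>>>       channel.configureBlocking(false);
>>>>>>>       channel.connect(new InetSocketAddress("echo-host", 7));
>>>>>>>       channel.register(selector, SelectionKey.OP_CONNECT);
>>>>>>>       while (true) {
>>>>>>>         selector.select(10000); // poll every 10 seconds
>>>>>>>         for (SelectionKey key : selector.selectedKeys()) {
>>>>>>>           if (key.isConnectable() && channel.finishConnect()) {
>>>>>>>             channel.write(ByteBuffer.wrap("hello".getBytes()));
>>>>>>>             key.interestOps(SelectionKey.OP_READ); // wait for echo
>>>>>>>           }
>>>>>>>         }
>>>>>>>         selector.selectedKeys().clear();
>>>>>>>         // Once the server's cable is pulled, select() just keeps
>>>>>>>         // timing out; the channel is never reported disconnected.
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }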
>>>>>>>
>>>>>>> Anyway, it seems that adding the client-side request timeout is
>>>>>>> necessary. I've updated the KIP page to clarify the problem we want
>>>>>>> to solve, per Ewen's comments.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>>>>>>>
>>>>>>>> On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Hi Ewen, thanks for the comments. Very good points! Please see
>>>>>>>>> replies inline.
>>>>>>>>>
>>>>>>>>> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Jiangjie,
>>>>>>>>>>
>>>>>>>>>> Great start. I have a couple of comments.
>>>>>>>>>>
>>>>>>>>>> Under the motivation section, is it really true that the request
>>>>>>>>>> will never be completed? Presumably if the broker goes down the
>>>>>>>>>> connection will be severed, at worst by a TCP timeout, which
>>>>>>>>>> should clean up the connection and any outstanding requests,
>>>>>>>>>> right? I think the real reason we need a different timeout is
>>>>>>>>>> that the default TCP timeouts are ridiculously long in this
>>>>>>>>>> context.
>>>>>>>>>
>>>>>>>>> Yes, when the broker is completely down the request should be
>>>>>>>>> cleared, as you said. In the case we encountered, though, the
>>>>>>>>> broker was simply not responding while the TCP connection was
>>>>>>>>> still alive.
>>>>>>>>
>>>>>>>> Ok, that makes sense.
>>>>>>>>
>>>>>>>>>> My second question is about whether this is the right level at
>>>>>>>>>> which to tackle the issue, and what user-facing changes need to
>>>>>>>>>> be made. A related problem came up in
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1788, where producer
>>>>>>>>>> records get stuck indefinitely because there is no client-side
>>>>>>>>>> timeout. This KIP wouldn't fix that problem, or any problem
>>>>>>>>>> caused by lack of connectivity, since it only applies to
>>>>>>>>>> in-flight requests, which by definition must have been sent on
>>>>>>>>>> an active connection.
>>>>>>>>>>
>>>>>>>>>> I suspect both types of problems need to be addressed separately
>>>>>>>>>> by introducing explicit timeouts. However, because the settings
>>>>>>>>>> introduced here are very much about the internal implementation
>>>>>>>>>> of the clients, I'm wondering whether this even needs to be a
>>>>>>>>>> user-facing setting, especially if we have to add other timeouts
>>>>>>>>>> anyway. For example, would a fixed, generous value that is still
>>>>>>>>>> much shorter than a TCP timeout, say 15s, be good enough? If
>>>>>>>>>> other timeouts would allow, for example, the clients to exit
>>>>>>>>>> properly even when requests have not hit their timeout, then
>>>>>>>>>> what's the benefit of being able to configure the request-level
>>>>>>>>>> timeout?
>>>>>>>>>
>>>>>>>>> That is a very good point. There are three places where we might
>>>>>>>>> be able to enforce a timeout for a message send:
>>>>>>>>> 1. Before append to the accumulator - handled by the metadata
>>>>>>>>>    timeout at the per-message level.
>>>>>>>>> 2. Batches of messages inside the accumulator - no timeout
>>>>>>>>>    mechanism today.
>>>>>>>>> 3. Requests for batches after messages leave the accumulator - we
>>>>>>>>>    have a broker-side timeout but no client-side timeout for now.
>>>>>>>>> My current proposal only addresses (3), not (2). Honestly, I do
>>>>>>>>> not have a very clear idea of what we should do with (2) right
>>>>>>>>> now, but I agree with you that we should not expose too many
>>>>>>>>> configurations to users. What I am thinking now for (2) is that
>>>>>>>>> when the user calls send() and we know the partition is offline,
>>>>>>>>> we should throw an exception immediately instead of putting the
>>>>>>>>> message into the accumulator; this would prevent further memory
>>>>>>>>> consumption. We might also want to fail all the batches in the
>>>>>>>>> deque once we find a partition is offline. That said, I feel a
>>>>>>>>> timeout might not really be applicable to (2). Do you have any
>>>>>>>>> suggestions on this?
>>>>>>>>
>>>>>>>> Right, I didn't actually mean to solve 2 here, but was trying to
>>>>>>>> figure out whether a solution to 2 would reduce what we needed to
>>>>>>>> do to address 3. (And depending on how they are implemented,
>>>>>>>> fixing 1 might also address 2.) It sounds like you hit a hang I
>>>>>>>> wasn't really expecting. This probably just means the KIP
>>>>>>>> motivation needs to be a bit clearer about what type of situation
>>>>>>>> it addresses. The cause of the hang may also be relevant -- if it
>>>>>>>> was something like a deadlock, then that should just be fixed; but
>>>>>>>> if it was something outside our control, then a timeout makes a
>>>>>>>> lot more sense.
>>>>>>>>
>>>>>>>>>> I know we have a similar setting,
>>>>>>>>>> max.in.flight.requests.per.connection, exposed publicly (which I
>>>>>>>>>> just discovered is missing from the new producer configs
>>>>>>>>>> documentation). But it looks like the new consumer is not
>>>>>>>>>> exposing that option, using a fixed value instead. I think we
>>>>>>>>>> should default to hiding these implementation values unless
>>>>>>>>>> there's a strong case for a scenario that requires customization.
>>>>>>>>>
>>>>>>>>> For the producer, max.in.flight.requests.per.connection really
>>>>>>>>> matters. If people do not want messages reordered, they have to
>>>>>>>>> use max.in.flight.requests.per.connection=1; on the other hand,
>>>>>>>>> if throughput is more of a concern, it can be set higher. For the
>>>>>>>>> new consumer, I checked the value and I am not sure the
>>>>>>>>> hard-coded max.in.flight.requests.per.connection=100 is the right
>>>>>>>>> value: without the response to the previous request, what offsets
>>>>>>>>> should be put into the next fetch request? It seems to me the
>>>>>>>>> value will effectively be one regardless of the setting, unless
>>>>>>>>> we are sending fetch requests to different partitions, which does
>>>>>>>>> not look like the case. Anyway, that looks like a separate issue,
>>>>>>>>> orthogonal to the request timeout.
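>>>>>>>>>
>>>>>>>>> To make the producer trade-off concrete (a sketch; the values
>>>>>>>>> are illustrative):
>>>>>>>>>
>>>>>>>>>   import java.util.Properties;
>>>>>>>>>
>>>>>>>>>   Properties props = new Properties();
>>>>>>>>>   // Strict ordering: at most one in-flight request per
>>>>>>>>>   // connection, so a retried batch cannot jump ahead of a
>>>>>>>>>   // later batch.
>>>>>>>>>   props.put("max.in.flight.requests.per.connection", "1");
>>>>>>>>>   // Or favor throughput: allow pipelining, accepting possible
>>>>>>>>>   // reordering when a request is retried.
>>>>>>>>>   // props.put("max.in.flight.requests.per.connection", "5");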
>>>>>>>>>
>>>>>>>>>> In other words, since the only user-facing change was the
>>>>>>>>>> addition of the setting, I'm wondering whether we can avoid the
>>>>>>>>>> KIP altogether by just choosing a good default value for the
>>>>>>>>>> timeout.
>>>>>>>>>
>>>>>>>>> The problem is that we have a server-side request timeout exposed
>>>>>>>>> as a public configuration. We cannot set the client timeout
>>>>>>>>> smaller than that value, so a hard-coded value probably won't
>>>>>>>>> work here.
>>>>>>>>
>>>>>>>> That makes sense, although it's worth keeping in mind that even if
>>>>>>>> you use "correct" values, they could still be violated due to,
>>>>>>>> e.g., a GC pause that causes the broker to process a request after
>>>>>>>> it is supposed to have expired.
>>>>>>>>
>>>>>>>> -Ewen
>>>>>>>>
>>>>>>>>>> -Ewen
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I just created a KIP to add a request timeout to NetworkClient
>>>>>>>>>>> for the new Kafka clients.
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
>>>>>>>>>>>
>>>>>>>>>>> Comments and suggestions are welcome!
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>>
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Thanks,
>>>>>>>>>> Ewen
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanks,
>>>>>>>> Ewen
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125