I think my confusion is coming from this:

> So in this KIP, we only address (3). The only public interface change is a new configuration of request timeout (and maybe change the configuration name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
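(For context, here's roughly what configuring the producer in code looks like today. This is only an illustrative sketch assuming the current 0.8.2-style producer API; the broker address and class name are made up. ProducerConfig.TIMEOUT_CONFIG is the constant whose rename is being discussed.)

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class ProducerTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            // Today this constant resolves to "timeout.ms", the ack/replication timeout
            // enforced on the broker. Renaming the constant breaks this line at compile
            // time (and existing jars at runtime); renaming the underlying "timeout.ms"
            // key breaks config files, with only an "unused config" warning as a hint.
            props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            producer.close();
        }
    }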
There are 3 possible compatibility issues I see here:

* I assumed this meant the constants also change, so "timeout.ms" becomes "replication.timeout.ms". This breaks config files that worked on the previous version, and the only warning would be in the release notes. We do warn about unused configs, so users might notice the problem.

* Binary and source compatibility if someone configures their client in code and uses the TIMEOUT_CONFIG variable. Renaming it will cause existing jars to break if you try to run against an updated client (which seems not very significant since I doubt people upgrade these without recompiling, but maybe I'm wrong about that). It also breaks builds without having deprecated that field first, which, again, is probably not the biggest issue but is annoying for users; when we accidentally changed the API before, we received a complaint about breaking builds.

* Behavior compatibility, as Jay mentioned on the call -- setting the config (even if the name changed) doesn't have the same effect it used to.

One solution, which admittedly is more painful to implement and maintain, would be to keep the timeout.ms config, have it override the others if it is specified (including an infinite request timeout, I guess?), and if it isn't specified, just use the new config variables (there's a rough sketch of how the settings would line up in a P.S. at the bottom of this mail). Given a real deprecation schedule, users would have better warning of the changes and a window to make them. I actually think it might not be necessary to maintain the old behavior precisely, although maybe for some code it is an issue if it starts seeing timeout exceptions it wouldn't have seen before?

-Ewen

On Wed, May 6, 2015 at 6:06 PM, Jun Rao <j...@confluent.io> wrote:

> Jiangjie,
>
> Yes, I think using metadata timeout to expire batches in the record accumulator makes sense.
>
> Thanks,
>
> Jun
>
> On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>
> > I incorporated Ewen and Guozhang’s comments in the KIP page. Want to speed up on this KIP because currently we experience mirror-maker hung very likely when a broker is down.
> >
> > I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata timeout to expire the batches which are sitting in accumulator without leader info. I did that because the situation there is essentially missing metadata.
> >
> > As a summary of what I am thinking about the timeout in new Producer:
> >
> > 1. Metadata timeout:
> >    - used in send(), blocking
> >    - used in accumulator to expire batches with timeout exception.
> > 2. Linger.ms
> >    - Used in accumulator to ready the batch for drain
> > 3. Request timeout
> >    - Used in NetworkClient to expire a batch and retry if no response is received for a request before timeout.
> >
> > So in this KIP, we only address (3). The only public interface change is a new configuration of request timeout (and maybe change the configuration name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
> >
> > Would like to see what people think of above approach?
> >
> > Jiangjie (Becket) Qin
> >
> > On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> >
> > > Jun,
> > >
> > > I thought a little bit differently on this.
> > > Intuitively, I am thinking that if a partition is offline, the metadata for that partition should be considered not ready because we don’t know which broker we should send the message to. So those sends need to be blocked on metadata timeout.
> > > Another thing I’m wondering is in which scenario an offline partition will become online again in a short period of time and how likely it will occur. My understanding is that the batch timeout for batches sitting in accumulator should be larger than linger.ms but should not be too long (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer with batches to be aborted.
> > >
> > > That said, I do agree it is reasonable to buffer the message for some time so messages to other partitions can still get sent. But adding another expiration in addition to linger.ms - which is essentially a timeout - sounds a little bit confusing. Maybe we can do this, let the batch sit in accumulator up to linger.ms, then fail it if necessary.
> > >
> > > What do you think?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
> > >
> > >> Jiangjie,
> > >>
> > >> Allowing messages to be accumulated in an offline partition could be useful since the partition may become available before the request timeout or linger time is reached. Now that we are planning to add a new timeout, it would be useful to think through whether/how that applies to messages in the accumulator too.
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > >>
> > >>> Hi Harsha,
> > >>>
> > >>> Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sitting in accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from accumulator and sent to broker. We might be able to apply timeout on batch level to merge those two cases as Ewen suggested. But I’m not sure if it is a good idea to allow messages whose target partition is offline to sit in accumulator in the first place.
> > >>>
> > >>> Jiangjie (Becket) Qin
> > >>>
> > >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" <ka...@harsha.io> wrote:
> > >>>
> > >>> > Guozhang and Jiangjie,
> > >>> > Isn’t this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please the review the patch there.
> > >>> > Thanks,
> > >>> > Harsha
> > >>> >
> > >>> > On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote:
> > >>> >
> > >>> > Thanks for the update Jiangjie,
> > >>> >
> > >>> > I think it is actually NOT expected that hardware disconnection will be detected by the selector, but rather will only be revealed upon TCP timeout, which could be hours.
> > >>> >
> > >>> > A couple of comments on the wiki:
> > >>> >
> > >>> > 1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implict timeout." I am not very clear what does this mean?
> > >>> >
> > >>> > 2. Currently the producer already has a "TIMEOUT_CONFIG" which should really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add "REQUEST_TIMEOUT_CONFIG", I suggest we also make this renaming: admittedly it will change the config names but will reduce confusions moving forward.
> > >>> >
> > >>> > Guozhang
> > >>> >
> > >>> > On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > >>> >
> > >>> >> Checked the code again. It seems that the disconnected channel is not detected by selector as expected.
> > >>> >>
> > >>> >> Currently we are depending on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel.
> > >>> >> However Selector.disconnected set is only updated when:
> > >>> >> 1. A write/read/connect to channel failed.
> > >>> >> 2. A Key is canceled
> > >>> >> However when a broker is down before it sends back the response, the client seems not be able to detect this failure.
> > >>> >>
> > >>> >> I did a simple test below:
> > >>> >> 1. Run a selector on one machine and an echo server on another machine. Connect a selector to an echo server
> > >>> >> 2. Send a message to echo server using selector, then let the selector poll() every 10 seconds.
> > >>> >> 3. After the sever received the message, unplug cable on the echo server.
> > >>> >> 4. After waiting for 45 min. The selector still did not detected the network failure.
> > >>> >> Lsof on selector machine shows that the TCP connection is still considered ESTABLISHED.
> > >>> >>
> > >>> >> I’m not sure in this case what should we expect from the java.nio.channels.Selector. According to the document, the selector does not verify the status of the associated channel. In my test case it looks even worse that OS did not think of the socket has been disconnected.
> > >>> >>
> > >>> >> Anyway. It seems adding the client side request timeout is necessary. I’ve updated the KIP page to clarify the problem we want to solve according to Ewen’s comments.
> > >>> >>
> > >>> >> Thanks.
> > >>> >>
> > >>> >> Jiangjie (Becket) Qin
> > >>> >>
> > >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> > >>> >>
> > >>> >> > On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > >>> >> >
> > >>> >> >> Hi Ewen, thanks for the comments. Very good points! Please see replies inline.
> > >>> >> >>
> > >>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> > >>> >> >>
> > >>> >> >> > Jiangjie,
> > >>> >> >> >
> > >>> >> >> > Great start. I have a couple of comments.
> > >>> >> >> >
> > >>> >> >> > Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context.
> > >>> >> >> Yes, when broker is completely down the request should be cleared as you said.
> > >>> >> >> The case we encountered looks like the broker was just not responding but TCP connection was still alive though.
> > >>> >> >>
> > >>> >> > Ok, that makes sense.
> > >>> >> >
> > >>> >> >>
> > >>> >> >> > My second question is about whether this is the right level to tackle the issue/what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem or any problems caused by lack of connectivity since this would only apply to in flight requests, which by definition must have been sent on an active connection.
> > >>> >> >> >
> > >>> >> >> > I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout?
> > >>> >> >> That is a very good point. We have three places that we might be able to enforce timeout for a message send:
> > >>> >> >> 1. Before append to accumulator - handled by metadata timeout on per message level.
> > >>> >> >> 2. Batch of messages inside accumulator - no timeout mechanism now.
> > >>> >> >> 3. Request of batches after messages leave the accumulator - we have a broker side timeout but no client side timeout for now.
> > >>> >> >> My current proposal only address (3) but not (2).
> > >>> >> >> Honestly I do not have a very clear idea about what should we do with (2) right now. But I am with you that we should not expose too many configurations to users. What I am thinking now to handle (2) is when user call send, if we know that a partition is offline, we should throw exception immediately instead of putting it into accumulator. This would protect further memory consumption. We might also want to fail all the batches in the dequeue once we found a partition is offline. That said, I feel timeout might not be quite applicable to (2).
> > >>> >> >> Do you have any suggestion on this?
> > >>> >> >>
> > >>> >> > Right, I didn't actually mean to solve 2 here, but was trying to figure out if a solution to 2 would reduce what we needed to do to address 3.
> > >>> >> > (And depending on how they are implemented, fixing 1 might also address 2). It sounds like you hit hang that I wasn't really expecting. This probably just means the KIP motivation needs to be a bit clearer about what type of situation this addresses. The cause of the hang may also be relevant -- if it was something like a deadlock then that's something that should just be fixed, but if it's something outside our control then a timeout makes a lot more sense.
> > >>> >> >
> > >>> >> >> > I know we have a similar setting, max.in.flights.requests.per.connection, exposed publicly (which I just discovered is missing from the new producer configs documentation). But it looks like the new consumer is not exposing that option, using a fixed value instead. I think we should default to hiding these implementation values unless there's a strong case for a scenario that requires customization.
> > >>> >> >> For producer, max.in.flight.requests.per.connection really matters. If people do not want to have reorder of messages, they have to use max.in.flight.requests.per.connection=1. On the other hand, if throughput is more of a concern, it could be set to higher. For the new consumer, I checked the value and I am not sure if the hard coded max.in.flight.requests.per.connection=100 is the right value. Without the response to the previous request, what offsets should be put into the next fetch request? It seems to me the value will be one natively regardless of the setting unless we are sending fetch request to different partitions, which does not look like the case.
> > >>> >> >> Anyway, it looks to be a separate issue orthogonal to the request timeout.
> > >>> >> >>
> > >>> >> >> > In other words, since the only user-facing change was the addition of the setting, I'm wondering if we can avoid the KIP altogether by just choosing a good default value for the timeout.
> > >>> >> >> The problem is that we have a server side request timeout exposed as a public configuration. We cannot set the client timeout smaller than that value, so a hard coded value probably won't work here.
> > >>> >> >>
> > >>> >> > That makes sense, although it's worth keeping in mind that even if you use "correct" values, they could still be violated due to, e.g., a GC pause that causes the broker to process a request after it is supposed to have expired.
> > >>> >> >
> > >>> >> > -Ewen
> > >>> >> >
> > >>> >> >>
> > >>> >> >> > -Ewen
> > >>> >> >> >
> > >>> >> >> > On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > >>> >> >> >
> > >>> >> >> >> Hi,
> > >>> >> >> >>
> > >>> >> >> >> I just created a KIP to add a request timeout to NetworkClient for new Kafka clients.
> > >>> >> >> >>
> > >>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> > >>> >> >> >>
> > >>> >> >> >> Comments and suggestions are welcome!
> > >>> >> >> >>
> > >>> >> >> >> Thanks.
> > >>> >> >> >>
> > >>> >> >> >> Jiangjie (Becket) Qin
> > >>> >> >> >>
> > >>> >> >> >
> > >>> >> >> > --
> > >>> >> >> > Thanks,
> > >>> >> >> > Ewen
> > >>> >> >>
> > >>> >> >
> > >>> >> > --
> > >>> >> > Thanks,
> > >>> >> > Ewen
> > >>> >>
> > >>> >
> > >>> > --
> > >>> > -- Guozhang
> > >>>
> > >>
> > >
> >
>

--
Thanks,
Ewen
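P.S. To put Becket's timeout summary above in concrete terms, here is a rough sketch of how I'd expect the settings to line up once (3) is added. Only a sketch against the current producer configs: "request.timeout.ms" is a placeholder for the proposed new config (its final name is exactly what's under discussion), and the class/method names are made up.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class TimeoutSummary {
        public static Properties producerTimeouts() {
            Properties props = new Properties();
            // (1) Metadata timeout: bounds how long send() blocks waiting for metadata,
            //     and (per KAFKA-2142) how long batches without leader info may sit in
            //     the accumulator before being expired.
            props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000");
            // (2) Linger: how long a batch waits in the accumulator before it is ready
            //     to be drained.
            props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
            // (3) Proposed client-side request timeout used by NetworkClient to expire an
            //     in-flight request and retry; "request.timeout.ms" is a placeholder name.
            props.put("request.timeout.ms", "30000");
            // Existing broker-side replication/ack timeout ("timeout.ms"), the candidate
            // for renaming to replication.timeout.ms.
            props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
            return props;
        }
    }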