Guozhang and Jiangjie,

Isn't this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788? Can you please review the patch there?

Thanks,
Harsha
On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote:

Thanks for the update Jiangjie,

I think it is actually NOT expected that a hardware disconnection will be detected by the selector; rather, it will only be revealed upon TCP timeout, which could be hours.

A couple of comments on the wiki:

1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout." I am not very clear on what this means.

2. Currently the producer already has a "TIMEOUT_CONFIG", which should really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add "REQUEST_TIMEOUT_CONFIG", I suggest we also make this renaming: admittedly it will change the config names, but it will reduce confusion moving forward.

Guozhang

On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Checked the code again. It seems that a disconnected channel is not
> detected by the selector as expected.
>
> Currently we depend on the o.a.k.common.network.Selector.disconnected set
> to see if we need to do something for a disconnected channel. However,
> the Selector.disconnected set is only updated when:
> 1. A write/read/connect to the channel fails.
> 2. A key is cancelled.
> So when a broker goes down before it sends back the response, the client
> does not seem to be able to detect the failure.
>
> I did a simple test:
> 1. Run a selector on one machine and an echo server on another machine.
>    Connect the selector to the echo server.
> 2. Send a message to the echo server using the selector, then let the
>    selector poll() every 10 seconds.
> 3. After the server received the message, unplug the cable on the echo
>    server machine.
> 4. After waiting for 45 minutes, the selector still had not detected the
>    network failure. lsof on the selector machine shows that the TCP
>    connection is still considered ESTABLISHED.
>
> I'm not sure what we should expect from java.nio.channels.Selector in this
> case. According to the documentation, the selector does not verify the
> status of the associated channel. In my test it looks even worse: the OS
> did not consider the socket disconnected either.
>
> Anyway, it seems that adding a client-side request timeout is necessary.
> I've updated the KIP page to clarify the problem we want to solve,
> according to Ewen's comments.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>
> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >wrote:
> >
> >> Hi Ewen, thanks for the comments. Very good points! Please see replies
> >> inline.
> >>
> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >>
> >> >Jiangjie,
> >> >
> >> >Great start. I have a couple of comments.
> >> >
> >> >Under the motivation section, is it really true that the request will
> >> >never be completed? Presumably if the broker goes down the connection
> >> >will be severed, at worst by a TCP timeout, which should clean up the
> >> >connection and any outstanding requests, right? I think the real
> >> >reason we need a different timeout is that the default TCP timeouts
> >> >are ridiculously long in this context.
> >> Yes, when the broker is completely down the request should be cleared
> >> as you said. The case we encountered looks like the broker was just not
> >> responding, but the TCP connection was still alive.
> >
> >Ok, that makes sense.
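The test Jiangjie describes above can be pictured as a bare NIO client loop. The sketch below is only an illustration of that setup, not the actual test code; the echo-server address, port, and intervals are placeholders. With the cable pulled on the remote side after the message is delivered, select() keeps timing out and the key is never cancelled, so the client observes no failure until the OS-level TCP timeout fires.

    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;

    // Minimal sketch of the test described above: connect to an echo server,
    // send one message, then poll every 10 seconds. If the remote machine is
    // unplugged after it receives the message, select() keeps returning 0 and
    // the key is never cancelled, so no disconnection is reported.
    public class SelectorDisconnectTest {
        public static void main(String[] args) throws Exception {
            // Placeholder address for the echo server; not taken from the thread.
            InetSocketAddress echoServer = new InetSocketAddress("echo.example.com", 7);

            Selector selector = Selector.open();
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            boolean connected = channel.connect(echoServer);
            channel.register(selector,
                    connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);

            ByteBuffer message = ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8));

            while (true) {
                int ready = selector.select(10_000);   // poll every 10 seconds
                System.out.println("ready keys: " + ready);

                for (SelectionKey key : selector.selectedKeys()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    if (key.isConnectable() && ch.finishConnect()) {
                        ch.register(selector, SelectionKey.OP_WRITE);
                    } else if (key.isWritable()) {
                        ch.write(message);
                        ch.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        // A read returning -1 or throwing is the only way the
                        // selector learns the peer is gone; with the cable
                        // pulled, this branch is never reached again.
                        ByteBuffer response = ByteBuffer.allocate(1024);
                        System.out.println("read " + ch.read(response) + " bytes");
                    }
                }
                selector.selectedKeys().clear();
            }
        }
    }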
> >
> >> >
> >> >My second question is about whether this is the right level to tackle
> >> >the issue / what user-facing changes need to be made. A related
> >> >problem came up in https://issues.apache.org/jira/browse/KAFKA-1788
> >> >where producer records get stuck indefinitely because there's no
> >> >client-side timeout. This KIP wouldn't fix that problem, or any
> >> >problems caused by lack of connectivity, since this would only apply
> >> >to in-flight requests, which by definition must have been sent on an
> >> >active connection.
> >> >
> >> >I suspect both types of problems probably need to be addressed
> >> >separately by introducing explicit timeouts. However, because the
> >> >settings introduced here are very much about the internal
> >> >implementations of the clients, I'm wondering if this even needs to be
> >> >a user-facing setting, especially if we have to add other timeouts
> >> >anyway. For example, would a fixed, generous value that's still much
> >> >shorter than a TCP timeout, say 15s, be good enough? If other timeouts
> >> >would allow, for example, the clients to properly exit even if
> >> >requests have not hit their timeout, then what's the benefit of being
> >> >able to configure the request-level timeout?
> >> That is a very good point. We have three places where we might be able
> >> to enforce a timeout for a message send:
> >> 1. Before append to the accumulator - handled by the metadata timeout
> >>    at the per-message level.
> >> 2. Batches of messages inside the accumulator - no timeout mechanism
> >>    today.
> >> 3. Requests of batches after messages leave the accumulator - we have a
> >>    broker-side timeout but no client-side timeout for now.
> >> My current proposal only addresses (3), not (2). Honestly I do not have
> >> a very clear idea of what we should do with (2) right now, but I agree
> >> with you that we should not expose too many configurations to users.
> >> What I am thinking now to handle (2) is: when the user calls send, if
> >> we know that a partition is offline, we should throw an exception
> >> immediately instead of putting the message into the accumulator. This
> >> would prevent further memory consumption. We might also want to fail
> >> all the batches in the deque once we find that a partition is offline.
> >> That said, I feel a timeout might not be quite applicable to (2). Do
> >> you have any suggestion on this?
> >
> >Right, I didn't actually mean to solve 2 here, but was trying to figure
> >out if a solution to 2 would reduce what we needed to do to address 3.
> >(And depending on how they are implemented, fixing 1 might also address
> >2.) It sounds like you hit a hang that I wasn't really expecting. This
> >probably just means the KIP motivation needs to be a bit clearer about
> >what type of situation this addresses. The cause of the hang may also be
> >relevant -- if it was something like a deadlock then that's something
> >that should just be fixed, but if it's something outside our control then
> >a timeout makes a lot more sense.
> >
> >> >I know we have a similar setting,
> >> >max.in.flight.requests.per.connection, exposed publicly (which I just
> >> >discovered is missing from the new producer configs documentation).
> >> >But it looks like the new consumer is not exposing that option, using
> >> >a fixed value instead. I think we should default to hiding these
> >> >implementation values unless there's a strong case for a scenario that
> >> >requires customization.
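The three places Jiangjie lists above (metadata wait, accumulator, in flight) map onto producer configuration roughly as follows. This is only a sketch of that breakdown: "metadata.fetch.timeout.ms" and "timeout.ms" are existing producer configs, while "request.timeout.ms" is the name being proposed in KIP-19 and is shown here as an assumption, not a released setting.

    import java.util.Properties;

    // Sketch of where the timeouts discussed above apply on the producer send
    // path. "request.timeout.ms" is the client-side timeout proposed in KIP-19
    // and is an assumed name here, not an existing config.
    public class SendTimeoutStages {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // placeholder broker

            // (1) Before append to the accumulator: send() is bounded by the
            //     metadata fetch timeout, applied per message.
            props.put("metadata.fetch.timeout.ms", "60000");

            // (2) Batches sitting inside the accumulator: no timeout mechanism
            //     today; the thread discusses failing batches for offline
            //     partitions rather than adding another config.

            // (3) In-flight requests: the broker-side (replication) timeout
            //     already exists, but there is no client-side bound yet.
            props.put("timeout.ms", "30000");          // broker-side timeout
            props.put("request.timeout.ms", "40000");  // proposed client-side timeout

            props.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }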
> >> For the producer, max.in.flight.requests.per.connection really matters.
> >> If people do not want messages to be reordered, they have to use
> >> max.in.flight.requests.per.connection=1. On the other hand, if
> >> throughput is more of a concern, it can be set higher. For the new
> >> consumer, I checked the value and I am not sure the hard-coded
> >> max.in.flight.requests.per.connection=100 is the right value. Without
> >> the response to the previous request, what offsets should be put into
> >> the next fetch request? It seems to me the value will effectively be
> >> one regardless of the setting, unless we are sending fetch requests to
> >> different partitions, which does not look like the case. Anyway, it
> >> looks to be a separate issue, orthogonal to the request timeout.
> >>
> >> >In other words, since the only user-facing change was the addition of
> >> >the setting, I'm wondering if we can avoid the KIP altogether by just
> >> >choosing a good default value for the timeout.
> >> The problem is that we have a server-side request timeout exposed as a
> >> public configuration. We cannot set the client timeout smaller than
> >> that value, so a hard-coded value probably won't work here.
> >
> >That makes sense, although it's worth keeping in mind that even if you
> >use "correct" values, they could still be violated due to, e.g., a GC
> >pause that causes the broker to process a request after it is supposed to
> >have expired.
> >
> >-Ewen
> >
> >> >-Ewen
> >> >
> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> >wrote:
> >> >
> >> >> Hi,
> >> >>
> >> >> I just created a KIP to add a request timeout to NetworkClient for
> >> >> new Kafka clients.
> >> >>
> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> >> >>
> >> >> Comments and suggestions are welcome!
> >> >>
> >> >> Thanks.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >
> >> >--
> >> >Thanks,
> >> >Ewen
> >
> >--
> >Thanks,
> >Ewen

--
Guozhang
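One way to picture the constraint Jiangjie raises above (the client-side timeout cannot be smaller than the server-side request timeout the broker is allowed to take) is as a simple configuration sanity check. The check below is purely illustrative; it is not code from the KIP or from the Kafka codebase, and the timeout values are placeholders.

    // Illustrative check for the constraint discussed in the thread: if the
    // client gives up on a request before the broker-side timeout has elapsed,
    // it may expire requests the broker is still legitimately working on.
    public class TimeoutConfigCheck {

        static void validate(long requestTimeoutMs, long replicationTimeoutMs) {
            if (requestTimeoutMs < replicationTimeoutMs) {
                throw new IllegalArgumentException(
                        "client-side request timeout (" + requestTimeoutMs + " ms) must not be"
                        + " smaller than the broker-side timeout (" + replicationTimeoutMs + " ms)");
            }
        }

        public static void main(String[] args) {
            validate(40_000L, 30_000L);   // fine: the client waits at least as long as the broker
            validate(10_000L, 30_000L);   // throws: the client would give up too early
        }
    }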