Makes sense. Please file a JIRA and attach a patch there. It would be great to add a simple test case as well.
Thanks,
Neha

On Wed, Sep 17, 2014 at 8:25 AM, Jonathan Weeks Gmail <jonathanbwe...@gmail.com> wrote:
>
> The issue is that even with one down broker, the rest of the cluster is
> up, but unreachable from the producer client in this case, which defeats
> the high availability characteristics of clustering.
>
> For any producer trying to use the service, it is "russian roulette"
> whether you will get metadata back when asking for topic/partition data.
>
> The ClientUtils code rightly iterates through the broker list looking for
> the metadata in random order, but if the first broker in the list is down,
> the others are never retried in a timely manner.
>
> An example stack trace shows the problem:
>
> "default-dispatcher-3" prio=5 tid=0x00007fef131c6000 nid=0x5f03 runnable [0x00000001146d2000]
>   java.lang.Thread.State: RUNNABLE
>     at sun.nio.ch.Net.connect0(Native Method)
>     at sun.nio.ch.Net.connect(Net.java:465)
>     at sun.nio.ch.Net.connect(Net.java:457)
>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>     - locked <0x00000007ad4c1b50> (a java.lang.Object)
>     - locked <0x00000007ad4c1b70> (a java.lang.Object)
>     - locked <0x00000007ad4c1b60> (a java.lang.Object)
>     at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>     - locked <0x00000007ad3f3408> (a java.lang.Object)
>     at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
>     at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>     - locked <0x00000007ad3de648> (a java.lang.Object)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>     at kafka.utils.Utils$.swallow(Utils.scala:167)
>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>     at kafka.utils.Utils$.swallowError(Utils.scala:46)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>     at kafka.producer.Producer.send(Producer.scala:76)
>
> An eight-minute timeout is a non-starter for a clustered (HA) service. One
> would expect the system to respect the request.timeout.ms config setting,
> which it does, unless a broker host is down and happens to be first in the
> shuffled list of brokers queried for the metadata.
>
> I believe this bug is also exacerbated by the fact that the metadata is
> (rightly) refreshed via the topic.metadata.refresh.interval.ms config
> setting, which defaults to every 10 minutes. AFAIK, this means that if a
> single broker is down, every new producer as well as every existing
> producer has a roughly 1/clusterSize chance of either not starting or hanging
> for a minimum of 8 minutes (assuming the TCP connection code times out),
> every 10 minutes (or whatever topic.metadata.refresh.interval.ms is set
> to), if I understand correctly.
>
> Initializing the SocketChannel in code that doesn't respect the
> request.timeout.ms setting logically defeats the spirit of the timeout
> setting, as well as making the iteration code in ClientUtils far less useful:
>
> (from fetchTopicMetadata:)
>
>   val shuffledBrokers = Random.shuffle(brokers)
>   while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
>     val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
>     info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
>     try {
>       topicMetadataResponse = producer.send(topicMetadataRequest)
>
> Opening the connection with a timeout as Jack suggests seems far
> preferable to the current situation.
>
> Best Regards,
>
> -Jonathan
>
>
> On Sep 16, 2014, at 10:08 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Jack,
> >
> > If the broker is down, channel.connect() should throw an IOException,
> > instead of blocking forever. In your case, is the broker host down? In
> > that case, the connect call will likely wait for the default TCP connection
> > timeout, which is 8+ mins.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Sep 16, 2014 at 5:43 PM, Jack Foy <j...@whitepages.com> wrote:
> >
> >> We observe that when a broker is down, Producer.send() can get into a
> >> state where it will block forever, even when using the async producer.
> >>
> >> When a Producer first sends data, it fetches topic metadata from the
> >> broker cluster. To do this, it shuffles the list of hosts in the cluster,
> >> then iterates through the list querying each broker.
> >>
> >> For each broker in the shuffled list, the Producer creates a SyncProducer
> >> and invokes SyncProducer.send().
> >> SyncProducer.send() creates a BlockingChannel and invokes
> >> BlockingChannel.connect().
> >> BlockingChannel.connect() retrieves a java.nio.channels.SocketChannel,
> >> sets it to blocking mode, and invokes SocketChannel.connect(), passing
> >> the current broker hostname.
> >>
> >> If the first broker in the list is nonresponsive, SocketChannel.connect()
> >> will wait forever.
> >>
> >> I think the correct change is as follows:
> >>
> >> diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
> >> index eb7bb14..9bb102a 100644
> >> --- a/core/src/main/scala/kafka/network/BlockingChannel.scala
> >> +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
> >> @@ -55,7 +55,7 @@ class BlockingChannel( val host: String,
> >>      channel.socket.setSoTimeout(readTimeoutMs)
> >>      channel.socket.setKeepAlive(true)
> >>      channel.socket.setTcpNoDelay(true)
> >> -    channel.connect(new InetSocketAddress(host, port))
> >> +    channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
> >>
> >>      writeChannel = channel
> >>      readChannel = Channels.newChannel(channel.socket().getInputStream)
> >>
> >> Is the next step to create a JIRA with this information? Thanks.
> >>
> >> --
> >> Jack Foy <j...@whitepages.com>
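
For reference, below is a minimal standalone Scala sketch of the idea behind Jack's diff (it is not code from this thread or from the Kafka codebase; the non-routable address 10.255.255.1, port 9092, the 5-second timeout, and the ConnectTimeoutSketch/connectTimeoutMs names are placeholders chosen only for illustration). It shows why a channel-level connect on a blocking SocketChannel can hang for the OS default, while a socket-level connect with an explicit timeout returns in bounded time:

import java.io.IOException
import java.net.{InetSocketAddress, SocketTimeoutException}
import java.nio.channels.SocketChannel

object ConnectTimeoutSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder non-routable address, used only to simulate an unresponsive broker host.
    val addr = new InetSocketAddress("10.255.255.1", 9092)
    val connectTimeoutMs = 5000

    val channel = SocketChannel.open()
    channel.configureBlocking(true)
    try {
      // channel.connect(addr) on a blocking channel would wait for the OS-level
      // TCP connect timeout (often several minutes) when the host never answers.
      // Connecting through the channel's socket adaptor accepts an explicit bound:
      channel.socket.connect(addr, connectTimeoutMs)
      println("connected")
    } catch {
      case _: SocketTimeoutException =>
        println(s"gave up after ${connectTimeoutMs} ms; a caller could move on to the next broker")
      case e: IOException =>
        println(s"connect failed immediately: ${e.getMessage}")
    } finally {
      channel.close()
    }
  }
}

The connectTimeoutMs value above is just a local variable; how the real patch surfaces it (a new BlockingChannel parameter, reuse of an existing producer config, etc.) is exactly what the JIRA and the attached patch, plus the simple test case Neha asked for, would pin down.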