Makes sense. Please file a JIRA and attach a patch there. It would also be
great to add a simple test case.

Thanks,
Neha

On Wed, Sep 17, 2014 at 8:25 AM, Jonathan Weeks Gmail <
jonathanbwe...@gmail.com> wrote:

>
> The issue is that even with one down broker, the rest of the cluster is
> up, but unreachable from the producer client in this case, which defeats
> the high availability characteristics of clustering.
>
> For any producer trying to use the service, it is "Russian roulette"
> whether you will get metadata back when asking for topic/partition data.
>
> The ClientUtils code rightly iterates through the broker list looking for
> the metadata in random order, but if the first broker in the list is down,
> the others are never retried in a timely manner.
>
> An example stacktrace shows the problem:
>
> default-dispatcher-3" prio=5 tid=0x00007fef131c6000 nid=0x5f03 runnable [0x00000001146d2000]
>   java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> - locked <0x00000007ad4c1b50> (a java.lang.Object)
> - locked <0x00000007ad4c1b70> (a java.lang.Object)
> - locked <0x00000007ad4c1b60> (a java.lang.Object)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> - locked <0x00000007ad3f3408> (a java.lang.Object)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> - locked <0x00000007ad3de648> (a java.lang.Object)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:167)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:46)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at kafka.producer.Producer.send(Producer.scala:76)
>
> An eight-minute timeout is a non-starter for a clustered (HA) service. One
> would expect the system to respect the request.timeout.ms config setting,
> and it does, unless a broker host is down and happens to come first in the
> shuffled list of brokers used to fetch the metadata.
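The distinction at issue here can be sketched in plain Java (a hypothetical helper, not Kafka code): `java.nio.channels.SocketChannel.connect()` in blocking mode offers no timeout parameter and falls back to the OS TCP connect timeout, while `java.net.Socket.connect(addr, timeoutMs)` accepts an application-level timeout and throws `SocketTimeoutException` when it expires.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class ConnectTimeoutDemo {
    // Hypothetical helper: open a blocking channel, but bound the connect
    // attempt with an application-level timeout (as the proposed patch does)
    // instead of relying on the OS default TCP handshake timeout.
    public static SocketChannel connectWithTimeout(String host, int port, int timeoutMs)
            throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(true);
        // Socket.connect(addr, timeoutMs) throws SocketTimeoutException if
        // the connection cannot be established within timeoutMs.
        channel.socket().connect(new InetSocketAddress(host, port), timeoutMs);
        return channel;
    }
}
```

With the timeout set from request.timeout.ms (or a dedicated connect-timeout setting), an unreachable broker fails within that bound rather than after the 8+ minutes of the default TCP handshake timeout.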
>
> I believe this bug is also exacerbated by the fact that the metadata is
> (rightly) refreshed via the topic.metadata.refresh.interval.ms config
> setting, which defaults to every 10 minutes. AFAIK, this means that if a
> single broker is down, every new producer as well as every existing
> producer has a 1/clusterSize chance of either not starting or hanging for
> a minimum of 8 minutes (assuming the TCP connection code times out) on
> every refresh, i.e. every 10 minutes or whatever
> topic.metadata.refresh.interval.ms is set to, if I understand correctly.
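A rough back-of-the-envelope check of that impact (my numbers, not from the thread): if each metadata refresh independently puts the one dead broker first in the shuffled list with probability 1/n, then over k refresh cycles the chance of hitting at least one long stall is 1 - (1 - 1/n)^k.

```java
public class StallOdds {
    // Probability that at least one of k metadata refreshes stalls, when
    // each refresh independently tries the dead broker first with
    // probability 1/n (n = cluster size, one broker down).
    public static double stallProbability(int n, int k) {
        double perRefresh = 1.0 / n;
        return 1.0 - Math.pow(1.0 - perRefresh, k);
    }
}
```

For a 5-broker cluster over 6 refreshes (about an hour at the default 10-minute interval), that is 1 - 0.8^6, roughly 74%.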
>
> Initializing the SocketChannel in code that doesn't respect the
> request.timeout.ms setting defeats the spirit of the timeout setting, and
> also makes the iteration code in ClientUtils far less useful:
>
> (from fetchTopicMetadata:)
> val shuffledBrokers = Random.shuffle(brokers)
> while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
>   val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
>   info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
>   try {
>     topicMetadataResponse = producer.send(topicMetadataRequest)
>
> Opening the connection with a timeout as Jack suggests seems far
> preferable to the current situation.
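To illustrate why a bounded connect restores the value of that iteration, here is a hedged sketch (hypothetical names, not Kafka's actual implementation) of the shuffled-broker loop: with a per-attempt timeout, one dead host costs at most timeoutMs before the loop moves on to the next broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.List;

public class MetadataFetchSketch {
    // Returns the first broker reachable within timeoutMs, or null if every
    // broker in the list was unreachable. Mirrors the shape of Kafka's
    // ClientUtils.fetchTopicMetadata loop, with the connect bounded.
    public static InetSocketAddress firstReachableBroker(
            List<InetSocketAddress> brokers, int timeoutMs) {
        Collections.shuffle(brokers);  // as in Random.shuffle(brokers)
        for (InetSocketAddress broker : brokers) {
            try (Socket socket = new Socket()) {
                // Fail fast instead of waiting on the OS TCP timeout.
                socket.connect(broker, timeoutMs);
                // Reachable: a real client would fetch metadata here before
                // closing; this sketch just reports which broker answered.
                return broker;
            } catch (IOException e) {
                // Unreachable broker: move on to the next one promptly.
            }
        }
        return null;
    }
}
```

With this shape, a down broker at the front of the shuffled list delays the metadata fetch by one timeout rather than pinning it for minutes.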
>
> Best Regards,
>
> -Jonathan
>
>
> On Sep 16, 2014, at 10:08 PM, Jun Rao <jun...@gmail.com> wrote:
> > Jack,
> >
> > If the broker is down, channel.connect() should throw an IOException
> > instead of blocking forever. In your case, is the broker host down? In
> > that case, the connect call will likely wait for the default TCP
> > connection timeout, which is 8+ minutes.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Sep 16, 2014 at 5:43 PM, Jack Foy <j...@whitepages.com> wrote:
> >
> >> We observe that when a broker is down, Producer.send() can get into a
> >> state where it will block forever, even when using the async producer.
> >>
> >> When a Producer first sends data, it fetches topic metadata from the
> >> broker cluster. To do this, it shuffles the list of hosts in the
> >> cluster, then iterates through the list querying each broker.
> >>
> >> For each broker in the shuffled list, the Producer creates a
> >> SyncProducer and invokes SyncProducer.send(). SyncProducer.send()
> >> creates a BlockingChannel and invokes BlockingChannel.connect().
> >> BlockingChannel.connect() retrieves a java.nio.channels.SocketChannel,
> >> sets it to blocking mode, and invokes SocketChannel.connect(), passing
> >> the current broker hostname.
> >>
> >> If the first broker in the list is nonresponsive,
> >> SocketChannel.connect() will wait forever.
> >>
> >> I think the correct change is as follows:
> >>
> >> diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
> >> index eb7bb14..9bb102a 100644
> >> --- a/core/src/main/scala/kafka/network/BlockingChannel.scala
> >> +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
> >> @@ -55,7 +55,7 @@ class BlockingChannel( val host: String,
> >>         channel.socket.setSoTimeout(readTimeoutMs)
> >>         channel.socket.setKeepAlive(true)
> >>         channel.socket.setTcpNoDelay(true)
> >> -        channel.connect(new InetSocketAddress(host, port))
> >> +        channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
> >>
> >>         writeChannel = channel
> >>         readChannel = Channels.newChannel(channel.socket().getInputStream)
> >>
> >> Is the next step to create a JIRA with this information? Thanks.
> >>
> >> --
> >> Jack Foy <j...@whitepages.com>
> >>
> >>
> >>
> >>
>
>