Hello all,

I can easily reproduce an annoying scenario using Kafka
brokers kafka_2.10-0.8.2.0, but I believe it has nothing to do with the
brokers, only with the producer API.

Here is the problem: a producer is continuously writing, using the sync
producer API, to a topic (2 partitions, replicated) on a three-node Kafka
cluster.
I then power off the Kafka broker that is the leader for at least one of the
partitions.

What I observe is that my Java client code is stuck for a very long time (several
minutes). I tested both kafka_2.10-0.8.2.0 and the latest release (kafka_2.10-0.10.0.1).
Right now I see no option other than to kill and restart my Java application when I
detect that, but of course it's cumbersome.

I tried playing with the request timeout but without success.
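
For reference, a minimal sketch of the kind of settings involved, assuming the old Scala producer (kafka.javaapi.producer.Producer) that appears in the stack traces below. The property names are the ones documented for that producer; the class name, broker addresses, and values are placeholders for illustration only, not the configuration actually used.

```java
import java.util.Properties;

class OldProducerTimeouts {
    // Builds settings for the legacy Scala producer.
    // Broker list and timeout are supplied by the caller (placeholders in main below).
    static Properties build(String brokers, int requestTimeoutMs) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokers);   // bootstrap brokers for metadata
        props.put("producer.type", "sync");           // synchronous sends, as in the post
        props.put("request.required.acks", "1");      // wait for the leader to acknowledge
        props.put("request.timeout.ms", Integer.toString(requestTimeoutMs));
        props.put("message.send.max.retries", "3");   // retries after a failed send
        props.put("retry.backoff.ms", "100");         // pause before each retry
        return props;
    }

    public static void main(String[] args) {
        Properties p = build("broker1:9092,broker2:9092,broker3:9092", 5000);
        p.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be passed to the producer's configuration object. Note that the traces below show the thread inside a socket write, so a timeout that only governs waiting for the broker's response would not necessarily apply there; that reading is an assumption, not something verified here.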

Clearly the producer is stuck in the standard socket calls, in a case
where the target node is unreachable and of course did not cleanly close
any socket,
but it seems to me that the producer should react on its own, no?
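
Short of killing the whole application, one application-level workaround would be to run each blocking send on a worker thread and give up after a deadline. The sketch below uses only java.util.concurrent; the class BoundedSend and its methods are hypothetical names, and the Runnable stands in for the real producer.send(...) call. Whether interrupting the worker actually unblocks a Kafka send stuck in a socket write is an assumption this post does not establish.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedSend {
    // Single daemon worker: a send that never returns cannot keep the JVM alive,
    // but later sends will queue behind it until it is unblocked.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "bounded-send");
        t.setDaemon(true);
        return t;
    });

    /** Runs the send on the worker; returns false if it did not finish in time. */
    boolean sendWithTimeout(Runnable send, long timeoutMs)
            throws ExecutionException, InterruptedException {
        Future<?> result = executor.submit(send);
        try {
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // interrupts the worker thread
            return false;
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        BoundedSend bounded = new BoundedSend();
        // Stand-ins for producer.send(...): one returns at once, one blocks.
        boolean fast = bounded.sendWithTimeout(() -> { }, 1000);
        boolean slow = bounded.sendWithTimeout(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 200);
        System.out.println(fast + " " + slow);
        bounded.shutdown();
    }
}
```

On a timeout the caller can at least detect the stall and recreate the producer instead of restarting the process; the stuck send itself is only abandoned, not guaranteed to be cleaned up.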
Thanks in advance for any advice.

Below are the stack traces. I can generate them repeatedly; it is always
stuck there.

Dimi


Here is the stack trace with the kafka_2.10-0.8.2.0 producer API:

"Thread-16-kafka_bolt-executor[3 3]" #121 prio=5 os_prio=0
tid=0x00007fd0100b1000 nid=0x32f0 runnable [0x00007fcfef8f8000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
    - locked <0x00000000f97caf78> (a java.lang.Object)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
    at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
    at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
    at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
    - locked <0x00000000f994cbd8> (a java.lang.Object)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:104)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:104)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:104)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:102)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:76)
    - locked <0x00000000c386faf0> (a java.lang.Object)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
   ....



And here is the one with kafka_2.10-0.10.0.1:

"Thread-16-kafka_bolt-executor[3 3]" #125 prio=5 os_prio=0
tid=0x00007f5dd8820000 nid=0x5e59 runnable [0x00007f5db50d2000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
    - locked <0x00000000ff4e66a8> (a java.lang.Object)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
    at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:57)
    at kafka.network.RequestOrResponseSend.writeCompletely(RequestOrResponseSend.scala:50)
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:113)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    - locked <0x00000000ff4e6568> (a java.lang.Object)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    - locked <0x00000000c396e8d0> (a java.lang.Object)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
