All, I'm having an issue with an integration test I've set up. This is using 0.8-beta1.
The test verifies that no messages are dropped (or that the sender gets an exception thrown back on failure) while doing a rolling restart of a cluster of 2 brokers. The producer is configured with 'request.required.acks' = '1'. The servers run locally on localhost, on different ports and with different data dirs. The producer connects with a metadata broker list like "localhost:2024,localhost:1025" (so no VIP). The servers are set up with a default replication factor of 2, and they also have controlled shutdown enabled.

The producer code looks like this:

    ...
    Producer<Integer, T> producer = getProducer();
    try {
        KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
        producer.send(msg);
        return true;
    } catch (RuntimeException e) {
        logger.warn("Error sending data to kafka", e);
        return false;
    }
    ...

The test sends groups of messages at each stage (e.g. servers up, first server going down, first server coming up, second server going down, etc.). Then a consumer connects and consumes all the messages to make sure they all arrived OK.

Intermittently, a single message seems to get dropped right after one of the servers starts shutting down. It doesn't always happen; it seems to happen in about 1 out of every 20 test runs. Here's some sample output. I see exceptions logged inside the producer code, but producer.send never throws an exception back out to the caller (the log line "Error sending data to kafka" is never triggered). What's interesting is that the exceptions appear to happen on message 3, but when the consumer subsequently consumes all the messages back from the broker cluster, message 2 (and not message 3) is missing:

...
...
7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 98
7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 99
7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Shutting down server2
7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 0
7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer - Shutting down KafkaServer
7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 1
7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 2
7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 3
7394 [kafka-request-handler-5] WARN state.change.logger - Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis - [KafkaApi-1946108683] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
kafka.common.ControllerMovedException: Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
    at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
    at java.lang.Thread.run(Thread.java:724)
8039 [Controller-178709090-to-broker-178709090-send-thread] WARN kafka.controller.RequestSendThread - [Controller-178709090-to-broker-178709090-send-thread], Controller 178709090 fails to send a request to broker 178709090
java.nio.channels.AsynchronousCloseException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
    at kafka.utils.Utils$.read(Utils.scala:394)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer - Shut down complete for KafkaServer
17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 810 to broker 178709090 with data for partitions [test-topic,0]
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:394)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
    at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
    at java.lang.Thread.run(Thread.java:724)
17319 [Thread-1] ERROR kafka.producer.SyncProducer - Producer connection to localhost:1026 unsuccessful
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.Utils$.swallow(Utils.scala:186)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
    at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
    at java.lang.Thread.run(Thread.java:724)
17322 [Thread-1] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 811 for topics [Set(test-topic)] from broker [id:1,host:localhost,port:1026] failed
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.Utils$.swallow(Utils.scala:186)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
    at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
    at java.lang.Thread.run(Thread.java:724)
17340 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 4
17353 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 5
17365 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 6
...
...
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 98'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 99'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 0'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 1'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 3'
23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 4'
23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 5'
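To make the consumer-side check concrete, here is a minimal sketch of how a verification step could compare what was sent with what came back, using the same "test-stage: N, message: M" payload format the log prints. The class and method names (MissingMessageCheck, payload, findMissing) are hypothetical, since the real verification code isn't shown; main feeds in only the stage-4 slice visible in the consumer output.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MissingMessageCheck {
    // Builds a payload in the same format the test log prints (hypothetical helper).
    static String payload(int stage, int message) {
        return "test-stage: " + stage + ", message: " + message;
    }

    // Returns each expected payload the consumer never saw, preserving send order.
    static List<String> findMissing(List<String> expected, List<String> received) {
        Set<String> seen = new HashSet<>(received);
        List<String> missing = new ArrayList<>();
        for (String p : expected) {
            if (!seen.contains(p)) {
                missing.add(p);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Stage 4, messages 0..5, as logged by the producer thread above.
        List<String> expected = new ArrayList<>();
        for (int m = 0; m <= 5; m++) {
            expected.add(payload(4, m));
        }
        // Stage-4 messages the consumer actually logged.
        List<String> received = List.of(
            payload(4, 0), payload(4, 1), payload(4, 3), payload(4, 4), payload(4, 5));
        System.out.println(findMissing(expected, received)); // prints [test-stage: 4, message: 2]
    }
}
```

Reporting the missing payloads in send order (rather than just a count) makes it easy to line a gap up against the producer-side timestamps, which is how the message 2 vs. message 3 discrepancy shows up here.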
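For reference, here is a minimal sketch of the producer settings described at the top of the post, written as a plain java.util.Properties object so it compiles without Kafka on the classpath. The metadata.broker.list and request.required.acks values come from the post. producer.type=sync is an assumption, based on the stack trace showing Producer.send calling DefaultEventHandler.handle directly on Thread-1, and message.send.max.retries=3 is my assumption of the 0.8 default, set explicitly here. The class name is hypothetical, and serializer settings are omitted because the post doesn't show them.

```java
import java.util.Properties;

public class ProducerPropsSketch {
    // Hypothetical helper: builds properties for the 0.8 producer
    // (intended for kafka.producer.ProducerConfig). Uses only java.util.Properties.
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        // Bootstrap list used only to fetch topic metadata; no VIP in front of the brokers.
        props.setProperty("metadata.broker.list", brokerList);
        // Ack once the partition leader has the message; followers are not waited on.
        props.setProperty("request.required.acks", "1");
        // Assumption: synchronous producer, as suggested by the stack trace.
        props.setProperty("producer.type", "sync");
        // Assumption: 0.8 default. Failed attempts are logged as warnings and retried
        // internally; send() should only throw once all retries are used up.
        props.setProperty("message.send.max.retries", "3");
        return props;
    }

    public static void main(String[] args) {
        producerProps("localhost:2024,localhost:1025").list(System.out);
    }
}
```

In the real test these properties would be wrapped in a kafka.producer.ProducerConfig and handed to the kafka.javaapi.producer.Producer constructor. If the retry behavior is as assumed, it would be consistent with the log: the SocketTimeoutException for correlation id 810 is logged as a WARN, the metadata refresh failures are swallowed, and sending carries on with message 4 without the "Error sending data to kafka" branch ever running.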