With request.required.acks=1, an occasional single message loss is possible: the leader acknowledges the write and is then shut down before the follower gets a chance to copy the message. Can you try your test with request.required.acks set to -1?
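For reference, here is a minimal sketch of the producer settings this suggestion implies. It assumes the 0.8 producer is configured through a java.util.Properties object that is later handed to kafka.producer.ProducerConfig. The class and method names (AcksConfigSketch, producerProps) are hypothetical and the broker list is only illustrative; the property keys are the ones the 0.8 producer reads.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings with acks = -1.
class AcksConfigSketch {

    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        // Brokers used to bootstrap topic metadata.
        props.put("metadata.broker.list", brokerList);
        // -1: the leader waits for all in-sync replicas before acknowledging,
        // so an acknowledged message survives the leader being shut down.
        props.put("request.required.acks", "-1");
        // Synchronous sends, so a failure can surface from producer.send().
        props.put("producer.type", "sync");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:1025,localhost:1026");
        // In the real test these would go to: new ProducerConfig(props)
        System.out.println(props.getProperty("request.required.acks"));
    }
}
```

The trade-off is latency: each send now waits for the follower as well as the leader.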
Thanks,
Neha

On Oct 4, 2013 1:21 PM, "Jason Rosenberg" <j...@squareup.com> wrote:

> All,
>
> I'm having an issue with an integration test I've set up. This is using
> 0.8-beta1.
>
> The test is to verify that no messages are dropped (or the sender gets an
> exception thrown back if failure), while doing a rolling restart of a
> cluster of 2 brokers.
>
> The producer is configured to use 'request.required.acks' = '1'.
>
> The servers are set up to run locally on localhost, on different ports, and
> different data dirs. The producer connects with a metadata brokerlist
> like: "localhost:1025,localhost:1026" (so no vip). The servers are set
> up with a default replication factor of 2. The servers have controlled
> shutdown enabled, as well.
>
> The producer code looks like this:
> ...
> Producer<Integer, T> producer = getProducer();
> try {
>   KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
>   producer.send(msg);
>   return true;
> } catch (RuntimeException e) {
>   logger.warn("Error sending data to kafka", e);
>   return false;
> }
> ...
>
> The test sends groups of messages at each stage of the test (e.g. servers
> up, first server going down, first server coming up, second server going
> down, etc.). Then a consumer connects and consumes all the messages, to
> make sure they all arrived ok.
>
> It seems intermittently, a single message gets dropped, right after one of
> the servers starts going down. It doesn't happen always, seems to happen 1
> out of every 20 test runs or so. Here's some sample output. I see the
> exception inside the producer code, but I don't see the producer.send
> method ever having an exception thrown back out to the caller (the log line
> "Error sending data to kafka" is never triggered).
>
> What's interesting, is that it looks like the exceptions are happening on
> message 3, but when the consumer subsequently consumes back all the
> messages in the broker cluster, it seems message 2 (and not message 3) is
> missing:
>
> ...
> ...
> 7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 98
> 7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 99
> 7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Shutting down server2
> 7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 0
> 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer - Shutting down KafkaServer
> 7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 1
> 7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 2
> 7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 3
> 7394 [kafka-request-handler-5] WARN state.change.logger - Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
> 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis - [KafkaApi-1946108683] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
> kafka.common.ControllerMovedException: Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
>     at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>     at java.lang.Thread.run(Thread.java:724)
> 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN kafka.controller.RequestSendThread - [Controller-178709090-to-broker-178709090-send-thread], Controller 178709090 fails to send a request to broker 178709090
> java.nio.channels.AsynchronousCloseException
>     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
>     at kafka.utils.Utils$.read(Utils.scala:394)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer - Shut down complete for KafkaServer
> 17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 810 to broker 178709090 with data for partitions [test-topic,0]
> java.net.SocketTimeoutException
>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>     at kafka.utils.Utils$.read(Utils.scala:394)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
>     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
>     at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>     at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
>     at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>     at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>     at kafka.producer.Producer.send(Producer.scala:74)
>     at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>     at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
>     at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
>     at java.lang.Thread.run(Thread.java:724)
> 17319 [Thread-1] ERROR kafka.producer.SyncProducer - Producer connection to localhost:1026 unsuccessful
> java.net.ConnectException: Connection refused
>     at sun.nio.ch.Net.connect0(Native Method)
>     at sun.nio.ch.Net.connect(Net.java:465)
>     at sun.nio.ch.Net.connect(Net.java:457)
>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>     at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>     at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>     at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>     at kafka.utils.Utils$.swallow(Utils.scala:186)
>     at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>     at kafka.producer.Producer.send(Producer.scala:74)
>     at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>     at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
>     at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
>     at java.lang.Thread.run(Thread.java:724)
> 17322 [Thread-1] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 811 for topics [Set(test-topic)] from broker [id:1,host:localhost,port:1026] failed
> java.net.ConnectException: Connection refused
>     at sun.nio.ch.Net.connect0(Native Method)
>     at sun.nio.ch.Net.connect(Net.java:465)
>     at sun.nio.ch.Net.connect(Net.java:457)
>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>     at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>     at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>     at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>     at kafka.utils.Utils$.swallow(Utils.scala:186)
>     at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>     at kafka.producer.Producer.send(Producer.scala:74)
>     at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>     at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
>     at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
>     at java.lang.Thread.run(Thread.java:724)
> 17340 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 4
> 17353 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 5
> 17365 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 6
>
> ...
> ...
> 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 98'
> 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 99'
> 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 0'
> 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 1'
> 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 3'
> 23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 4'
> 23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 5'
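
The check described in the quoted message (every message the producer reported as sent must show up at the consumer) can be sketched roughly as below. This is only an illustration and not the test's actual code: the class and method names are hypothetical, and messages are assumed to be compared as plain strings.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: reports messages that were sent without error
// but never came back from the consumer.
class DeliveryCheckSketch {

    static List<String> missing(List<String> sentOk, List<String> received) {
        Set<String> seen = new HashSet<>(received);
        List<String> lost = new ArrayList<>();
        for (String msg : sentOk) {
            if (!seen.contains(msg)) {
                lost.add(msg); // send() returned normally, yet nothing was consumed
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        List<String> sentOk = new ArrayList<>();
        List<String> received = new ArrayList<>();
        for (int i = 0; i <= 5; i++) {
            String msg = "test-stage: 4, message: " + i;
            sentOk.add(msg);
            if (i != 2) {
                received.add(msg); // mirror the log above, where message 2 is absent
            }
        }
        System.out.println(missing(sentOk, received));
    }
}
```

A non-empty result means an acknowledged message was lost, which is the failure the test is meant to catch.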