The occasional single message loss can happen when request.required.acks=1
and the leader is shut down before the follower gets a chance to copy the
message. Can you try your test with request.required.acks set to -1?
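
For reference, setting that on the 0.8 producer looks roughly like this (a
minimal sketch; the broker list and serializer below are taken from the test
description or assumed, not from your actual code):

    import java.util.Properties;
    import kafka.producer.ProducerConfig;

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:2024,localhost:1025");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    // -1 waits for the write to be acknowledged by all in-sync replicas,
    // rather than just the partition leader (which is what acks=1 does)
    props.put("request.required.acks", "-1");
    ProducerConfig config = new ProducerConfig(props);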

Thanks,
Neha
On Oct 4, 2013 1:21 PM, "Jason Rosenberg" <j...@squareup.com> wrote:

> All,
>
> I'm having an issue with an integration test I've set up.  This is using
> 0.8-beta1.
>
> The test is to verify that no messages are dropped (or that the sender gets
> an exception thrown back on failure) while doing a rolling restart of a
> cluster of 2 brokers.
>
> The producer is configured to use 'request.required.acks' = '1'.
>
> The servers are set up to run locally on localhost, on different ports and
> with different data dirs.  The producer connects with a metadata broker
> list like "localhost:2024,localhost:1025" (so no VIP).  The servers are set
> up with a default replication factor of 2, and they have controlled
> shutdown enabled as well.
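>
> Concretely, each embedded broker is brought up with properties along these
> lines (a simplified sketch rather than the actual server wrapper code; the
> zookeeper address and data dirs are illustrative, and the
> controlled.shutdown.enable name is the one used by later 0.8.x releases, so
> it may differ slightly in 0.8-beta1):
>
>     import java.util.Properties;
>     import kafka.server.KafkaConfig;
>     import kafka.server.KafkaServerStartable;
>
>     Properties props = new Properties();
>     props.put("broker.id", "1");
>     props.put("port", "1025");                          // the second broker gets its own port
>     props.put("log.dirs", "/tmp/kafka-test-broker-1");  // separate data dir per broker
>     props.put("zookeeper.connect", "localhost:2181");
>     props.put("default.replication.factor", "2");
>     props.put("controlled.shutdown.enable", "true");
>     KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(props));
>     broker.startup();
>     // ...and broker.shutdown() is what the rolling restart calls later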
>
> The producer code looks like this:
>     ...
>     Producer<Integer, T> producer = getProducer();
>     try {
>       KeyedMessage<Integer, T> msg =
>           new KeyedMessage<Integer, T>(topic, message);
>       producer.send(msg);
>       return true;
>     } catch (RuntimeException e) {
>       logger.warn("Error sending data to kafka", e);
>       return false;
>     }
>     ...
>
> The test sends groups of messages at each stage of the test (e.g. servers
> up, first server going down, first server coming up, second server going
> down, etc.).  Then a consumer connects and consumes all the messages, to
> make sure they all arrived ok.
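>
> The consume-and-verify step uses the 0.8 high-level consumer, roughly like
> the sketch below (the group id, zookeeper address, and timeout values are
> illustrative, not the exact test code):
>
>     import java.util.HashMap;
>     import java.util.HashSet;
>     import java.util.Map;
>     import java.util.Properties;
>     import java.util.Set;
>     import kafka.consumer.Consumer;
>     import kafka.consumer.ConsumerConfig;
>     import kafka.consumer.ConsumerIterator;
>     import kafka.consumer.ConsumerTimeoutException;
>     import kafka.consumer.KafkaStream;
>     import kafka.javaapi.consumer.ConsumerConnector;
>
>     Properties props = new Properties();
>     props.put("zookeeper.connect", "localhost:2181");
>     props.put("group.id", "rolling-restart-verifier");
>     props.put("auto.offset.reset", "smallest");  // read the topic from the beginning
>     props.put("consumer.timeout.ms", "10000");   // stop iterating once caught up
>
>     ConsumerConnector connector =
>         Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>     Map<String, Integer> topicCounts = new HashMap<String, Integer>();
>     topicCounts.put("test-topic", 1);
>     KafkaStream<byte[], byte[]> stream =
>         connector.createMessageStreams(topicCounts).get("test-topic").get(0);
>
>     Set<String> received = new HashSet<String>();
>     try {
>       ConsumerIterator<byte[], byte[]> it = stream.iterator();
>       while (it.hasNext()) {
>         received.add(new String(it.next().message()));
>       }
>     } catch (ConsumerTimeoutException e) {
>       // no message arrived within consumer.timeout.ms; assume we're caught up
>     }
>     connector.shutdown();
>     // 'received' is then checked against every message the test tried to send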
>
> It seems that, intermittently, a single message gets dropped right after one
> of the servers starts going down.  It doesn't happen every time; it seems to
> happen in about 1 out of every 20 test runs.  Here's some sample output.  I
> see the exception inside the producer code, but I don't see the producer.send
> method ever having an exception thrown back out to the caller (the log line
> "Error sending data to kafka" is never triggered).
>
> What's interesting is that it looks like the exceptions happen on message 3,
> but when the consumer subsequently consumes back all the messages in the
> broker cluster, it is message 2 (and not message 3) that is missing:
>
> ...
> ...
> 7136 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 3, message: 98
> 7150 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 3, message: 99
> 7163 [Thread-2] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Shutting down server2
> 7163 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 0
> 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer  - Shutting
> down KafkaServer
> 7176 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 1
> 7189 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 2
> 7203 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 3
> 7394 [kafka-request-handler-5] WARN state.change.logger  - Broker
> 1946108683 received update metadata request with correlation id 7 from an
> old controller 178709090 with epoch 2. Latest known controller epoch is 3
> 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis  -
> [KafkaApi-1946108683] error when handling request
>
> Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
> kafka.common.ControllerMovedException: Broker 1946108683 received update
> metadata request with correlation id 7 from an old controller 178709090
> with epoch 2. Latest known controller epoch is 3
> at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:724)
> 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN
> kafka.controller.RequestSendThread  -
> [Controller-178709090-to-broker-178709090-send-thread], Controller
> 178709090 fails to send a request to broker 178709090
> java.nio.channels.AsynchronousCloseException
> at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
> at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
> at kafka.utils.Utils$.read(Utils.scala:394)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer  - Shut down
> complete for KafkaServer
> 17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler  - Failed to
> send producer request with correlation id 810 to broker 178709090 with data
> for partitions [test-topic,0]
> java.net.SocketTimeoutException
> at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> at kafka.utils.Utils$.read(Utils.scala:394)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
> at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
> at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> at kafka.producer.Producer.send(Producer.scala:74)
> at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> at java.lang.Thread.run(Thread.java:724)
> 17319 [Thread-1] ERROR kafka.producer.SyncProducer  - Producer connection
> to localhost:1026 unsuccessful
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> at kafka.utils.Utils$.swallow(Utils.scala:186)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> at kafka.producer.Producer.send(Producer.scala:74)
> at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> at java.lang.Thread.run(Thread.java:724)
> 17322 [Thread-1] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with correlation id 811 for topics [Set(test-topic)] from broker
> [id:1,host:localhost,port:1026] failed
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> at kafka.utils.Utils$.swallow(Utils.scala:186)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> at kafka.producer.Producer.send(Producer.scala:74)
> at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> at java.lang.Thread.run(Thread.java:724)
> 17340 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 4
> 17353 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 5
> 17365 [Thread-1] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Sending message: test-stage: 4, message: 6
>
> ...
> ...
> 23410 [Thread-28] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Received msg 'test-stage: 3, message: 98'
> 23410 [Thread-28] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Received msg 'test-stage: 3, message: 99'
> 23410 [Thread-28] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Received msg 'test-stage: 4, message: 0'
> 23410 [Thread-28] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Received msg 'test-stage: 4, message: 1'
> 23410 [Thread-28] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Received msg 'test-stage: 4, message: 3'
> 23411 [Thread-28] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Received msg 'test-stage: 4, message: 4'
> 23411 [Thread-28] INFO
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> Received msg 'test-stage: 4, message: 5'
>
