Neha, I'm not sure I understand. I would have thought that if the leader acknowledges receipt of a message and is then shut down cleanly (with controlled shutdown enabled), it would be able to reliably persist any in-memory buffered messages (and replicate them) before shutting down. Shouldn't this be part of the contract? It should be able to make sure this happens before shutting down, no?
I would understand a message being dropped if it were a hard shutdown. But then I'm not sure how to implement reliable delivery semantics while allowing a rolling restart of the broker cluster (or even tolerating a single node failure, where one node might be down for a while and need to be replaced or have a disk repaired). In that case, if we need to use request.required.acks=-1, that will pretty much prevent any successful message producing while any of the brokers for a partition is unavailable. So I don't think that's an option (not to mention the performance degradation). Is there not a way to make this work more reliably with leader-only acknowledgment and clean/controlled shutdown?

My test does succeed, as expected, with acks = -1, at least for the 100 or so iterations I've let it run so far. It does on occasion send duplicates (but that's ok with me).

Jason

On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> The occasional single message loss could happen since
> request.required.acks=1 and the leader is shut down before the follower
> gets a chance to copy the message. Can you try your test with num acks set
> to -1?
>
> Thanks,
> Neha
>
> On Oct 4, 2013 1:21 PM, "Jason Rosenberg" <j...@squareup.com> wrote:
>
> > All,
> >
> > I'm having an issue with an integration test I've set up. This is using
> > 0.8-beta1.
> >
> > The test is to verify that no messages are dropped (or the sender gets an
> > exception thrown back on failure) while doing a rolling restart of a
> > cluster of 2 brokers.
> >
> > The producer is configured to use 'request.required.acks' = '1'.
> >
> > The servers are set up to run locally on localhost, on different ports,
> > and different data dirs. The producer connects with a metadata broker
> > list like "localhost:2024,localhost:1025" (so no vip). The servers are
> > set up with a default replication factor of 2, and have controlled
> > shutdown enabled as well.
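[For concreteness, a minimal sketch of the producer settings being compared above. The property names are the 0.8-era producer configs named in this thread; the ports, class, and helper name are illustrative only:]

```java
import java.util.Properties;

public class ProducerAckConfig {
    // Illustrative helper (not from the thread): producer settings with
    // request.required.acks=-1, so the leader waits for all in-sync
    // replicas to copy a message before acknowledging the send. With
    // acks=1, only the leader's receipt is required, which is the window
    // for loss discussed in this thread.
    static Properties ackAllConfig() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:1025,localhost:1026");
        props.put("request.required.acks", "-1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(ackAllConfig().getProperty("request.required.acks"));
    }
}
```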
> >
> > The producer code looks like this:
> >
> > ...
> > Producer<Integer, T> producer = getProducer();
> > try {
> >   KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
> >   producer.send(msg);
> >   return true;
> > } catch (RuntimeException e) {
> >   logger.warn("Error sending data to kafka", e);
> >   return false;
> > }
> > ...
> >
> > The test sends groups of messages at each stage of the test (e.g. servers
> > up, first server going down, first server coming up, second server going
> > down, etc.). Then a consumer connects and consumes all the messages, to
> > make sure they all arrived ok.
> >
> > It seems that intermittently a single message gets dropped, right after
> > one of the servers starts going down. It doesn't always happen; it seems
> > to happen about 1 out of every 20 test runs. Here's some sample output.
> > I see the exception inside the producer code, but I don't see the
> > producer.send method ever having an exception thrown back out to the
> > caller (the log line "Error sending data to kafka" is never triggered).
> >
> > What's interesting is that it looks like the exceptions are happening on
> > message 3, but when the consumer subsequently consumes back all the
> > messages in the broker cluster, it seems message 2 (and not message 3) is
> > missing:
> >
> > ...
> > ...
> > 7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 98
> > 7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 99
> > 7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Shutting down server2
> > 7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 0
> > 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer - Shutting down KafkaServer
> > 7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 1
> > 7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 2
> > 7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 3
> > 7394 [kafka-request-handler-5] WARN state.change.logger - Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
> > 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis - [KafkaApi-1946108683] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
> > kafka.common.ControllerMovedException: Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
> > at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
> > at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
> > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > at java.lang.Thread.run(Thread.java:724)
> > 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN kafka.controller.RequestSendThread - [Controller-178709090-to-broker-178709090-send-thread], Controller 178709090 fails to send a request to broker 178709090
> > java.nio.channels.AsynchronousCloseException
> > at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
> > at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
> > at kafka.utils.Utils$.read(Utils.scala:394)
> > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > 8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer - Shut down complete for KafkaServer
> > 17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 810 to broker 178709090 with data for partitions [test-topic,0]
> > java.net.SocketTimeoutException
> > at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
> > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> > at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> > at kafka.utils.Utils$.read(Utils.scala:394)
> > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> > at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> > at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> > at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> > at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
> > at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
> > at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
> > at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> > at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> > at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
> > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > at java.lang.Thread.run(Thread.java:724)
> > 17319 [Thread-1] ERROR kafka.producer.SyncProducer - Producer connection to localhost:1026 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > at java.lang.Thread.run(Thread.java:724)
> > 17322 [Thread-1] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 811 for topics [Set(test-topic)] from broker [id:1,host:localhost,port:1026] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > at java.lang.Thread.run(Thread.java:724)
> > 17340 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 4
> > 17353 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 5
> > 17365 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 6
> >
> > ...
> > ...
> > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 98'
> > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 99'
> > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 0'
> > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 1'
> > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 3'
> > 23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 4'
> > 23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 5'
> >
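[A hedged sketch of one way the test's send path could tolerate a broker bounce when the catch block currently just returns false: retry the send with a short backoff before giving up. This is plain Java with a stand-in for producer.send plus its try/catch; the helper name and parameters are illustrative, not from the thread:]

```java
import java.util.function.BooleanSupplier;

public class RetrySend {
    // Retry a send attempt up to maxRetries times with a fixed backoff.
    // The BooleanSupplier stands in for "producer.send(msg) succeeded"
    // (i.e. the try/catch in the test, which returns false on a
    // RuntimeException while a broker is mid-restart).
    static boolean sendWithRetries(BooleanSupplier attempt, int maxRetries, long backoffMs)
            throws InterruptedException {
        for (int i = 0; i <= maxRetries; i++) {
            if (attempt.getAsBoolean()) return true;  // acked by the broker
            Thread.sleep(backoffMs);                  // broker may be bouncing; wait and retry
        }
        return false;                                 // surface the failure to the caller
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in "send" that fails twice (broker down), then succeeds.
        int[] calls = {0};
        boolean ok = sendWithRetries(() -> ++calls[0] >= 3, 5, 1L);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```

[With acks=-1 this pattern trades occasional duplicates for no silent loss, which matches the duplicate behavior observed above.]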