> Shouldn't this be part of the contract? It should be able to make sure
> this happens before shutting down, no?
The leader writes messages to its local log, and then the replicas consume
messages from the leader and write those to their local logs. If you set
request.required.acks=1, the ack is sent to the producer only after the
leader has written messages to its local log. What you are asking for is
part of the contract if request.required.acks=-1.

> In this case, if we need to use request.required.acks=-1, that will pretty
> much prevent any successful message producing while any of the brokers for
> a partition is unavailable. So, I don't think that's an option. (Not to
> mention the performance degradation).

You can implement reliable delivery semantics while allowing a rolling
restart of brokers by setting request.required.acks=-1. When one of the
replicas is shut down, the ISR shrinks to remove the replica being shut
down, and the messages will be committed using the new ISR (see the
producer config sketch appended after the quoted thread below).

Thanks,
Neha

On Fri, Oct 4, 2013 at 11:51 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Neha,
>
> I'm not sure I understand. I would have thought that if the leader
> acknowledges receipt of a message, and is then shut down cleanly (with
> controlled shutdown enabled), it would be able to reliably persist any
> in-memory buffered messages (and replicate them) before shutting down.
> Shouldn't this be part of the contract? It should be able to make sure
> this happens before shutting down, no?
>
> I would understand a message dropped if it were a hard shutdown.
>
> I'm not sure then how to implement reliable delivery semantics while
> allowing a rolling restart of the broker cluster (or even to tolerate a
> single node failure, where one node might be down for a while and need to
> be replaced or have a disk repaired). In this case, if we need to use
> request.required.acks=-1, that will pretty much prevent any successful
> message producing while any of the brokers for a partition is unavailable.
> So, I don't think that's an option. (Not to mention the performance
> degradation).
>
> Is there not a way to make this work more reliably with leader-only
> acknowledgment and clean/controlled shutdown?
>
> My test does succeed, as expected, with acks = -1, at least for the 100 or
> so iterations I've let it run so far. It does on occasion send duplicates
> (but that's ok with me).
>
> Jason
>
>
> On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
> > The occasional single message loss could happen since
> > request.required.acks=1 and the leader is shut down before the follower
> > gets a chance to copy the message. Can you try your test with num acks
> > set to -1?
> >
> > Thanks,
> > Neha
> > On Oct 4, 2013 1:21 PM, "Jason Rosenberg" <j...@squareup.com> wrote:
> >
> > > All,
> > >
> > > I'm having an issue with an integration test I've set up. This is
> > > using 0.8-beta1.
> > >
> > > The test is to verify that no messages are dropped (or the sender
> > > gets an exception thrown back on failure) while doing a rolling
> > > restart of a cluster of 2 brokers.
> > >
> > > The producer is configured to use 'request.required.acks' = '1'.
> > >
> > > The servers are set up to run locally on localhost, on different
> > > ports and with different data dirs. The producer connects with a
> > > metadata broker list like: "localhost:2024,localhost:1025" (so no
> > > vip). The servers are set up with a default replication factor of 2.
> > > The servers have controlled shutdown enabled, as well.
> > >
> > > The producer code looks like this:
> > > ...
> > > Producer<Integer, T> producer = getProducer();
> > > try {
> > >   KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
> > >   producer.send(msg);
> > >   return true;
> > > } catch (RuntimeException e) {
> > >   logger.warn("Error sending data to kafka", e);
> > >   return false;
> > > }
> > > ...
> > >
> > > The test sends groups of messages at each stage of the test (e.g. servers up, first server going down, first server coming up, second server going down, etc.). Then a consumer connects and consumes all the messages, to make sure they all arrived ok.
> > >
> > > It seems that, intermittently, a single message gets dropped right after one of the servers starts going down. It doesn't always happen; it seems to happen about 1 out of every 20 test runs or so. Here's some sample output. I see the exception inside the producer code, but I don't see the producer.send method ever having an exception thrown back out to the caller (the log line "Error sending data to kafka" is never triggered).
> > >
> > > What's interesting is that it looks like the exceptions are happening on message 3, but when the consumer subsequently consumes back all the messages in the broker cluster, it seems message 2 (and not message 3) is missing:
> > >
> > > ...
> > > ...
> > > 7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 98
> > > 7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 99
> > > 7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Shutting down server2
> > > 7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 0
> > > 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer - Shutting down KafkaServer
> > > 7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 1
> > > 7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 2
> > > 7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 3
> > > 7394 [kafka-request-handler-5] WARN state.change.logger - Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
> > > 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis - [KafkaApi-1946108683] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
> > > kafka.common.ControllerMovedException: Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
> > >   at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
> > >   at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
> > >   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >   at java.lang.Thread.run(Thread.java:724)
> > > 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN kafka.controller.RequestSendThread - [Controller-178709090-to-broker-178709090-send-thread], Controller 178709090 fails to send a request to broker 178709090
> > > java.nio.channels.AsynchronousCloseException
> > >   at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
> > >   at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
> > >   at kafka.utils.Utils$.read(Utils.scala:394)
> > >   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > >   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > >   at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
> > >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > 8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer - Shut down complete for KafkaServer
> > > 17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 810 to broker 178709090 with data for partitions [test-topic,0]
> > > java.net.SocketTimeoutException
> > >   at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
> > >   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> > >   at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> > >   at kafka.utils.Utils$.read(Utils.scala:394)
> > >   at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > >   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > >   at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > >   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > >   at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> > >   at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> > >   at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> > >   at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > >   at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >   at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> > >   at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > >   at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >   at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> > >   at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
> > >   at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
> > >   at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
> > >   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > >   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > >   at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > >   at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> > >   at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> > >   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> > >   at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> > >   at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
> > >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > >   at kafka.producer.Producer.send(Producer.scala:74)
> > >   at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > >   at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > >   at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > >   at java.lang.Thread.run(Thread.java:724)
> > > 17319 [Thread-1] ERROR kafka.producer.SyncProducer - Producer connection to localhost:1026 unsuccessful
> > > java.net.ConnectException: Connection refused
> > >   at sun.nio.ch.Net.connect0(Native Method)
> > >   at sun.nio.ch.Net.connect(Net.java:465)
> > >   at sun.nio.ch.Net.connect(Net.java:457)
> > >   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > >   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > >   at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > >   at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > >   at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > >   at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > >   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > >   at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > >   at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > >   at kafka.utils.Utils$.swallow(Utils.scala:186)
> > >   at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > >   at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > >   at kafka.producer.Producer.send(Producer.scala:74)
> > >   at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > >   at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > >   at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > >   at java.lang.Thread.run(Thread.java:724)
> > > 17322 [Thread-1] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 811 for topics [Set(test-topic)] from broker [id:1,host:localhost,port:1026] failed
> > > java.net.ConnectException: Connection refused
> > >   at sun.nio.ch.Net.connect0(Native Method)
> > >   at sun.nio.ch.Net.connect(Net.java:465)
> > >   at sun.nio.ch.Net.connect(Net.java:457)
> > >   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > >   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > >   at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > >   at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > >   at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > >   at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > >   at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > >   at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > >   at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > >   at kafka.utils.Utils$.swallow(Utils.scala:186)
> > >   at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > >   at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > >   at kafka.producer.Producer.send(Producer.scala:74)
> > >   at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > >   at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > >   at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > >   at java.lang.Thread.run(Thread.java:724)
> > > 17340 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 4
> > > 17353 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 5
> > > 17365 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 6
> > >
> > > ...
> > > ...
> > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 98'
> > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 99'
> > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 0'
> > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 1'
> > > 23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 3'
> > > 23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 4'
> > > 23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 5'
> > >
> > >
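
For reference, here is a minimal sketch of a producer configured the way I'm
suggesting, with request.required.acks=-1. This is only an illustration, not
your test code: the class name, the String key/value types, the StringEncoder
serializer, and the topic/broker values (taken from your test logs) are all
just placeholders, and your getProducer() presumably builds something similar.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AcksAllProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Two local brokers, no vip (ports as in the test logs above).
    props.put("metadata.broker.list", "localhost:1025,localhost:1026");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    // -1: the ack is sent only after the message is committed by every
    // replica in the current ISR, not just written to the leader's local
    // log. When a broker is taken down for a rolling restart, the ISR
    // shrinks and produce requests keep succeeding against the remaining
    // replica.
    props.put("request.required.acks", "-1");

    Producer<String, String> producer =
        new Producer<String, String>(new ProducerConfig(props));
    try {
      producer.send(new KeyedMessage<String, String>(
          "test-topic", "test-stage: 4, message: 0"));
    } catch (RuntimeException e) {
      // With acks=-1 a failed or timed-out send surfaces here; retrying it
      // may produce duplicates, as you observed in your test.
      System.err.println("Error sending data to kafka: " + e);
    } finally {
      producer.close();
    }
  }
}

This is the same 0.8 producer API your test already uses (the classes appear
in your stack traces); relative to your current setup, only the
request.required.acks setting changes.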