Guozhang, it turns out this is not entirely true: you do need request.required.acks = -1 (and yes, you need to retry on failure) in order to have guaranteed delivery.
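A minimal sketch of the combination described above (acks = -1 plus application-level retries), written in Scala to match the script later in the thread. It assumes the application owns retries, so message.send.max.retries is set to 0 as in Kane's MyProducer. `AtLeastOnceSketch`, `sendWithRetry` and `maxAttempts` are hypothetical names, and `send` stands for any call, such as MyProducer.send, that throws when a produce request fails. No Kafka classes are used, only the property names from this thread.

```scala
import java.util.Properties
import scala.util.control.NonFatal

object AtLeastOnceSketch {
  // Producer settings discussed in this thread (Kafka 0.8 property names):
  // wait for all in-sync replicas, and leave retries to the application.
  def producerProps(brokerList: String): Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", brokerList)
    props.put("request.required.acks", "-1")
    props.put("message.send.max.retries", "0")
    props
  }

  // Hypothetical helper: call `send` until it succeeds or `maxAttempts`
  // attempts have failed. Returns the number of attempts made on success,
  // otherwise rethrows the last non-fatal failure.
  def sendWithRetry(maxAttempts: Int)(send: () => Unit): Int = {
    require(maxAttempts > 0, "maxAttempts must be positive")
    var attempt = 0
    var lastError: Throwable = null
    while (attempt < maxAttempts) {
      attempt += 1
      try {
        send()
        return attempt
      } catch {
        // A failed attempt may still have been committed on the broker.
        case NonFatal(e) => lastError = e
      }
    }
    throw lastError
  }
}
```

Because a failed attempt may already have been committed (see Guozhang's replies further down), retrying this way gives at-least-once delivery, so duplicates remain possible.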
I discovered this when doing tests with rolling restarts (and hard restarts) of the Kafka servers. If a server goes down, e.g. if there's any change in a partition's leader, then the only way you can be sure that what you produced will be available on a newly elected leader is to use -1.

Jason

On Sat, Oct 26, 2013 at 12:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Jason,

Setting request.required.acks=-1 is orthogonal to the 'at least once' guarantee; it only relates to the latency/replication trade-off. For example, even if you set request.required.acks to 1, as long as you retry on all non-fatal exceptions you have the "at least once" guarantee; and even if you set request.required.acks to -1, if you do not retry you will not get "at least once".

As you said, setting request.required.acks=-1 only means that when a success response has been returned, you know that the message has been received by all ISR replicas, but this has nothing to do with the "at least once" guarantee.

Guozhang

On Fri, Oct 25, 2013 at 8:55 PM, Jason Rosenberg <j...@squareup.com> wrote:

Just to clarify, I think in order to get 'at least once' guarantees, you must produce messages with 'request.required.acks=-1'. Otherwise, you can't be 100% sure the message was received by all ISR replicas.

On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <kane.ist...@gmail.com> wrote:

Thanks Guozhang, it makes sense if it's by design. Just wanted to make sure I'm not doing something wrong.

On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wangg...@gmail.com> wrote:

As we have said, the timeout exception does not actually mean the message is not committed to the broker.
When message.send.max.retries is 0, Kafka does guarantee "at-most-once", which means that you will not have duplicates, but that does not mean all your exceptions can be treated as "message not delivered". In your case, the 1480 - 1450 = 30 extra messages are ones that were actually delivered despite the exception, not duplicates.

Guozhang

On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <kane.ist...@gmail.com> wrote:

There are a lot of exceptions; I will try to pick an example of each:

    ERROR async.DefaultEventHandler - Failed to send requests for topics benchmark with correlation ids in [879,881]
    WARN async.DefaultEventHandler - Produce request with correlation id 874 failed due to [benchmark,43]: kafka.common.RequestTimedOutException
    WARN client.ClientUtils$ - Fetching topic metadata with correlation id 876 for topics [Set(benchmark)] from broker [id:2,host:10.80.42.156,port:9092] failed
    ERROR producer.SyncProducer - Producer connection to 10.80.42.156:9092 unsuccessful
    kafka.common.FailedToSendMessageException: Failed to send messages after 0 tries.
    WARN async.DefaultEventHandler - Failed to send producer request with correlation id 270 to broker 0 with data for partitions [benchmark,42]

I think these are all the types of exceptions I see there.
Thanks.

On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Kane,

If you set message.send.max.retries to 0 it should be at-most-once, and I saw your props have the right config. What exceptions did you get from the send() call?
Guozhang

On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <st...@stevemorin.com> wrote:

Kane and Aniket,
I am interested in knowing what pattern/solution people usually use to implement exactly-once as well.
-Steve

On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <kane.ist...@gmail.com> wrote:

Guozhang, but I've posted a piece from the Kafka documentation above:

"So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer."

What I want is at-most-once, and the docs claim it's possible with certain settings. Did I miss anything here?

On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Aniket is exactly right. In general, Kafka provides an "at least once" guarantee instead of "exactly once".

Guozhang

On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

As per my understanding, if the broker says the msg is committed, it's guaranteed to have been committed as per your ack config. If it says it did not get committed, then it's very hard to figure out if this was just a false error.
Since there is no concept of unique ids for messages, a replay of the same message will result in duplication. I think it's reasonable behaviour, considering Kafka prefers to append data to partitions for performance reasons.
The best way right now to deal with duplicate msgs is to build the processing engine (the layer where your consumer sits) to deal with the at-least-once semantics of the broker.

On 25 Oct 2013 23:23, "Kane Kane" <kane.ist...@gmail.com> wrote:

Or, to rephrase it more generally, is there a way to know exactly whether a message was committed or not?

On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.ist...@gmail.com> wrote:

Hello Guozhang,

My partitions are split almost evenly between brokers, so yes, the broker that I shut down is the leader for some of them. Does it mean I can get an exception and the data is still being written? Is there any setting on the broker where I can control this? I.e., can I make the broker replication timeout shorter than the producer timeout, so I can ensure that if I get an exception, the data is not being committed?
Thanks.

On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Kane,

As discussed in the other thread, even if a timeout response is sent back to the producer, the message may still be committed.

Did you shut down the leader broker of the partition or a follower broker?

Guozhang

On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.ist...@gmail.com> wrote:

I have a cluster of 3 Kafka brokers. With the following script I send some data to Kafka and, in the middle, do a controlled shutdown of 1 broker. All 3 brokers are in the ISR before I start sending. When I shut down the broker I get a couple of exceptions, and I expect that data shouldn't be written. Say I send 1500 lines and get 50 exceptions. I expect to consume 1450 lines, but instead I always consume more, i.e. 1480 or 1490.
I want to decide whether to retry sending myself, rather than using message.send.max.retries. But it looks like if I retry sending when there is an exception, I will end up with duplicates. Is there anything I'm doing wrong, or am I making wrong assumptions about Kafka?

Thanks.

    import java.util.Properties
    import scala.collection.mutable.ArrayBuffer
    import scala.io.Source
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import kafka.serializer.StringEncoder

    val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
    var count = 0
    // `file` holds the input file path (assumed to be defined elsewhere)
    for (line <- Source.fromFile(file).getLines()) {
      try {
        // send each line as its own single-message batch
        prod.send("benchmark", List(line))
        count += 1
        println("sent %d".format(count))
      } catch {
        case e: Exception => println("Exception! " + e)
      }
    }

    class MyProducer(brokerList: String) {
      val sync = true
      val requestRequiredAcks = "-1" // wait for all in-sync replicas

      val props = new Properties()
      props.put("metadata.broker.list", brokerList)
      props.put("producer.type", if (sync) "sync" else "async")
      props.put("request.required.acks", requestRequiredAcks)
      props.put("key.serializer.class", classOf[StringEncoder].getName)
      props.put("serializer.class", classOf[StringEncoder].getName)
      props.put("message.send.max.retries", "0") // no automatic retries
      props.put("request.timeout.ms", "2000")

      val producer = new Producer[String, String](new ProducerConfig(props))

      def send(topic: String, messages: List[String]) = {
        val requests = new ArrayBuffer[KeyedMessage[String, String]]
        for (message <- messages) {
          // key = null, partition key = the message itself
          requests += new KeyedMessage[String, String](topic, null, message, message)
        }
        // Producer.send takes varargs, so expand the buffer
        producer.send(requests: _*)
      }
    }
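Aniket's suggestion, making the processing layer tolerate the broker's at-least-once semantics, could look roughly like the sketch below. Because these messages carry no unique ids, it assumes the producer prefixes each message with an id of its own and reuses that id on every retry; the `<id>|<payload>` format, `DedupSketch` and `Deduplicator` are hypothetical and not part of Kafka's API. The consumer passes through only the first copy of each id.

```scala
import scala.collection.mutable

object DedupSketch {
  // Hypothetical wire format "<id>|<payload>": the producer assigns each
  // logical message an id before its first send and reuses it on retries.
  def encode(id: Long, payload: String): String = s"$id|$payload"

  // Split on the first '|' only, so payloads may themselves contain '|'.
  def decode(raw: String): (Long, String) = {
    val sep = raw.indexOf('|')
    require(sep > 0, "missing id prefix: " + raw)
    (raw.substring(0, sep).toLong, raw.substring(sep + 1))
  }

  // Processing-layer deduplication: emit the payload the first time an id
  // is seen, and drop any later copy produced by a retry.
  class Deduplicator {
    private val seen = mutable.HashSet.empty[Long]

    def accept(raw: String): Option[String] = {
      val (id, payload) = decode(raw)
      if (seen.add(id)) Some(payload) else None
    }
  }
}
```

The set of seen ids is kept in memory and grows without bound; a long-running consumer would need to bound it or persist it together with its output, which is a design decision left open here.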