Jason,

Setting request.required.acks=-1 is orthogonal to the "at least once"
guarantee; it only relates to the latency/replication trade-off. For
example, even if you set request.required.acks to 1, as long as you retry
on all non-fatal exceptions you have the "at least once" guarantee; and
even if you set request.required.acks to -1, if you do not retry you will
not get "at least once".

As you said, setting request.required.acks=-1 only means that when a
success response has been returned, you know that the message has been
received by all ISR replicas; this has nothing to do with the "at least
once" guarantee.
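
To make the distinction concrete, here is a minimal Scala sketch. It is not
real Kafka client code: FakeBroker and sendWithRetry are hypothetical names
made up for illustration. The fake broker appends every message it receives
but drops the ack for the first few sends, mimicking a timeout on a message
that was in fact committed. The acks setting does not appear at all; only
the retry policy decides which guarantee you get.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for a broker; NOT a Kafka API.
class FakeBroker(dropFirstAcks: Int) {
  val log = ArrayBuffer[String]()
  private var acksToDrop = dropFirstAcks

  // Always appends the message, then loses the ack for the first few calls,
  // so the caller sees an exception even though the message was committed.
  def send(msg: String): Unit = {
    log += msg
    if (acksToDrop > 0) {
      acksToDrop -= 1
      throw new RuntimeException("request timed out")
    }
  }
}

object Retry {
  // Keeps sending until a send is acked or the attempts run out.
  // maxAttempts > 1 aims at at-least-once (duplicates are possible);
  // maxAttempts = 1 is at-most-once (no duplicates, but an exception
  // does not prove the message was not committed).
  def sendWithRetry(send: String => Unit, msg: String, maxAttempts: Int): Boolean = {
    var attempt = 0
    var acked = false
    while (!acked && attempt < maxAttempts) {
      attempt += 1
      try {
        send(msg)
        acked = true
      } catch {
        case NonFatal(_) => () // outcome unknown: may or may not be committed
      }
    }
    acked
  }
}
```

With new FakeBroker(1) and maxAttempts = 3, sendWithRetry returns true and
the broker log holds the message twice: delivered, but duplicated. With
maxAttempts = 1 it returns false while the log still holds the message once,
which is the "exception but message committed" case from earlier in this
thread.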

Guozhang


On Fri, Oct 25, 2013 at 8:55 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Just to clarify, I think in order to get 'at least once' guarantees, you
> must produce messages with 'request.required.acks=-1'.  Otherwise, you
> can't be 100% sure the message was received by all ISR replicas.
>
>
> On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <kane.ist...@gmail.com> wrote:
>
> > Thanks Guozhang, it makes sense if it's by design. Just wanted to ensure
> > i'm not doing something wrong.
> >
> >
> > On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > As we have said, a timeout exception does not actually mean the message
> > > was not committed to the broker. When message.send.max.retries is 0,
> > > Kafka does guarantee "at-most-once", which means that you will not have
> > > duplicates, but it does not mean that every exception can be treated as
> > > "message not delivered". In your case, 1480 - 1450 = 30 messages are
> > > ones that were actually sent despite the exception, not duplicates.
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <kane.ist...@gmail.com> wrote:
> > >
> > > > There are a lot of exceptions, I will try to pick an example of each:
> > > > ERROR async.DefaultEventHandler - Failed to send requests for topics benchmark with correlation ids in [879,881]
> > > > WARN  async.DefaultEventHandler - Produce request with correlation id 874 failed due to [benchmark,43]: kafka.common.RequestTimedOutException
> > > > WARN  client.ClientUtils$ - Fetching topic metadata with correlation id 876 for topics [Set(benchmark)] from broker [id:2,host:10.80.42.156,port:9092] failed
> > > > ERROR producer.SyncProducer - Producer connection to 10.80.42.156:9092 unsuccessful
> > > > kafka.common.FailedToSendMessageException: Failed to send messages after 0 tries.
> > > > WARN  async.DefaultEventHandler - Failed to send producer request with correlation id 270 to broker 0 with data for partitions [benchmark,42]
> > > >
> > > > I think these are all types of exceptions i see there.
> > > > Thanks.
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Kane,
> > > > >
> > > > > If you set message.send.max.retries to 0 it should be at-most-once, and I
> > > > > saw your props have the right config. What are the exceptions you got from
> > > > > the send() call?
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <st...@stevemorin.com> wrote:
> > > > >
> > > > > > Kane and Aniket,
> > > > > >   I am also interested in knowing what pattern/solution people usually
> > > > > > use to implement exactly once.
> > > > > > -Steve
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <kane.ist...@gmail.com> wrote:
> > > > > >
> > > > > > > Guozhang, but I've posted a piece from the Kafka documentation above:
> > > > > > > "So effectively Kafka guarantees at-least-once delivery by default and
> > > > > > > allows the user to implement at most once delivery by disabling retries
> > > > > > > on the producer."
> > > > > > >
> > > > > > > What I want is at-most-once, and the docs claim it's possible with
> > > > > > > certain settings. Did I miss anything here?
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Aniket is exactly right. In general, Kafka provides an "at least once"
> > > > > > > > guarantee instead of "exactly once".
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > As per my understanding, if the broker says the message is committed, it
> > > > > > > > > is guaranteed to have been committed as per your ack config. If it says
> > > > > > > > > it did not get committed, then it is very hard to figure out whether this
> > > > > > > > > was just a false error. Since there is no concept of unique ids for
> > > > > > > > > messages, a replay of the same message will result in duplication. I
> > > > > > > > > think it is reasonable behaviour considering Kafka prefers to append data
> > > > > > > > > to partitions for performance reasons.
> > > > > > > > > The best way right now to deal with duplicate messages is to build the
> > > > > > > > > processing engine (the layer where your consumer sits) to deal with the
> > > > > > > > > at-least-once semantics of the broker.
> > > > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <kane.ist...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Or, to rephrase it more generally, is there a way to know exactly
> > > > > > > > > > whether a message was committed or not?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.ist...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hello Guozhang,
> > > > > > > > > > >
> > > > > > > > > > > My partitions are split almost evenly between brokers, so yes, the
> > > > > > > > > > > broker that I shut down is the leader for some of them. Does it mean I
> > > > > > > > > > > can get an exception and the data is still being written? Is there any
> > > > > > > > > > > setting on the broker where I can control this? I.e., can I make the
> > > > > > > > > > > broker replication timeout shorter than the producer timeout, so I can
> > > > > > > > > > > ensure that if I get an exception the data is not being committed?
> > > > > > > > > > >
> > > > > > > > > > > Thanks.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> Hello Kane,
> > > > > > > > > > > >>
> > > > > > > > > > > >> As discussed in the other thread, even if a timeout response is sent back
> > > > > > > > > > > >> to the producer, the message may still be committed.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Did you shut down the leader broker of the partition or a follower broker?
> > > > > > > > > > >>
> > > > > > > > > > >> Guozhang
> > > > > > > > > > >>
> > > > > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.ist...@gmail.com> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > I have a cluster of 3 Kafka brokers. With the following script I send
> > > > > > > > > > > >> > some data to Kafka and in the middle do a controlled shutdown of 1
> > > > > > > > > > > >> > broker. All 3 brokers are in the ISR before I start sending. When I shut
> > > > > > > > > > > >> > down the broker I get a couple of exceptions, and I expect that data
> > > > > > > > > > > >> > shouldn't be written. Say I send 1500 lines and get 50 exceptions. I
> > > > > > > > > > > >> > expect to consume 1450 lines, but instead I always consume more, i.e.
> > > > > > > > > > > >> > 1480 or 1490. I want to decide myself whether to retry sending, not use
> > > > > > > > > > > >> > message.send.max.retries. But it looks like if I retry sending whenever
> > > > > > > > > > > >> > there is an exception, I will end up with duplicates. Is there anything
> > > > > > > > > > > >> > I'm doing wrong, or do I have wrong assumptions about Kafka?
> > > > > > > > > > >> >
> > > > > > > > > > >> > Thanks.
> > > > > > > > > > >> >
> > > > > > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
> > > > > > > > > > > >> > var count = 0
> > > > > > > > > > > >> > for (line <- Source.fromFile(file).getLines()) {
> > > > > > > > > > > >> >     try {
> > > > > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > > > > >> >       count += 1
> > > > > > > > > > > >> >       println("sent %s".format(count))
> > > > > > > > > > > >> >     } catch {
> > > > > > > > > > > >> >       case e: Exception => println("Exception!")
> > > > > > > > > > > >> >     }
> > > > > > > > > > > >> > }
> > > > > > > > > > >> >
> > > > > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > > > > >> >   val sync = true
> > > > > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >   val props = new Properties()
> > > > > > > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > > > > > > >> >   props.put("producer.type", if (sync) "sync" else "async")
> > > > > > > > > > > >> >   props.put("request.required.acks", requestRequiredAcks)
> > > > > > > > > > > >> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> > > > > > > > > > > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
> > > > > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >   def send(topic: String, messages: List[String]) = {
> > > > > > > > > > > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> > > > > > > > > > > >> >     for (message <- messages) {
> > > > > > > > > > > >> >       requests += new KeyedMessage(topic, null, message, message)
> > > > > > > > > > > >> >     }
> > > > > > > > > > > >> >     producer.send(requests)
> > > > > > > > > > > >> >   }
> > > > > > > > > > > >> > }
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> --
> > > > > > > > > > >> -- Guozhang
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang
