Hello Jason,

You are right. I think we just have different definitions of "at least
once". What you describe is closer to "availability", which says that your
message will not be lost when there are failures. We achieve that through
replication (which is what request.required.acks relates to). In 0.7 we did
not have replication, so when a broker went down all the partitions it
hosted became unavailable, yet we still described Kafka as an "at least
once" messaging system.
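
As a concrete illustration (a minimal sketch against the 0.8 producer
config; the broker addresses are placeholders), request.required.acks only
controls how much of the replica set must confirm a write before the
producer gets a success response:

import java.util.Properties
import kafka.producer.ProducerConfig

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092,broker2:9092")
// 0  -> fire and forget, no broker acknowledgement
// 1  -> the leader acknowledges after writing to its local log
// -1 -> the leader acknowledges only after all ISR replicas have the message
props.put("request.required.acks", "-1")
val config = new ProducerConfig(props)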

Guozhang


On Sat, Oct 26, 2013 at 9:32 AM, Jason Rosenberg <j...@squareup.com> wrote:

> Guozhang,
>
> It turns out this is not entirely true: you do need request.required.acks =
> -1 (and yes, you need to retry on failure) in order to have guaranteed
> delivery.
>
> I discovered this when doing tests with rolling restarts (and hard restarts)
> of the Kafka servers. If a server goes down, e.g. if there's any change in
> the ISR leader for a partition, then the only way you can be sure that what
> you produced will be available on a newly elected leader is to use -1.
>
> Jason
>
>
>
>
> On Sat, Oct 26, 2013 at 12:11 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Jason,
> >
> > Setting request.required.acks=-1 is orthogonal to the "at least once"
> > guarantee; it only affects the latency/replication trade-off. For example,
> > even if you set request.required.acks to 1, as long as you retry on all
> > non-fatal exceptions you still have the "at least once" guarantee; and even
> > if you set request.required.acks to -1 but do not retry, you will not get
> > "at least once".
> >
> > As you said, setting request.required.acks=-1 only means that when a
> > success response has been returned you know that the message has been
> > received by all ISR replicas, but that by itself has nothing to do with the
> > "at least once" guarantee.
> >
> > Guozhang
> >
> >
> > > On Fri, Oct 25, 2013 at 8:55 PM, Jason Rosenberg <j...@squareup.com> wrote:
> >
> > > Just to clarify, I think in order to get 'at least once' guarantees, you
> > > must produce messages with 'request.required.acks=-1'. Otherwise, you
> > > can't be 100% sure the message was received by all ISR replicas.
> > >
> > >
> > > On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <kane.ist...@gmail.com> wrote:
> > >
> > > > Thanks Guozhang, it makes sense if it's by design. Just wanted to
> > > > ensure I'm not doing something wrong.
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > As we have said, the timeout exception does not necessarily mean the
> > > > > message was not committed to the broker. When message.send.max.retries
> > > > > is 0, Kafka does guarantee "at-most-once", which means that you will
> > > > > not get duplicates, but it does not mean that every exception can be
> > > > > treated as "message not delivered". In your case, the 1480 - 1450 = 30
> > > > > extra messages are ones that were actually committed despite the
> > > > > exception, not duplicates.
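> > > > >
> > > > > For illustration only (a sketch reusing the producer and input lines
> > > > > from your script further down; the counter names are made up): with
> > > > > message.send.max.retries=0 every line is sent exactly one time, so the
> > > > > number you consume ends up between the acknowledged count and the total
> > > > > number of attempts.
> > > > >
> > > > > var acknowledged = 0
> > > > > var uncertain = 0
> > > > > for (line <- lines) {
> > > > >   try {
> > > > >     producer.send(new KeyedMessage[AnyRef, AnyRef]("benchmark", null, line, line))
> > > > >     acknowledged += 1  // broker confirmed the write
> > > > >   } catch {
> > > > >     case e: Throwable => uncertain += 1  // outcome unknown: the write may still have committed
> > > > >   }
> > > > > }
> > > > > // messages consumed later: >= acknowledged and <= acknowledged + uncertain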
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <kane.ist...@gmail.com> wrote:
> > > > >
> > > > > > There are a lot of exceptions; I will try to pick an example of each:
> > > > > > ERROR async.DefaultEventHandler - Failed to send requests for topics benchmark with correlation ids in [879,881]
> > > > > > WARN  async.DefaultEventHandler - Produce request with correlation id 874 failed due to [benchmark,43]: kafka.common.RequestTimedOutException
> > > > > > WARN  client.ClientUtils$ - Fetching topic metadata with correlation id 876 for topics [Set(benchmark)] from broker [id:2,host:10.80.42.156,port:9092] failed
> > > > > > ERROR producer.SyncProducer - Producer connection to 10.80.42.156:9092 unsuccessful
> > > > > > kafka.common.FailedToSendMessageException: Failed to send messages after 0 tries.
> > > > > > WARN  async.DefaultEventHandler - Failed to send producer request with correlation id 270 to broker 0 with data for partitions [benchmark,42]
> > > > > >
> > > > > > I think these are all the types of exceptions I see there.
> > > > > > Thanks.
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Kane,
> > > > > > >
> > > > > > > If you set message.send.max.retries to 0 it should be at-most-once,
> > > > > > > and I saw your props have the right config. What are the exceptions
> > > > > > > you got from the send() call?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <st...@stevemorin.com> wrote:
> > > > > > >
> > > > > > > > Kane and Aniket,
> > > > > > > >   I am interested in knowing what pattern/solution people usually
> > > > > > > > use to implement exactly once as well.
> > > > > > > > -Steve
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <kane.ist...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Guozhang, but I've posted a piece from the Kafka documentation
> > > > > > > > > above: "So effectively Kafka guarantees at-least-once delivery by
> > > > > > > > > default and allows the user to implement at most once delivery by
> > > > > > > > > disabling retries on the producer."
> > > > > > > > >
> > > > > > > > > What I want is at-most-once, and the docs claim it's possible with
> > > > > > > > > certain settings. Did I miss anything here?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Aniket is exactly right. In general, Kafka provides an "at least
> > > > > > > > > > once" guarantee, not "exactly once".
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > > > > > > aniket.bhatna...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > As per my understanding, if the broker says the message is
> > > > > > > > > > > committed, it is guaranteed to have been committed as per your
> > > > > > > > > > > ack config. If it says it did not get committed, then it is
> > > > > > > > > > > very hard to figure out whether this was just a false error.
> > > > > > > > > > > Since there is no concept of unique ids for messages, a replay
> > > > > > > > > > > of the same message will result in duplication. I think this is
> > > > > > > > > > > reasonable behaviour, considering Kafka prefers to append data
> > > > > > > > > > > to partitions for performance reasons.
> > > > > > > > > > > The best way right now to deal with duplicate messages is to
> > > > > > > > > > > build the processing engine (the layer where your consumer
> > > > > > > > > > > sits) to deal with the at-least-once semantics of the broker.
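> > > > > > > > > > >
> > > > > > > > > > > For instance (a minimal sketch, assuming the application embeds
> > > > > > > > > > > its own unique id in each message; msgId and handle() are
> > > > > > > > > > > hypothetical names):
> > > > > > > > > > >
> > > > > > > > > > > import scala.collection.mutable
> > > > > > > > > > >
> > > > > > > > > > > val seen = mutable.Set[String]()
> > > > > > > > > > > def process(msgId: String, payload: String): Unit = {
> > > > > > > > > > >   if (seen.add(msgId)) {  // add() returns false if the id was already processed
> > > > > > > > > > >     handle(payload)       // the real work runs at most once per id
> > > > > > > > > > >   }                       // duplicates from producer retries are dropped here
> > > > > > > > > > > }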
> > > > > > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <kane.ist...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Or, to rephrase it more generally, is there a way to know
> > > > > > > > > > > > exactly whether a message was committed or not?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.ist...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hello Guozhang,
> > > > > > > > > > > > >
> > > > > > > > > > > > > My partitions are split almost evenly between brokers, so
> > > > > > > > > > > > > yes, the broker that I shut down is the leader for some of
> > > > > > > > > > > > > them. Does that mean I can get an exception while the data
> > > > > > > > > > > > > is still being written? Is there any setting on the broker
> > > > > > > > > > > > > where I can control this? I.e., can I make the broker
> > > > > > > > > > > > > replication timeout shorter than the producer timeout, so
> > > > > > > > > > > > > that I can be sure that if I get an exception the data has
> > > > > > > > > > > > > not been committed?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Hello Kane,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> As discussed in the other thread, even if a timeout
> > > > > > > > > > > > >> response is sent back to the producer, the message may
> > > > > > > > > > > > >> still be committed.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Did you shut down the leader broker of the partition or a
> > > > > > > > > > > > >> follower broker?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Guozhang
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.ist...@gmail.com> wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> > I have a cluster of 3 Kafka brokers. With the following
> > > > > > > > > > > > >> > script I send some data to Kafka and, in the middle, do a
> > > > > > > > > > > > >> > controlled shutdown of one broker. All 3 brokers are in
> > > > > > > > > > > > >> > the ISR before I start sending. When I shut down the
> > > > > > > > > > > > >> > broker I get a couple of exceptions, and I expect that
> > > > > > > > > > > > >> > data shouldn't have been written. Say I send 1500 lines
> > > > > > > > > > > > >> > and get 50 exceptions; I expect to consume 1450 lines,
> > > > > > > > > > > > >> > but instead I always consume more, e.g. 1480 or 1490. I
> > > > > > > > > > > > >> > want to decide whether to retry sending myself, rather
> > > > > > > > > > > > >> > than using message.send.max.retries. But it looks like if
> > > > > > > > > > > > >> > I retry sending when there is an exception, I will end up
> > > > > > > > > > > > >> > with duplicates. Is there anything I'm doing wrong, or am
> > > > > > > > > > > > >> > I making wrong assumptions about Kafka?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > > > > > > > 10.80.42.154:9092,
> > > > > > > > > > > > >> > 10.80.42.156:9092")
> > > > > > > > > > > > >> > var count = 0
> > > > > > > > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > > > > > > > >> >     try {
> > > > > > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > > > > > >> >       count += 1
> > > > > > > > > > > > >> >       println("sent %s", count)
> > > > > > > > > > > > >> >     } catch {
> > > > > > > > > > > > >> >       case _ => println("Exception!")
> > > > > > > > > > > > >> >     }
> > > > > > > > > > > > >> > }
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > > > > > >> >   val sync = true
> > > > > > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >   val props = new Properties()
> > > > > > > > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > > > > > > > >> >   props.put("producer.type", if(sync) "sync"
> else
> > > > > "async")
> > > > > > > > > > > > >> >   props.put("request.required.acks",
> > > > > requestRequiredAcks)
> > > > > > > > > > > > >> >   props.put("key.serializer.class",
> > > > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > >> >   props.put("serializer.class",
> > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >   val producer = new Producer[AnyRef,
> AnyRef](new
> > > > > > > > > > > > ProducerConfig(props))
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >   def send(topic: String, messages:
> List[String])
> > =
> > > {
> > > > > > > > > > > > >> >     val requests = new
> > > > ArrayBuffer[KeyedMessage[AnyRef,
> > > > > > > > AnyRef]]
> > > > > > > > > > > > >> >     for (message <- messages) {
> > > > > > > > > > > > >> >       requests += new KeyedMessage(topic, null,
> > > > message,
> > > > > > > > > message)
> > > > > > > > > > > > >> >     }
> > > > > > > > > > > > >> >     producer.send(requests)
> > > > > > > > > > > > >> >   }
> > > > > > > > > > > > >> > }
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> --
> > > > > > > > > > > > >> -- Guozhang
> > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
