Perhaps, it's a matter of semantics.

But, I think I'm not talking only about failure, but normal operation.
 It's normal to take a cluster down for maintenance, or code update.  And
this should be done a rolling restart manner (1 server at a time).

The reason for replication, is to increase reliability (as well as
availability).  We've said that in 0.8.0, we can tolerate R-1 node failures
(where R is the replication factor).  From the client's standpoint, this
should mean that the cluster is still available and reliable when a single
node is down (assuming R is > 1).

When you say "at least once", you are suggesting that the message will be
delivered at least once, and won't be lost.

I don't think that was ever really true about the previous version <= 0.7
(and I don't think I ever really read that about 0.7, as a guarantee, did
I?).

Jason


On Sat, Oct 26, 2013 at 8:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Jason,
>
> You are right. I think we just have different definitions about "at least
> once". What you have described to me is more related to "availability",
> which says that your message will not be lost when there are failures. And
> we achieve this through replication (which is related to
> request.required.acks). In 0.7, we do not have any replications and hence
> when a broker goes down all the partitions it holds will be non-available,
> but we still defined Kafka as a "at least once" messaging system.
>
> Guozhang
>
>
> On Sat, Oct 26, 2013 at 9:32 AM, Jason Rosenberg <j...@squareup.com> wrote:
>
> > Guozhang,
> >
> > It turns out this is not entirely true, you do need
> request.required.acks =
> > -1 (and yes, you need to retry if failure) in order have guaranteed
> > delivery.
> >
> > I discovered this, when doing tests with rolling restarts (and hard
> > restarts) of the kafka servers.  If the server goes down, e.g. if there's
> > any change in the ISR leader for a partition, then the only way you can
> be
> > sure what you produced will be available on a newly elected leader is to
> > use -1.
> >
> > Jason
> >
> >
> >
> >
> > On Sat, Oct 26, 2013 at 12:11 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Jason,
> > >
> > > Setting request.required.acks=-1 is orthogonal to the 'at least once'
> > > guarantee, it only relates to the latency/replication trade-off. For
> > > example, even if you set request.required.acks to 1, and as long as you
> > > retry on all non-fatal exceptions you have the "at least once"
> guarantee;
> > > and even if you set request.required.acks to -1 and you do not retry,
> you
> > > will not get "at least once".
> > >
> > > As you said, setting request.required.acks=-1 only means that when a
> > > success response has been returned you know that the message has been
> > > received by all ISR replicas, but this has nothing to do with "at least
> > > once" guarantee.
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Oct 25, 2013 at 8:55 PM, Jason Rosenberg <j...@squareup.com>
> > wrote:
> > >
> > > > Just to clarify, I think in order to get 'at least once' guarantees,
> > you
> > > > must produce messages with 'request.required.acks=-1'.  Otherwise,
> you
> > > > can't be 100% sure the message was received by all ISR replicas.
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 9:56 PM, Kane Kane <kane.ist...@gmail.com>
> > > wrote:
> > > >
> > > > > Thanks Guozhang, it makes sense if it's by design. Just wanted to
> > > ensure
> > > > > i'm not doing something wrong.
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 5:57 PM, Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > As we have said, the timeout exception does not actually mean the
> > > > message
> > > > > > is not committed to the broker. When message.send.max.retries is
> 0
> > > > Kafka
> > > > > > does guarantee "at-most-once" which means that you will not have
> > > > > > duplicates, but not means that all your exceptions can be treated
> > as
> > > > > > "message not delivered". In your case, 1480 - 1450 = 30 messages
> > are
> > > > the
> > > > > > ones that are actually sent, not the ones that are duplicates.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <
> kane.ist...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > There are a lot of exceptions, I will try to pick an example of
> > > each:
> > > > > > > ERROR async.DefaultEventHandler - Failed to send requests for
> > > topics
> > > > > > > benchmark with correlation ids in [879,881]
> > > > > > > WARN  async.DefaultEventHandler - Produce request with
> > correlation
> > > id
> > > > > 874
> > > > > > > failed due to [benchmark,43]:
> > kafka.common.RequestTimedOutException
> > > > > > > WARN  client.ClientUtils$ - Fetching topic metadata with
> > > correlation
> > > > id
> > > > > > 876
> > > > > > > for topics [Set(benchmark)] from broker
> > > > > > [id:2,host:10.80.42.156,port:9092]
> > > > > > > failed
> > > > > > > ERROR producer.SyncProducer - Producer connection to
> > > > > > > 10.80.42.156:9092unsuccessful
> > > > > > > kafka.common.FailedToSendMessageException: Failed to send
> > messages
> > > > > after
> > > > > > 0
> > > > > > > tries.
> > > > > > > WARN  async.DefaultEventHandler - Failed to send producer
> request
> > > > with
> > > > > > > correlation id 270 to broker 0 with data for partitions
> > > > [benchmark,42]
> > > > > > >
> > > > > > > I think these are all types of exceptions i see there.
> > > > > > > Thanks.
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <
> > wangg...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Kane,
> > > > > > > >
> > > > > > > > If you set message.send.max.retries to 0 it should be
> > > at-most-once,
> > > > > > and I
> > > > > > > > saw your props have the right config. What are the exceptions
> > you
> > > > got
> > > > > > > from
> > > > > > > > the send() call?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <
> > > > st...@stevemorin.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Kane and Aniket,
> > > > > > > > >   I am interested in knowing what the pattern/solution that
> > > > people
> > > > > > > > usually
> > > > > > > > > use to implement exactly once as well.
> > > > > > > > > -Steve
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <
> > > > kane.ist...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Guozhang, but i've posted a piece from kafka
> documentation
> > > > above:
> > > > > > > > > > So effectively Kafka guarantees at-least-once delivery by
> > > > default
> > > > > > and
> > > > > > > > > > allows the user to implement at most once delivery by
> > > disabling
> > > > > > > retries
> > > > > > > > > on
> > > > > > > > > > the producer.
> > > > > > > > > >
> > > > > > > > > > What i want is at-most-once and docs claim it's possible
> > with
> > > > > > certain
> > > > > > > > > > settings. Did i miss anything here?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <
> > > > > > wangg...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Aniket is exactly right. In general, Kafka provides "at
> > > least
> > > > > > once"
> > > > > > > > > > > guarantee instead of "exactly once".
> > > > > > > > > > >
> > > > > > > > > > > Guozhang
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > > > > > > > aniket.bhatna...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > As per my understanding, if the broker says the msg
> is
> > > > > > committed,
> > > > > > > > >  its
> > > > > > > > > > > > guaranteed to have been committed as per ur ack
> config.
> > > If
> > > > it
> > > > > > > says
> > > > > > > > it
> > > > > > > > > > did
> > > > > > > > > > > > not get committed, then its very hard to figure out
> if
> > > this
> > > > > was
> > > > > > > > just
> > > > > > > > > a
> > > > > > > > > > > > false error. Since there is concept of unique ids for
> > > > > > messages, a
> > > > > > > > > > replay
> > > > > > > > > > > of
> > > > > > > > > > > > the same message will result in duplication. I think
> > its
> > > a
> > > > > > > > reasonable
> > > > > > > > > > > > behaviour considering kafka prefers to append data to
> > > > > > partitions
> > > > > > > > fot
> > > > > > > > > > > > performance reasons.
> > > > > > > > > > > > The best way to right now deal with duplicate msgs is
> > to
> > > > > build
> > > > > > > the
> > > > > > > > > > > > processing engine (layer where your consumer sits) to
> > > deal
> > > > > with
> > > > > > > at
> > > > > > > > > > least
> > > > > > > > > > > > once semantics of the broker.
> > > > > > > > > > > > On 25 Oct 2013 23:23, "Kane Kane" <
> > kane.ist...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Or, to rephrase it more generally, is there a way
> to
> > > know
> > > > > > > exactly
> > > > > > > > > if
> > > > > > > > > > > > > message was committed or no?
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > > > > > > > kane.ist...@gmail.com
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hello Guozhang,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > My partitions are split almost evenly between
> > broker,
> > > > so,
> > > > > > > yes -
> > > > > > > > > > > broker
> > > > > > > > > > > > > > that I shutdown is the leader for some of them.
> > Does
> > > it
> > > > > > mean
> > > > > > > i
> > > > > > > > > can
> > > > > > > > > > > get
> > > > > > > > > > > > an
> > > > > > > > > > > > > > exception and data is still being written? Is
> there
> > > any
> > > > > > > setting
> > > > > > > > > on
> > > > > > > > > > > the
> > > > > > > > > > > > > > broker where i can control this? I.e. can i make
> > > broker
> > > > > > > > > replication
> > > > > > > > > > > > > timeout
> > > > > > > > > > > > > > shorter than producer timeout, so i can ensure
> if i
> > > get
> > > > > an
> > > > > > > > > > exception
> > > > > > > > > > > > data
> > > > > > > > > > > > > > is not being committed?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > > > > > > > > wangg...@gmail.com
> > > > > > > > > > > > > >wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> Hello Kane,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> As discussed in the other thread, even if a
> > timeout
> > > > > > response
> > > > > > > > is
> > > > > > > > > > sent
> > > > > > > > > > > > > back
> > > > > > > > > > > > > >> to the producer, the message may still be
> > committed.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Did you shut down the leader broker of the
> > partition
> > > > or
> > > > > a
> > > > > > > > > follower
> > > > > > > > > > > > > broker?
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Guozhang
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > > > > > > > kane.ist...@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > I have cluster of 3 kafka brokers. With the
> > > > following
> > > > > > > > script I
> > > > > > > > > > > send
> > > > > > > > > > > > > some
> > > > > > > > > > > > > >> > data to kafka and in the middle do the
> > controlled
> > > > > > shutdown
> > > > > > > > of
> > > > > > > > > 1
> > > > > > > > > > > > > broker.
> > > > > > > > > > > > > >> All
> > > > > > > > > > > > > >> > 3 brokers are ISR before I start sending.
> When i
> > > > > > shutdown
> > > > > > > > the
> > > > > > > > > > > > broker i
> > > > > > > > > > > > > >> get
> > > > > > > > > > > > > >> > a couple of exceptions and I expect data
> > shouldn't
> > > > be
> > > > > > > > written.
> > > > > > > > > > > Say,
> > > > > > > > > > > > I
> > > > > > > > > > > > > >> send
> > > > > > > > > > > > > >> > 1500 lines and get 50 exceptions. I expect to
> > > > consume
> > > > > > 1450
> > > > > > > > > > lines,
> > > > > > > > > > > > but
> > > > > > > > > > > > > >> > instead i always consume more, i.e. 1480 or
> > 1490.
> > > I
> > > > > want
> > > > > > > to
> > > > > > > > > > decide
> > > > > > > > > > > > if
> > > > > > > > > > > > > I
> > > > > > > > > > > > > >> > want to retry sending myself, not using
> > > > > > > > > > message.send.max.retries.
> > > > > > > > > > > > But
> > > > > > > > > > > > > >> looks
> > > > > > > > > > > > > >> > like if I retry sending if there is an
> exception
> > > - I
> > > > > > will
> > > > > > > > end
> > > > > > > > > up
> > > > > > > > > > > > with
> > > > > > > > > > > > > >> > duplicates. Is there anything I'm doing wrong
> or
> > > > > having
> > > > > > > > wrong
> > > > > > > > > > > > > >> assumptions
> > > > > > > > > > > > > >> > about kafka?
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Thanks.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > val prod = new MyProducer("10.80.42.147:9092,
> > > > > > > > > 10.80.42.154:9092,
> > > > > > > > > > > > > >> > 10.80.42.156:9092")
> > > > > > > > > > > > > >> > var count = 0
> > > > > > > > > > > > > >> > for(line <- Source.fromFile(file).getLines()){
> > > > > > > > > > > > > >> >     try {
> > > > > > > > > > > > > >> >       prod.send("benchmark", buffer.toList)
> > > > > > > > > > > > > >> >       count += 1
> > > > > > > > > > > > > >> >       println("sent %s", count)
> > > > > > > > > > > > > >> >     } catch {
> > > > > > > > > > > > > >> >       case _ => println("Exception!")
> > > > > > > > > > > > > >> >     }
> > > > > > > > > > > > > >> > }
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > > > > > > > >> >   val sync = true
> > > > > > > > > > > > > >> >   val requestRequiredAcks = "-1"
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >   val props = new Properties()
> > > > > > > > > > > > > >> >   props.put("metadata.broker.list",
> brokerList)
> > > > > > > > > > > > > >> >   props.put("producer.type", if(sync) "sync"
> > else
> > > > > > "async")
> > > > > > > > > > > > > >> >   props.put("request.required.acks",
> > > > > > requestRequiredAcks)
> > > > > > > > > > > > > >> >   props.put("key.serializer.class",
> > > > > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > > >> >   props.put("serializer.class",
> > > > > > > > > classOf[StringEncoder].getName)
> > > > > > > > > > > > > >> >   props.put("message.send.max.retries", "0")
> > > > > > > > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >   val producer = new Producer[AnyRef,
> > AnyRef](new
> > > > > > > > > > > > > ProducerConfig(props))
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >   def send(topic: String, messages:
> > List[String])
> > > =
> > > > {
> > > > > > > > > > > > > >> >     val requests = new
> > > > > ArrayBuffer[KeyedMessage[AnyRef,
> > > > > > > > > AnyRef]]
> > > > > > > > > > > > > >> >     for (message <- messages) {
> > > > > > > > > > > > > >> >       requests += new KeyedMessage(topic,
> null,
> > > > > message,
> > > > > > > > > > message)
> > > > > > > > > > > > > >> >     }
> > > > > > > > > > > > > >> >     producer.send(requests)
> > > > > > > > > > > > > >> >   }
> > > > > > > > > > > > > >> > }
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> --
> > > > > > > > > > > > > >> -- Guozhang
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > -- Guozhang
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to