As we have said, the timeout exception does not necessarily mean the message
was not committed to the broker. When message.send.max.retries is 0, Kafka
does guarantee "at-most-once", which means that you will not get duplicates,
but it does not mean that every exception can be treated as "message not
delivered". In your case, the 1480 - 1450 = 30 extra messages are ones that
were actually committed despite the exception, not duplicates.
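
To make that concrete, here is a minimal sketch of the producer-side
bookkeeping this implies (the SendOutcome names and trackedSend wrapper are
illustrative, not part of the Kafka API):

// Sketch: with message.send.max.retries=0, a synchronous send() has only
// two observable outcomes, and the failure case is ambiguous.
sealed trait SendOutcome
case object Acked extends SendOutcome   // broker confirmed the commit
case object Unknown extends SendOutcome // exception: may or may not be committed

def trackedSend(send: => Unit): SendOutcome =
  try { send; Acked }
  catch { case _: Exception => Unknown } // e.g. RequestTimedOutException

// After a run, the consumed count lies between acked and acked + unknown.
// With your numbers: 1450 acked and 50 unknown, so consuming 1480 means 30
// of the "failed" sends were in fact committed.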

Guozhang


On Fri, Oct 25, 2013 at 5:00 PM, Kane Kane <kane.ist...@gmail.com> wrote:

> There are a lot of exceptions; I will try to pick an example of each:
> ERROR async.DefaultEventHandler - Failed to send requests for topics
> benchmark with correlation ids in [879,881]
> WARN  async.DefaultEventHandler - Produce request with correlation id 874
> failed due to [benchmark,43]: kafka.common.RequestTimedOutException
> WARN  client.ClientUtils$ - Fetching topic metadata with correlation id 876
> for topics [Set(benchmark)] from broker [id:2,host:10.80.42.156,port:9092]
> failed
> ERROR producer.SyncProducer - Producer connection to
> 10.80.42.156:9092 unsuccessful
> kafka.common.FailedToSendMessageException: Failed to send messages after 0
> tries.
> WARN  async.DefaultEventHandler - Failed to send producer request with
> correlation id 270 to broker 0 with data for partitions [benchmark,42]
>
> I think these are all the types of exceptions I see there.
> Thanks.
>
>
> On Fri, Oct 25, 2013 at 2:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Kane,
> >
> > If you set message.send.max.retries to 0 it should be at-most-once, and I
> > saw your props have the right config. What are the exceptions you got from
> > the send() call?
> >
> > Guozhang
> >
> >
> > On Fri, Oct 25, 2013 at 12:54 PM, Steve Morin <st...@stevemorin.com>
> > wrote:
> >
> > > Kane and Aniket,
> > >   I am interested in knowing what pattern/solution people usually use
> > > to implement exactly-once as well.
> > > -Steve
> > >
> > >
> > > On Fri, Oct 25, 2013 at 11:39 AM, Kane Kane <kane.ist...@gmail.com>
> > wrote:
> > >
> > > > Guozhang, but I've posted a piece from the Kafka documentation above:
> > > > "So effectively Kafka guarantees at-least-once delivery by default and
> > > > allows the user to implement at most once delivery by disabling
> > > > retries on the producer."
> > > >
> > > > What I want is at-most-once, and the docs claim it's possible with
> > > > certain settings. Did I miss anything here?
> > > >
> > > >
> > > > On Fri, Oct 25, 2013 at 11:35 AM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Aniket is exactly right. In general, Kafka provides an "at least
> > > > > once" guarantee instead of "exactly once".
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
> > > > > aniket.bhatna...@gmail.com> wrote:
> > > > >
> > > > > > As per my understanding, if the broker says the msg is committed,
> > > > > > it's guaranteed to have been committed as per your ack config. If
> > > > > > it says it did not get committed, then it's very hard to figure
> > > > > > out whether this was just a false error. Since there is no concept
> > > > > > of unique ids for messages, a replay of the same message will
> > > > > > result in duplication. I think it's reasonable behaviour
> > > > > > considering Kafka prefers to append data to partitions for
> > > > > > performance reasons.
> > > > > > The best way to deal with duplicate msgs right now is to build the
> > > > > > processing engine (the layer where your consumer sits) to deal
> > > > > > with the at-least-once semantics of the broker.
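> > > > > > A minimal sketch of such a layer, assuming the producer embeds its
> > > > > > own unique id in each message, say "<uuid>:<payload>" (Kafka itself
> > > > > > does not supply one):
> > > > > >
> > > > > > import scala.collection.mutable
> > > > > >
> > > > > > // Idempotent processing on top of at-least-once delivery.
> > > > > > val seen = mutable.Set[String]()
> > > > > >
> > > > > > def process(raw: String): Unit = {
> > > > > >   val Array(id, payload) = raw.split(":", 2) // assumes "<uuid>:<payload>"
> > > > > >   if (seen.add(id))   // Set.add returns false if id was seen before
> > > > > >     handle(payload)   // real work runs at most once per id
> > > > > > }                     // duplicate replays are silently dropped
> > > > > >
> > > > > > def handle(payload: String): Unit = println(payload) // placeholder
> > > > > >
> > > > > > In practice the seen set would have to be persisted or bounded, but
> > > > > > the idea is the same.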
> > > > > > On 25 Oct 2013 23:23, "Kane Kane" <kane.ist...@gmail.com> wrote:
> > > > > >
> > > > > > > Or, to rephrase it more generally, is there a way to know
> > > > > > > exactly whether a message was committed or not?
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <
> > kane.ist...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Guozhang,
> > > > > > > >
> > > > > > > > My partitions are split almost evenly between brokers, so, yes -
> > > > > > > > the broker that I shut down is the leader for some of them. Does
> > > > > > > > that mean I can get an exception while the data is still being
> > > > > > > > written? Is there any setting on the broker where I can control
> > > > > > > > this? I.e. can I make the broker replication timeout shorter than
> > > > > > > > the producer timeout, so I can ensure that if I get an exception
> > > > > > > > the data is not being committed?
> > > > > > > >
> > > > > > > > Thanks.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <
> > > > wangg...@gmail.com
> > > > > > > >wrote:
> > > > > > > >
> > > > > > > >> Hello Kane,
> > > > > > > >>
> > > > > > > >> As discussed in the other thread, even if a timeout response
> > > > > > > >> is sent back to the producer, the message may still be
> > > > > > > >> committed.
> > > > > > > >>
> > > > > > > >> Did you shut down the leader broker of the partition or a
> > > > > > > >> follower broker?
> > > > > > > >>
> > > > > > > >> Guozhang
> > > > > > > >>
> > > > > > > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <
> > > kane.ist...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >>
> > > > > > > >> > I have a cluster of 3 Kafka brokers. With the following
> > > > > > > >> > script I send some data to Kafka and in the middle do a
> > > > > > > >> > controlled shutdown of 1 broker. All 3 brokers are in the ISR
> > > > > > > >> > before I start sending. When I shut down the broker I get a
> > > > > > > >> > couple of exceptions, and I expect that data shouldn't be
> > > > > > > >> > written. Say, I send 1500 lines and get 50 exceptions. I
> > > > > > > >> > expect to consume 1450 lines, but instead I always consume
> > > > > > > >> > more, i.e. 1480 or 1490. I want to decide whether to retry
> > > > > > > >> > sending myself, not using message.send.max.retries. But it
> > > > > > > >> > looks like if I retry sending when there is an exception, I
> > > > > > > >> > will end up with duplicates. Am I doing anything wrong, or do
> > > > > > > >> > I have wrong assumptions about Kafka?
> > > > > > > >> >
> > > > > > > >> > Thanks.
> > > > > > > >> >
> > > > > > > >> > import java.util.Properties
> > > > > > > >> >
> > > > > > > >> > import scala.collection.mutable.ArrayBuffer
> > > > > > > >> > import scala.io.Source
> > > > > > > >> >
> > > > > > > >> > import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> > > > > > > >> > import kafka.serializer.StringEncoder
> > > > > > > >> >
> > > > > > > >> > val file = "data.txt" // placeholder input path
> > > > > > > >> > val prod = new MyProducer(
> > > > > > > >> >   "10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
> > > > > > > >> > var count = 0
> > > > > > > >> > for (line <- Source.fromFile(file).getLines()) {
> > > > > > > >> >   try {
> > > > > > > >> >     prod.send("benchmark", List(line)) // one line per request
> > > > > > > >> >     count += 1
> > > > > > > >> >     println("sent " + count)
> > > > > > > >> >   } catch {
> > > > > > > >> >     case e: Exception => println("Exception: " + e)
> > > > > > > >> >   }
> > > > > > > >> > }
> > > > > > > >> >
> > > > > > > >> > class MyProducer(brokerList: String) {
> > > > > > > >> >   val sync = true
> > > > > > > >> >   val requestRequiredAcks = "-1" // wait for all ISR replicas
> > > > > > > >> >
> > > > > > > >> >   val props = new Properties()
> > > > > > > >> >   props.put("metadata.broker.list", brokerList)
> > > > > > > >> >   props.put("producer.type", if (sync) "sync" else "async")
> > > > > > > >> >   props.put("request.required.acks", requestRequiredAcks)
> > > > > > > >> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> > > > > > > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
> > > > > > > >> >   props.put("message.send.max.retries", "0") // no retries: at-most-once
> > > > > > > >> >   props.put("request.timeout.ms", "2000")
> > > > > > > >> >
> > > > > > > >> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
> > > > > > > >> >
> > > > > > > >> >   def send(topic: String, messages: List[String]) = {
> > > > > > > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> > > > > > > >> >     for (message <- messages) {
> > > > > > > >> >       requests += new KeyedMessage(topic, null, message, message)
> > > > > > > >> >     }
> > > > > > > >> >     producer.send(requests: _*) // send() takes varargs in 0.8
> > > > > > > >> >   }
> > > > > > > >> > }
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> --
> > > > > > > >> -- Guozhang
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
