To quote the documentation:

So effectively Kafka guarantees at-least-once delivery by default and
allows the user to implement at-most-once delivery by disabling retries
on the producer.

I've disabled retries, but it's not at-most-once, as my test proves: some
sends that raised an exception were still committed. It's still
at-least-once.
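
For what it's worth, the only producer configuration I can think of that
would actually behave as at-most-once is fire-and-forget: with acks=0 the
producer never waits for a response, so there is no ambiguous timeout to
act on, and with retries disabled it never resends. A rough sketch,
untested, with an example broker address (this is my reading of the docs,
not a confirmed recipe):

import java.util.Properties
import kafka.producer.{Producer, ProducerConfig}
import kafka.serializer.StringEncoder

// Sketch only: messages can be lost, but should never be duplicated.
val props = new Properties()
props.put("metadata.broker.list", "10.80.42.147:9092") // example broker
props.put("producer.type", "sync")
props.put("request.required.acks", "0")    // fire and forget: no response to wait on
props.put("serializer.class", classOf[StringEncoder].getName)
props.put("message.send.max.retries", "0") // never resend
val atMostOnceProducer = new Producer[String, String](new ProducerConfig(props))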



On Fri, Oct 25, 2013 at 11:26 AM, Kane Kane <kane.ist...@gmail.com> wrote:

> Hello Aniket,
>
> Thanks for the answer. This totally makes sense, and implementing that
> layer on the consumer side to check for dups sounds like a good solution
> to this issue.
>
> Can we get a confirmation from the Kafka devs that this is how Kafka is
> supposed to work (by design), and how we should implement the solution
> to it? I'd hate to implement something that is already built into Kafka
> (i.e. controlled by some configuration settings).
>
> Thanks again.
>
>
>
> On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
>
>> As per my understanding, if the broker says the msg is committed, it's
>> guaranteed to have been committed as per your ack config. If it says it
>> did not get committed, then it's very hard to figure out whether that
>> was just a false error. Since there is no concept of unique ids for
>> messages, a replay of the same message will result in duplication. I
>> think it's reasonable behaviour, considering Kafka prefers to append
>> data to partitions for performance reasons.
>> The best way to deal with duplicate msgs right now is to build the
>> processing engine (the layer where your consumer sits) to deal with the
>> at-least-once semantics of the broker.
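>>
>> A minimal sketch of such a layer (names are hypothetical; it assumes the
>> producer embeds a unique id in every message, and a real version would
>> persist the ids it has seen instead of holding them in memory):
>>
>> import scala.collection.mutable
>>
>> // Drops redelivered messages so downstream processing runs at most once
>> // per unique id, turning at-least-once delivery into effectively-once.
>> class DedupLayer(process: String => Unit) {
>>   private val seen = mutable.Set[String]()
>>
>>   def handle(id: String, payload: String): Unit = {
>>     if (seen.add(id)) process(payload) // first delivery of this id
>>     // otherwise: a duplicate delivery; silently drop it
>>   }
>> }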
>> On 25 Oct 2013 23:23, "Kane Kane" <kane.ist...@gmail.com> wrote:
>>
>> > Or, to rephrase it more generally: is there a way to know exactly
>> > whether a message was committed or not?
>> >
>> >
>> > > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.ist...@gmail.com> wrote:
>> >
>> > > Hello Guozhang,
>> > >
>> > > My partitions are split almost evenly between brokers, so, yes - the
>> > > broker that I shut down is the leader for some of them. Does that
>> > > mean I can get an exception while the data is still written? Is
>> > > there any setting on the broker where I can control this? I.e. can I
>> > > make the broker replication timeout shorter than the producer
>> > > timeout, so that if I get an exception I can be sure the data is not
>> > > committed?
>> > >
>> > > Thanks.
>> > >
>> > >
>> > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > >
>> > >> Hello Kane,
>> > >>
>> > >> As discussed in the other thread, even if a timeout response is
>> > >> sent back to the producer, the message may still be committed.
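>> > >>
>> > >> In other words, the producer can only distinguish "acked" from
>> > >> "unknown", never "acked" from "definitely not committed". A sketch
>> > >> of that distinction (hypothetical wrapper, not a Kafka API):
>> > >>
>> > >> import scala.util.{Failure, Success, Try}
>> > >>
>> > >> sealed trait SendResult
>> > >> case object Committed extends SendResult // broker acked the write
>> > >> case object Unknown extends SendResult   // error/timeout: the write
>> > >>                                          // may or may not be committed
>> > >>
>> > >> // Retrying Unknown yields at-least-once; dropping it yields at-most-once.
>> > >> def trackedSend(send: () => Unit): SendResult =
>> > >>   Try(send()) match {
>> > >>     case Success(_) => Committed
>> > >>     case Failure(_) => Unknown
>> > >>   }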
>> > >>
>> > >> Did you shut down the leader broker of the partition, or a follower
>> > >> broker?
>> > >>
>> > >> Guozhang
>> > >>
>> > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.ist...@gmail.com> wrote:
>> > >>
>> > >> > I have a cluster of 3 Kafka brokers. With the following script I
>> > >> > send some data to Kafka and, in the middle, do a controlled
>> > >> > shutdown of 1 broker. All 3 brokers are in the ISR before I start
>> > >> > sending. When I shut down the broker I get a couple of exceptions,
>> > >> > and I expect that data not to be written. Say I send 1500 lines
>> > >> > and get 50 exceptions: I expect to consume 1450 lines, but instead
>> > >> > I always consume more, e.g. 1480 or 1490. I want to decide myself
>> > >> > whether to retry sending, rather than using
>> > >> > message.send.max.retries. But it looks like if I retry sending
>> > >> > after an exception, I will end up with duplicates. Is there
>> > >> > anything I'm doing wrong, or do I have wrong assumptions about
>> > >> > Kafka?
>> > >> >
>> > >> > Thanks.
>> > >> >
>> > >> > import java.util.Properties
>> > >> > import scala.collection.mutable.ArrayBuffer
>> > >> > import scala.io.Source
>> > >> > import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
>> > >> > import kafka.serializer.StringEncoder
>> > >> >
>> > >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
>> > >> > var count = 0
>> > >> > // file is the path to the input data (defined elsewhere)
>> > >> > for (line <- Source.fromFile(file).getLines()) {
>> > >> >     try {
>> > >> >       prod.send("benchmark", List(line)) // one message per input line
>> > >> >       count += 1
>> > >> >       println("sent " + count)
>> > >> >     } catch {
>> > >> >       case e: Exception => println("Exception: " + e)
>> > >> >     }
>> > >> > }
>> > >> >
>> > >> > class MyProducer(brokerList: String) {
>> > >> >   val sync = true
>> > >> >   val requestRequiredAcks = "-1" // wait for all ISR replicas to ack
>> > >> >
>> > >> >   val props = new Properties()
>> > >> >   props.put("metadata.broker.list", brokerList)
>> > >> >   props.put("producer.type", if (sync) "sync" else "async")
>> > >> >   props.put("request.required.acks", requestRequiredAcks)
>> > >> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
>> > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
>> > >> >   props.put("message.send.max.retries", "0") // no automatic retries
>> > >> >   props.put("request.timeout.ms", "2000")
>> > >> >
>> > >> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
>> > >> >
>> > >> >   def send(topic: String, messages: List[String]) = {
>> > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
>> > >> >     for (message <- messages) {
>> > >> >       requests += new KeyedMessage(topic, null, message, message)
>> > >> >     }
>> > >> >     producer.send(requests: _*) // Producer.send takes varargs
>> > >> >   }
>> > >> > }
>> > >> >
>> > >>
>> > >>
>> > >>
>> > >> --
>> > >> -- Guozhang
>> > >>
>> > >
>> > >
>> >
>>
>
>
