I.e., from the documentation: "So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer."
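Concretely, "disabling retries" with the 0.8 Scala producer comes down to a
single property. A minimal sketch (the broker addresses are placeholders):

    import java.util.Properties
    import kafka.producer.ProducerConfig

    // Sketch of an at-most-once-oriented config for the 0.8 Scala producer:
    // synchronous sends, full acks, and no internal retries, so a failed or
    // timed-out request is never re-sent by the client.
    val props = new Properties()
    props.put("metadata.broker.list", "host1:9092,host2:9092") // placeholders
    props.put("producer.type", "sync")
    props.put("request.required.acks", "-1")
    props.put("message.send.max.retries", "0") // the "disable retries" part
    val config = new ProducerConfig(props)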
I've disabled retries, but it's not at-most-once, which my test proves. It's still at-least-once.

On Fri, Oct 25, 2013 at 11:26 AM, Kane Kane <kane.ist...@gmail.com> wrote:

> Hello Aniket,
>
> Thanks for the answer; this totally makes sense, and implementing that
> layer on the consumer side to check for dups sounds like a good solution
> to this issue.
>
> Can we get a confirmation from the Kafka devs that this is how Kafka is
> supposed to work (by design) and that this is how we should implement the
> solution? I'd hate to implement something that is already built into
> Kafka (i.e. controlled by some configuration settings).
>
> Thanks again.
>
> On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar
> <aniket.bhatna...@gmail.com> wrote:
>
>> As per my understanding, if the broker says the message is committed,
>> it is guaranteed to have been committed as per your ack config. If it
>> says it did not get committed, it's very hard to figure out whether
>> that was just a false error. Since there is no concept of unique ids
>> for messages, a replay of the same message will result in duplication.
>> I think that's reasonable behaviour, considering Kafka prefers to
>> append data to partitions for performance reasons.
>>
>> The best way to deal with duplicate messages right now is to build the
>> processing engine (the layer where your consumer sits) to handle the
>> at-least-once semantics of the broker.
>>
>> On 25 Oct 2013 23:23, "Kane Kane" <kane.ist...@gmail.com> wrote:
>>
>>> Or, to rephrase it more generally, is there a way to know exactly
>>> whether a message was committed or not?
>>>
>>> On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.ist...@gmail.com>
>>> wrote:
>>>
>>>> Hello Guozhang,
>>>>
>>>> My partitions are split almost evenly between brokers, so yes, the
>>>> broker that I shut down is the leader for some of them. Does that
>>>> mean I can get an exception while the data is still being written?
>>>> Is there any setting on the broker where I can control this? I.e.
>>>> can I make the broker replication timeout shorter than the producer
>>>> timeout, so I can be sure that if I get an exception, the data was
>>>> not committed?
>>>>
>>>> Thanks.
>>>>
>>>> On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Kane,
>>>>>
>>>>> As discussed in the other thread, even if a timeout response is
>>>>> sent back to the producer, the message may still be committed.
>>>>>
>>>>> Did you shut down the leader broker of the partition or a follower
>>>>> broker?
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.ist...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have a cluster of 3 Kafka brokers. With the following script I
>>>>>> send some data to Kafka and do a controlled shutdown of 1 broker
>>>>>> in the middle. All 3 brokers are in the ISR before I start
>>>>>> sending. When I shut down the broker I get a couple of exceptions,
>>>>>> and I expect that data not to have been written. Say I send 1500
>>>>>> lines and get 50 exceptions; I expect to consume 1450 lines, but
>>>>>> instead I always consume more, e.g. 1480 or 1490. I want to decide
>>>>>> whether to retry sending myself, not via
>>>>>> message.send.max.retries, but it looks like if I retry sending
>>>>>> after an exception I will end up with duplicates. Is there
>>>>>> anything I'm doing wrong, or do I have wrong assumptions about
>>>>>> Kafka?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> import java.util.Properties
>>>>>> import scala.collection.mutable.ArrayBuffer
>>>>>> import scala.io.Source
>>>>>> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
>>>>>> import kafka.serializer.StringEncoder
>>>>>>
>>>>>> val file = "lines.txt" // path to the input file
>>>>>> val prod = new MyProducer(
>>>>>>   "10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
>>>>>> var count = 0
>>>>>> for (line <- Source.fromFile(file).getLines()) {
>>>>>>   try {
>>>>>>     prod.send("benchmark", List(line)) // send the current line
>>>>>>     count += 1
>>>>>>     println("sent " + count)
>>>>>>   } catch {
>>>>>>     case e: Exception => println("Exception!")
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> class MyProducer(brokerList: String) {
>>>>>>   val sync = true
>>>>>>   val requestRequiredAcks = "-1"
>>>>>>
>>>>>>   val props = new Properties()
>>>>>>   props.put("metadata.broker.list", brokerList)
>>>>>>   props.put("producer.type", if (sync) "sync" else "async")
>>>>>>   props.put("request.required.acks", requestRequiredAcks)
>>>>>>   props.put("key.serializer.class", classOf[StringEncoder].getName)
>>>>>>   props.put("serializer.class", classOf[StringEncoder].getName)
>>>>>>   props.put("message.send.max.retries", "0") // no client-side retries
>>>>>>   props.put("request.timeout.ms", "2000")
>>>>>>
>>>>>>   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
>>>>>>
>>>>>>   // Wrap each line in a KeyedMessage (key and payload are the same
>>>>>>   // string) and send the batch synchronously.
>>>>>>   def send(topic: String, messages: List[String]) = {
>>>>>>     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
>>>>>>     for (message <- messages) {
>>>>>>       requests += new KeyedMessage(topic, null, message, message)
>>>>>>     }
>>>>>>     producer.send(requests: _*)
>>>>>>   }
>>>>>> }
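For reference, handling the broker's at-least-once semantics in the consumer
layer, as Aniket suggests above, could look roughly like the sketch below. It
assumes every message carries a unique id (in the test script the key and
payload are the same string); the class name is hypothetical, and the
unbounded in-memory set would need to be replaced by a bounded or persistent
store in practice:

    import scala.collection.mutable

    // Drop duplicate deliveries before they reach the processing logic.
    class DedupingHandler(process: String => Unit) {
      private val seen = mutable.Set[String]()

      def handle(messageId: String, payload: String): Unit = {
        // Set#add returns true only the first time an id is seen,
        // so replayed messages are silently ignored.
        if (seen.add(messageId)) process(payload)
      }
    }

    // Usage: feed every consumed message through the handler.
    val handler = new DedupingHandler(line => println("processing " + line))
    handler.handle("msg-1", "hello")
    handler.handle("msg-1", "hello") // replayed duplicate, ignored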