Aniket is exactly right. In general, Kafka provides an "at least once" guarantee instead of "exactly once".
Guozhang

On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

> As per my understanding, if the broker says the msg is committed, it's
> guaranteed to have been committed as per your ack config. If it says it
> did not get committed, then it's very hard to figure out whether this was
> just a false error. Since there is no concept of unique ids for messages,
> a replay of the same message will result in duplication. I think it's
> reasonable behaviour, considering kafka prefers to append data to
> partitions for performance reasons.
>
> The best way to deal with duplicate msgs right now is to build the
> processing engine (the layer where your consumer sits) to handle the
> at-least-once semantics of the broker.
>
> On 25 Oct 2013 23:23, "Kane Kane" <kane.ist...@gmail.com> wrote:
>
> > Or, to rephrase it more generally: is there a way to know exactly
> > whether a message was committed or not?
> >
> > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.ist...@gmail.com> wrote:
> >
> > > Hello Guozhang,
> > >
> > > My partitions are split almost evenly between brokers, so, yes - the
> > > broker that I shut down is the leader for some of them. Does that mean
> > > I can get an exception while the data is still being written? Is there
> > > any setting on the broker where I can control this? I.e. can I make
> > > the broker replication timeout shorter than the producer timeout, so
> > > that if I get an exception I can be sure the data was not committed?
> > >
> > > Thanks.
> > >
> > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Kane,
> > > >
> > > > As discussed in the other thread, even if a timeout response is sent
> > > > back to the producer, the message may still be committed.
> > > >
> > > > Did you shut down the leader broker of the partition or a follower
> > > > broker?
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.ist...@gmail.com> wrote:
> > > >
> > > > > I have a cluster of 3 kafka brokers. With the following script I
> > > > > send some data to kafka and in the middle do a controlled shutdown
> > > > > of 1 broker. All 3 brokers are in the ISR before I start sending.
> > > > > When I shut down the broker I get a couple of exceptions, and I
> > > > > expect that data shouldn't be written. Say I send 1500 lines and
> > > > > get 50 exceptions: I expect to consume 1450 lines, but instead I
> > > > > always consume more, i.e. 1480 or 1490. I want to decide whether to
> > > > > retry sending myself, not using message.send.max.retries, but it
> > > > > looks like if I retry sending after an exception I will end up with
> > > > > duplicates. Is there anything I'm doing wrong, or do I have wrong
> > > > > assumptions about kafka?
> > > > >
> > > > > Thanks.
> > > > > import java.util.Properties
> > > > > import scala.collection.mutable.ArrayBuffer
> > > > > import scala.io.Source
> > > > >
> > > > > import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> > > > > import kafka.serializer.StringEncoder
> > > > >
> > > > > // `file` is the input path; placeholder value here, defined
> > > > > // elsewhere in the original script.
> > > > > val file = "input.txt"
> > > > >
> > > > > val prod = new MyProducer(
> > > > >   "10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
> > > > > var count = 0
> > > > > for (line <- Source.fromFile(file).getLines()) {
> > > > >   try {
> > > > >     prod.send("benchmark", List(line))
> > > > >     count += 1
> > > > >     println("sent %s".format(count))
> > > > >   } catch {
> > > > >     case e: Exception => println("Exception: " + e.getMessage)
> > > > >   }
> > > > > }
> > > > >
> > > > > class MyProducer(brokerList: String) {
> > > > >   val sync = true
> > > > >   val requestRequiredAcks = "-1"
> > > > >
> > > > >   val props = new Properties()
> > > > >   props.put("metadata.broker.list", brokerList)
> > > > >   props.put("producer.type", if (sync) "sync" else "async")
> > > > >   props.put("request.required.acks", requestRequiredAcks)
> > > > >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> > > > >   props.put("serializer.class", classOf[StringEncoder].getName)
> > > > >   props.put("message.send.max.retries", "0")
> > > > >   props.put("request.timeout.ms", "2000")
> > > > >
> > > > >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
> > > > >
> > > > >   def send(topic: String, messages: List[String]) = {
> > > > >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> > > > >     for (message <- messages) {
> > > > >       requests += new KeyedMessage(topic, null, message, message)
> > > > >     }
> > > > >     producer.send(requests: _*) // send() takes varargs
> > > > >   }
> > > > > }
> > > >
> > > > --
> > > > -- Guozhang

--
-- Guozhang
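To make Kane's retry-it-myself plan safe despite the ambiguity Guozhang describes (a timed-out request may still have been committed), the producer can stamp every payload with its own unique id before sending. Below is a minimal sketch against the same Kafka 0.8 producer API used in the script above; RetryingProducer, sendWithRetry, maxRetries, and the "uuid|payload" framing are illustrative choices, not anything the thread or Kafka itself prescribes.

import java.util.{Properties, UUID}

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder

// Retries failed sends itself (message.send.max.retries is 0), but embeds
// a UUID in every payload so a downstream consumer can recognize the
// duplicates that retrying after a "false error" inevitably creates.
class RetryingProducer(brokerList: String, maxRetries: Int) {
  private val props = new Properties()
  props.put("metadata.broker.list", brokerList)
  props.put("producer.type", "sync")
  props.put("request.required.acks", "-1")
  props.put("serializer.class", classOf[StringEncoder].getName)
  props.put("message.send.max.retries", "0") // retries are handled below

  private val producer = new Producer[String, String](new ProducerConfig(props))

  def sendWithRetry(topic: String, payload: String): Unit = {
    // The same id is reused on every attempt, so all copies of this
    // message are recognizable as duplicates of one logical send.
    val framed = UUID.randomUUID().toString + "|" + payload
    var attempt = 0
    var sent = false
    while (!sent) {
      try {
        producer.send(new KeyedMessage[String, String](topic, framed))
        sent = true
      } catch {
        case e: Exception =>
          attempt += 1
          if (attempt > maxRetries) throw e // give up, surface the error
      }
    }
  }
}

Note that this does not remove duplicates; it only makes them detectable. The worst case after a retry is one extra copy per attempt, never a silently lost message.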
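On the consuming side, Aniket's advice to build the processing layer around at-least-once semantics usually means idempotent handling: remember which ids have been processed and drop replays. A minimal sketch, assuming the "uuid|payload" framing from the producer sketch above; DedupingHandler is a hypothetical name, and a real system would persist the seen-id set (or use a bounded/TTL cache) instead of keeping it in memory.

import scala.collection.mutable

// Drops any message whose id has already been handled, so re-sent or
// redelivered messages are applied once from the application's view.
class DedupingHandler(process: String => Unit) {
  private val seen = mutable.HashSet[String]()

  def handle(raw: String): Unit = {
    val parts = raw.split("\\|", 2) // "uuid|payload"
    if (parts.length == 2) {
      val id = parts(0)
      if (seen.add(id)) {   // add() is false when the id was seen before
        process(parts(1))   // first delivery: do the real work
      }                     // else: duplicate from a retry, ignore it
    }
  }
}

With both pieces in place, a retry after an ambiguous timeout costs at most an extra delivery that the handler silently discards, which is how a gap like the 1480-instead-of-1450 count Kane observed can be reconciled.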