Hello Aniket,

Thanks for the answer, this totally makes sense, and implementing that
layer on the consumer side to check for duplicates sounds like a good
solution to this issue.
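
Just so we're talking about the same thing, here is a rough sketch of
the dedup layer I have in mind, assuming we stamp every message with a
unique id ourselves (the id scheme and the in-memory cache below are my
own assumptions, not anything built into kafka):

import scala.collection.mutable

// Consumer-side dedup sketch: remember the ids of messages already
// processed and drop replays caused by producer-side retries.
// A real version would bound or persist this cache.
class DedupProcessor(process: String => Unit) {
  private val seen = mutable.Set[String]()

  def handle(id: String, payload: String) {
    if (!seen.contains(id)) {
      seen += id       // record the id so a replay is skipped
      process(payload) // business logic runs once per unique id
    }
  }
}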

Can we get confirmation from the kafka devs that this is how kafka is
supposed to work (by design) and that this is how we should implement
the solution? I'd hate to implement something that is already built into
kafka (i.e. controlled by some configuration setting).

Thanks again.



On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> As per my understanding, if the broker says the msg is committed, it's
> guaranteed to have been committed as per your ack config. If it says it
> did not get committed, then it's very hard to figure out whether this was
> just a false error. Since there is no concept of unique ids for messages,
> a replay of the same message will result in duplication. I think it's
> reasonable behaviour considering kafka prefers to append data to
> partitions for performance reasons.
> The best way to deal with duplicate msgs right now is to build the
> processing engine (the layer where your consumer sits) to handle the
> at-least-once semantics of the broker.
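>
> To make that concrete, a minimal sketch of stamping each message with a
> client-generated id on the producer side (the UUID key is just an
> illustration - kafka does not generate message ids for you):
>
> import java.util.UUID
> import kafka.producer.KeyedMessage
>
> // Put a unique id in the key of every message so the consumer layer
> // can recognise and drop a replayed message after a retry.
> def keyed(topic: String, payload: String): KeyedMessage[AnyRef, AnyRef] = {
>   val id = UUID.randomUUID().toString
>   new KeyedMessage(topic, id, payload)
> }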
> On 25 Oct 2013 23:23, "Kane Kane" <kane.ist...@gmail.com> wrote:
>
> > Or, to rephrase it more generally, is there a way to know exactly
> > whether a message was committed or not?
> >
> >
> > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.ist...@gmail.com>
> > wrote:
> >
> > > Hello Guozhang,
> > >
> > > My partitions are split almost evenly between brokers, so, yes - the
> > > broker that I shut down is the leader for some of them. Does it mean I
> > > can get an exception while the data is still being written? Is there
> > > any setting on the broker where I can control this? I.e. can I make
> > > the broker replication timeout shorter than the producer timeout, so I
> > > can ensure that if I get an exception the data is not being committed?
> > >
> > > Thanks.
> > >
> > >
> > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > >> Hello Kane,
> > >>
> > >> As discussed in the other thread, even if a timeout response is
> > >> sent back to the producer, the message may still be committed.
> > >>
> > >> Did you shut down the leader broker of the partition or a follower
> > >> broker?
> > >>
> > >> Guozhang
> > >>
> > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.ist...@gmail.com>
> > >> wrote:
> > >>
> > >> > I have a cluster of 3 kafka brokers. With the following script I
> > >> > send some data to kafka and in the middle do a controlled shutdown
> > >> > of 1 broker. All 3 brokers are in the ISR before I start sending.
> > >> > When I shut down the broker I get a couple of exceptions, and I
> > >> > expect that data shouldn't be written. Say I send 1500 lines and
> > >> > get 50 exceptions; I expect to consume 1450 lines, but instead I
> > >> > always consume more, i.e. 1480 or 1490. I want to decide whether
> > >> > to retry sending myself, rather than relying on
> > >> > message.send.max.retries, but it looks like if I retry on an
> > >> > exception I will end up with duplicates (roughly the pattern
> > >> > sketched after the code below). Is there anything I'm doing wrong,
> > >> > or do I have wrong assumptions about kafka?
> > >> >
> > >> > Thanks.
> > >> > import java.util.Properties
> > >> > import scala.collection.mutable.ArrayBuffer
> > >> > import scala.io.Source
> > >> > import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> > >> > import kafka.serializer.StringEncoder
> > >> >
> > >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
> > >> > var count = 0
> > >> > for (line <- Source.fromFile(file).getLines()) {
> > >> >   try {
> > >> >     prod.send("benchmark", List(line)) // each line as its own batch
> > >> >     count += 1
> > >> >     println("sent %s".format(count))
> > >> >   } catch {
> > >> >     case e: Exception => println("Exception! " + e.getMessage)
> > >> >   }
> > >> > }
> > >> >
> > >> > class MyProducer(brokerList: String) {
> > >> >   val sync = true
> > >> >   val requestRequiredAcks = "-1" // wait for all ISR replicas to ack
> > >> >
> > >> >   val props = new Properties()
> > >> >   props.put("metadata.broker.list", brokerList)
> > >> >   props.put("producer.type", if (sync) "sync" else "async")
> > >> >   props.put("request.required.acks", requestRequiredAcks)
> > >> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
> > >> >   props.put("message.send.max.retries", "0") // no automatic retries
> > >> >   props.put("request.timeout.ms", "2000")
> > >> >
> > >> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
> > >> >
> > >> >   def send(topic: String, messages: List[String]) = {
> > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> > >> >     for (message <- messages) {
> > >> >       requests += new KeyedMessage(topic, null, message, message)
> > >> >     }
> > >> >     producer.send(requests: _*)
> > >> >   }
> > >> > }
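> > >> >
> > >> > For reference, the manual retry I mean is roughly the sketch below
> > >> > (the names are mine, not a real API) - and it is exactly the
> > >> > pattern that duplicates a message whose "failed" send was in fact
> > >> > committed:
> > >> >
> > >> > // Hypothetical retry wrapper around MyProducer.send. If the broker
> > >> > // committed the message but the ack timed out, every retry here
> > >> > // writes the same message again.
> > >> > def sendWithRetry(prod: MyProducer, topic: String,
> > >> >                   messages: List[String], maxRetries: Int = 3) {
> > >> >   var attempts = 0
> > >> >   var done = false
> > >> >   while (!done && attempts <= maxRetries) {
> > >> >     try {
> > >> >       prod.send(topic, messages)
> > >> >       done = true
> > >> >     } catch {
> > >> >       case e: Exception =>
> > >> >         attempts += 1
> > >> >         println("retrying after: " + e.getMessage)
> > >> >     }
> > >> >   }
> > >> > }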
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
