Hello Guozhang,

My partitions are split almost evenly between the brokers, so yes, the broker that I shut down is the leader for some of them. Does that mean I can get an exception while the data is still being written? Is there any setting on the broker that controls this? For example, can I make the broker replication timeout shorter than the producer timeout, so that if I get an exception I can be sure the data was not committed?

Thanks.

On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Kane,
>
> As discussed in the other thread, even if a timeout response is sent back
> to the producer, the message may still be committed.
>
> Did you shut down the leader broker of the partition or a follower broker?
>
> Guozhang
>
> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.ist...@gmail.com> wrote:
>
> > I have a cluster of 3 Kafka brokers. With the following script I send
> > some data to Kafka and, in the middle, do a controlled shutdown of 1
> > broker. All 3 brokers are in the ISR before I start sending. When I shut
> > down the broker I get a couple of exceptions, and I expect that data
> > shouldn't be written. Say I send 1500 lines and get 50 exceptions. I
> > expect to consume 1450 lines, but instead I always consume more, i.e.
> > 1480 or 1490. I want to decide myself whether to retry sending, not use
> > message.send.max.retries. But it looks like if I retry sending when
> > there is an exception, I will end up with duplicates. Is there anything
> > I'm doing wrong, or do I have wrong assumptions about Kafka?
> >
> > Thanks.
> >
> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
> > var count = 0
> > for(line <- Source.fromFile(file).getLines()){
> >   try {
> >     prod.send("benchmark", buffer.toList)
> >     count += 1
> >     println("sent %s", count)
> >   } catch {
> >     case _ => println("Exception!")
> >   }
> > }
> >
> > class MyProducer(brokerList: String) {
> >   val sync = true
> >   val requestRequiredAcks = "-1"
> >
> >   val props = new Properties()
> >   props.put("metadata.broker.list", brokerList)
> >   props.put("producer.type", if(sync) "sync" else "async")
> >   props.put("request.required.acks", requestRequiredAcks)
> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> >   props.put("serializer.class", classOf[StringEncoder].getName)
> >   props.put("message.send.max.retries", "0")
> >   props.put("request.timeout.ms", "2000")
> >
> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
> >
> >   def send(topic: String, messages: List[String]) = {
> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> >     for (message <- messages) {
> >       requests += new KeyedMessage(topic, null, message, message)
> >     }
> >     producer.send(requests)
> >   }
> > }
> >
> >
>
> --
> -- Guozhang
>
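
For what it's worth, the point in the quoted reply (a timeout does not prove the write failed, so a manual retry can produce duplicates) can be illustrated without a real cluster. The sketch below does not use the Kafka API at all: `FakeBroker`, `SendTimeout`, `sendWithRetry`, and `dedupe` are hypothetical names made up for this example, and the failure pattern (every 10th call times out after the write, every call ending in 5 times out before it) is an arbitrary assumption. It also assumes the sender attaches a unique ID to each message, which the script above does not do.

```scala
import scala.collection.mutable

object AmbiguousTimeoutDemo {
  // Hypothetical exception standing in for a producer-side timeout.
  final class SendTimeout(msg: String) extends RuntimeException(msg)

  // Hypothetical in-memory stand-in for a broker log; NOT a Kafka class.
  final class FakeBroker {
    private val log = mutable.ArrayBuffer.empty[(String, String)]
    private var calls = 0

    def send(id: String, payload: String): Unit = {
      calls += 1
      // Failure before the write: nothing is stored.
      if (calls % 10 == 5) throw new SendTimeout("timed out, nothing written")
      log += ((id, payload))
      // Failure after the write: the caller sees an exception,
      // but the record is already in the log.
      if (calls % 10 == 0) throw new SendTimeout("timed out, but already written")
    }

    def consumeAll: Vector[(String, String)] = log.toVector
  }

  // Retry on timeout. The caller cannot tell the two failure modes apart,
  // so some retries write a record that is already stored.
  @annotation.tailrec
  def sendWithRetry(broker: FakeBroker, id: String, payload: String, attemptsLeft: Int): Boolean =
    if (attemptsLeft <= 0) false
    else {
      val ok = try { broker.send(id, payload); true } catch { case _: SendTimeout => false }
      if (ok) true else sendWithRetry(broker, id, payload, attemptsLeft - 1)
    }

  // Consumer-side de-duplication: keep the first record seen for each ID.
  def dedupe(records: Seq[(String, String)]): Vector[(String, String)] = {
    val seen = mutable.HashSet.empty[String]
    records.filter { case (id, _) => seen.add(id) }.toVector
  }

  // Sends n messages with retries; returns (records in log, unique IDs).
  def run(n: Int): (Int, Int) = {
    val broker = new FakeBroker
    (1 to n).foreach(i => sendWithRetry(broker, s"msg-$i", s"line $i", attemptsLeft = 3))
    val raw = broker.consumeAll
    (raw.size, dedupe(raw).size)
  }

  def main(args: Array[String]): Unit = {
    val (raw, unique) = run(100)
    println(s"consumed $raw records, $unique unique IDs")
  }
}
```

In this toy model the log ends up holding more records than were sent, because every retry after a "written, then timed out" failure stores a second copy. Filtering by ID on the consuming side brings the count back to one record per message. Whether that approach fits a real pipeline depends on being able to carry such an ID in the message, which is an assumption here.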