Sorry, the formatting was all messed up. I re-tested this code with 0.9.0.1 and 
it worked fine -- the KafkaProducer closed and committed the expected number of 
records into the partitions.
So this seems to be a 0.10.1.0-SNAPSHOT issue.  Will continue looking.
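In case it helps anyone reproduce this, here's roughly how I'm checking the 
committed data -- a throwaway consumer that seeks to the end of each partition 
and prints the end offsets. Minimal sketch only: host and topic are the same 
placeholders as in the producer code below, and note that seekToEnd takes a 
collection on 0.10.x but varargs on 0.9.x.

    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    val cprops = new Properties()
    cprops.put("bootstrap.servers", host)
    cprops.put("group.id", "offset-check")  // throwaway group; we never commit
    cprops.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    cprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], String](cprops)
    val partitions = consumer.partitionsFor(topic).asScala
      .map(pi => new TopicPartition(pi.topic, pi.partition))
    consumer.assign(partitions.asJava)
    consumer.seekToEnd(partitions.asJava)  // 0.10.x; on 0.9.x: consumer.seekToEnd(partitions: _*)
    partitions.foreach { tp =>
      println(s"$tp end offset = ${consumer.position(tp)}")  // 0 means nothing made it in
    }
    consumer.close()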

From: Greg Zoller <gwzol...@yahoo.com.INVALID>
To: "users@kafka.apache.org" <users@kafka.apache.org>
Sent: Tuesday, April 12, 2016 10:40 AM
Subject: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
Hello,
I'm trying to run the latest master build from GitHub.  I've got producer code 
like below:
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    val props = Map[String, AnyRef](
      "bootstrap.servers" -> host,
      "key.serializer"    -> "org.apache.kafka.common.serialization.ByteArraySerializer",
      "value.serializer"  -> "org.apache.kafka.common.serialization.StringSerializer")
    val p = new KafkaProducer[Array[Byte], String](props.asJava)  // constructor wants a java.util.Map
    (1 to num).foreach { i =>
      p.send(new ProducerRecord[Array[Byte], String](topic, s"msg-$i"))
    }
    p.close(10000, java.util.concurrent.TimeUnit.MILLISECONDS)


This code should wait up to 10 seconds for buffered sends to complete, then 
close the KafkaProducer.  At that point, if I check the offsets on my server 
(same build) I see all offsets still set to 0 -- no data committed.  If I put 
a println in the loop, the p.send() calls all seem to complete happily.
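For what it's worth, p.send() is asynchronous -- it only queues the record in 
the producer's buffer, so the println doesn't prove anything reached the 
broker. A test-only variant of the loop that blocks on the Future returned by 
send() would surface any per-record error:

    (1 to num).foreach { i =>
      // .get() blocks until the broker acks, or throws on failure -- fine for a test
      val md = p.send(new ProducerRecord[Array[Byte], String](topic, s"msg-$i")).get()
      println(s"msg-$i acked: partition ${md.partition}, offset ${md.offset}")
    }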
Any ideas?
Thanks,
Greg

  
