Hi,

I am writing to a Kafka Topic from within a Scala/Akka application using the 
new Producer (0.8.2.1).

While writing messages to the topic (at a very reasonable rate of at most a 
couple of messages per second), the available buffer constantly decreases 
until the producer finally throws an exception saying the buffer is exhausted.

There must be something I am doing wrong or do not understand here; can 
anyone provide a clue?

Here is my config:

 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
 ProducerConfig.RETRIES_CONFIG -> "0",
 ProducerConfig.ACKS_CONFIG -> "1",
 ProducerConfig.COMPRESSION_TYPE_CONFIG -> "none",
 ProducerConfig.TIMEOUT_CONFIG -> new Integer(30000),
 // ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(16384),
 ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(10),
 ProducerConfig.BUFFER_MEMORY_CONFIG -> new Integer(66554432),
 ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG -> new java.lang.Boolean(false),
 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
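For completeness, this is roughly how I build the producer from that map (a 
sketch; the `configMap` and `brokers` names are just placeholders on my side):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

val brokers = "localhost:9092"

// The settings shown above, collected in a Scala map.
val configMap: Map[String, AnyRef] = Map(
  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ProducerConfig.ACKS_CONFIG -> "1"
  // ... remaining settings from the list above ...
)

// Copy the map into the java.util.Properties the producer constructor expects.
val props = new Properties()
configMap.foreach { case (k, v) => props.put(k, v) }

val kafkaProducer = new KafkaProducer[String, String](props)
```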


And my send code:

kafkaProducer.send(new ProducerRecord[String, String](topic, key, data), new Callback {
  def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
    if (e != null) {
      logger.error(s"Could not send $data", e)
    } else {
      // recordMetadata is null when the send failed, so only read it on success
      logger.info("The offset of the record we just sent is: " + recordMetadata.offset())
    }
    ()
  }
})


I am using the producer's metrics() method to periodically look at 
"buffer-available-bytes", and I can see it constantly decreasing over time as 
messages are sent.
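For reference, this is roughly how I poll that metric (the helper name is 
mine, and I am assuming the "buffer-available-bytes" metric name string as 
reported by the 0.8.2 client):

```scala
import scala.collection.JavaConverters._

// Scan the producer's metrics map and pick out the buffer gauge.
def bufferAvailableBytes(): Option[Double] =
  kafkaProducer.metrics().asScala.collectFirst {
    case (name, metric) if name.name() == "buffer-available-bytes" =>
      metric.value()
  }
```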


Jan
