Hi, I am writing to a Kafka Topic from within a Scala/Akka application using the new Producer (0.8.2.1).
While writing messages to the queue (at a very reasonable rate of a couple of messages per second max) the available buffer is constantly decreasing until the producer finally throws an exception saing the buffer is exhausted. There must be something I do not understand that I am doing wrong, can anyone provde a clue? Here is my config: ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ProducerConfig.RETRIES_CONFIG -> "0", ProducerConfig.ACKS_CONFIG -> "1", ProducerConfig.COMPRESSION_TYPE_CONFIG -> "none", ProducerConfig.TIMEOUT_CONFIG -> new Integer(30000), // ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(16384), ProducerConfig.BATCH_SIZE_CONFIG -> new Integer(10), ProducerConfig.BUFFER_MEMORY_CONFIG -> new Integer(66554432), ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG -> new java.lang.Boolean(false), ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer" And my send code: kafkaProducer.send(new ProducerRecord[String,String](topic, key, data),new Callback { def onCompletion(recordMetadata: RecordMetadata, e: Exception):Unit = { if(e != null) { logger.error(s"Could not send $data",e) } logger.info("The offset of the record we just sent is: " + recordMetadata.offset()) () } }) I am using the metrics() member to periodically look at "buffer-available-bytes" and I see it is constantly decreasing over time as messages are being sent. Jan