Let me post my test code here. I could see producer.send(data); returned with no error.
public class TestProducer extends Thread { private final Producer<String, String> producer; private final int m_events; private final int m_threadNumber; private static String msg = StringUtils.rightPad("", 1000, '*'); public TestProducer(int threadNumber, int events) { m_threadNumber = threadNumber; m_events = events; Properties props = new Properties(); props.put("serializer.class", KafkaProperties.p_serializer_class); props.put("metadata.broker.list", KafkaProperties.p_metadata_broker_list); props.put("partitioner.class", KafkaProperties.p_partitioner_class); props.put("queue.enqueue.timeout.ms", KafkaProperties.p_queue_enqueue_timeout); props.put("request.required.acks", KafkaProperties.p_request_required_acks); props.put("producer.type", KafkaProperties.p_producer_type); props.put("batch.num.messages", KafkaProperties.p_batch_num); props.put("compression.codec", KafkaProperties.p_compression_codec); ProducerConfig config = new ProducerConfig(props); producer = new Producer<String, String>(config); } @Override public void run() { long start; long num = 0; System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " started."); while (true) { start = System.currentTimeMillis(); String messageStr = new String(num + "_" + start); KeyedMessage<String, String> data = new KeyedMessage<String, String>(KafkaProperties.topic, messageStr, start + "_" + msg); producer.send(data); num++; if (num == m_events) { break; } } producer.close(); System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " end. " + num + " messages sent."); } } public interface KafkaProperties { final static String zookeeper_connect = "127.0.0.1:2181"; final static String group_id = "group1"; final static String topic = "topic1"; final static String p_serializer_class = "kafka.serializer.StringEncoder"; final static String p_metadata_broker_list = "127.0.0.1:9092"; final static String p_partitioner_class = "kafka.producer.DefaultPartitioner"; final static String p_queue_enqueue_timeout = "-1"; final static String p_request_required_acks = "1"; final static String p_producer_type = "async"; final static String p_batch_num = "100"; final static String p_compression_codec = "1"; final static String p_message_send_retries = "3"; final static String p_retry_backoff_ms = "200"; final static String p_topic_metadata_refresh = "600000"; } On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <lux...@gmail.com> wrote: > Thanks Paul. Yes, I am using 0.8 beta1. I followed your suggestion to set > request.required.acks=1 and got the same result. No error message seen in > broker logs, the size of the partition files were after sending 1,000,000 > events, the size of each event was 1KB : > > 00000000000000000000.index 10240 KB > 00000000000000000000.log 0KB > > The broker configurations: > > num.partitions=5 > log.flush.interval.messages=20000 > log.flush.interval.ms=5000 > > log.flush.scheduler.interval.ms=1000 > log.retention.hours=1 > log.segment.bytes=1073741824 > log.cleanup.interval.mins=30 > > queued.max.requests=16 > fetch.purgatory.purge.interval.requests=100 > producer.purgatory.purge.interval.requests=100 > > It works if I change the code to props.put("compression.codec", "0"); > > thanks, > xlu > > On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmack...@adobe.com> wrote: > >> I assume this is kafka 0.8, right? Are there any corresponding errors in >> the broker logs? With the configuration below, I don't think any errors >> will be reported back to the producer. >> >> You could also try setting erquest.required.acks=1 to see if errors are >> reported back to the client. >> >> On 8/29/13 4:40 AM, "Lu Xuechao" <lux...@gmail.com> wrote: >> >> >Hi , >> > >> >I am trying to enable gzip compression for my events. But after I >> switched >> >compression.codec to "1" I found the produced events were even not be >> >persisted to disk log file. Of course, the consumer could not receive any >> >compressed events. I sent 10,000 or more events but the broker's log file >> >not changed. Seems no events were actually send to broker? Below is my >> >producer's code: >> > >> > Properties props = new Properties(); >> > props.put("serializer.class", "kafka.serializer.StringEncoder"); >> > props.put("metadata.broker.list", "127.0.0.1:9092"); >> > props.put("partitioner.class", >> >"kafka.producer.DefaultPartitioner"); >> > props.put("queue.enqueue.timeout.ms", "-1"); >> > props.put("request.required.acks", "0"); >> > props.put("producer.type", "async"); >> > >> > props.put("batch.num.messages", "100"); >> > >> > props.put("compression.codec", "1"); >> > >> > ProducerConfig config = new ProducerConfig(props); >> > producer = new Producer<String, String>(config); >> > >> > KeyedMessage<String, String> data = new KeyedMessage<String, >> >String>("topic1", messageStr, msg); >> > producer.send(data); >> > >> > >> >If I comment out this line of code : props.put("compression.codec", "1"); >> >then everything works fine. Did I miss something? >> > >> >thanks, >> >xlu >> >> >