Did you see any error in the producer log? Did the broker receive the produce request (you can look at the request log in the broker)?

Thanks,
Jun

On Thu, Aug 29, 2013 at 6:29 AM, Lu Xuechao <lux...@gmail.com> wrote:

> Let me post my test code here. I could see producer.send(data); returned
> with no error.
>
> public class TestProducer extends Thread {
>     private final Producer<String, String> producer;
>
>     private final int m_events;
>     private final int m_threadNumber;
>
>     private static String msg = StringUtils.rightPad("", 1000, '*');
>
>     public TestProducer(int threadNumber, int events) {
>         m_threadNumber = threadNumber;
>         m_events = events;
>
>         Properties props = new Properties();
>         props.put("serializer.class", KafkaProperties.p_serializer_class);
>         props.put("metadata.broker.list",
>                 KafkaProperties.p_metadata_broker_list);
>         props.put("partitioner.class",
>                 KafkaProperties.p_partitioner_class);
>         props.put("queue.enqueue.timeout.ms",
>                 KafkaProperties.p_queue_enqueue_timeout);
>         props.put("request.required.acks",
>                 KafkaProperties.p_request_required_acks);
>         props.put("producer.type", KafkaProperties.p_producer_type);
>
>         props.put("batch.num.messages", KafkaProperties.p_batch_num);
>
>         props.put("compression.codec",
>                 KafkaProperties.p_compression_codec);
>
>         ProducerConfig config = new ProducerConfig(props);
>         producer = new Producer<String, String>(config);
>     }
>
>     @Override
>     public void run() {
>         long start;
>         long num = 0;
>         System.out.println(new Date() + " - Message sent thread " +
>                 m_threadNumber + " started.");
>         while (true) {
>             start = System.currentTimeMillis();
>             String messageStr = new String(num + "_" + start);
>             KeyedMessage<String, String> data = new KeyedMessage<String,
>                     String>(KafkaProperties.topic, messageStr,
>                     start + "_" + msg);
>             producer.send(data);
>             num++;
>             if (num == m_events) {
>                 break;
>             }
>         }
>         producer.close();
>         System.out.println(new Date() + " - Message sent thread " +
>                 m_threadNumber + " end. " + num
>                 + " messages sent.");
>     }
> }
>
>
> public interface KafkaProperties {
>     final static String zookeeper_connect = "127.0.0.1:2181";
>     final static String group_id = "group1";
>     final static String topic = "topic1";
>
>     final static String p_serializer_class =
>             "kafka.serializer.StringEncoder";
>     final static String p_metadata_broker_list = "127.0.0.1:9092";
>     final static String p_partitioner_class =
>             "kafka.producer.DefaultPartitioner";
>
>     final static String p_queue_enqueue_timeout = "-1";
>     final static String p_request_required_acks = "1";
>     final static String p_producer_type = "async";
>     final static String p_batch_num = "100";
>     final static String p_compression_codec = "1";
>     final static String p_message_send_retries = "3";
>     final static String p_retry_backoff_ms = "200";
>     final static String p_topic_metadata_refresh = "600000";
> }
>
>
> On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <lux...@gmail.com> wrote:
>
> > Thanks Paul. Yes, I am using 0.8 beta1. I followed your suggestion to set
> > request.required.acks=1 and got the same result. No error message seen in
> > the broker logs. The sizes of the partition files after sending 1,000,000
> > events (each event was 1KB) were:
> >
> > 00000000000000000000.index 10240 KB
> > 00000000000000000000.log 0KB
> >
> > The broker configurations:
> >
> > num.partitions=5
> > log.flush.interval.messages=20000
> > log.flush.interval.ms=5000
> >
> > log.flush.scheduler.interval.ms=1000
> > log.retention.hours=1
> > log.segment.bytes=1073741824
> > log.cleanup.interval.mins=30
> >
> > queued.max.requests=16
> > fetch.purgatory.purge.interval.requests=100
> > producer.purgatory.purge.interval.requests=100
> >
> > It works if I change the code to props.put("compression.codec", "0");
> >
> > thanks,
> > xlu
> >
> > On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmack...@adobe.com> wrote:
> >
> >> I assume this is kafka 0.8, right? Are there any corresponding errors in
> >> the broker logs? With the configuration below, I don't think any errors
> >> will be reported back to the producer.
> >>
> >> You could also try setting request.required.acks=1 to see if errors are
> >> reported back to the client.
> >>
> >> On 8/29/13 4:40 AM, "Lu Xuechao" <lux...@gmail.com> wrote:
> >>
> >> >Hi,
> >> >
> >> >I am trying to enable gzip compression for my events. But after I
> >> >switched compression.codec to "1" I found the produced events were not
> >> >even persisted to the disk log file. Of course, the consumer could not
> >> >receive any compressed events. I sent 10,000 or more events but the
> >> >broker's log file did not change. It seems no events were actually sent
> >> >to the broker? Below is my producer's code:
> >> >
> >> >    Properties props = new Properties();
> >> >    props.put("serializer.class", "kafka.serializer.StringEncoder");
> >> >    props.put("metadata.broker.list", "127.0.0.1:9092");
> >> >    props.put("partitioner.class",
> >> >            "kafka.producer.DefaultPartitioner");
> >> >    props.put("queue.enqueue.timeout.ms", "-1");
> >> >    props.put("request.required.acks", "0");
> >> >    props.put("producer.type", "async");
> >> >
> >> >    props.put("batch.num.messages", "100");
> >> >
> >> >    props.put("compression.codec", "1");
> >> >
> >> >    ProducerConfig config = new ProducerConfig(props);
> >> >    producer = new Producer<String, String>(config);
> >> >
> >> >    KeyedMessage<String, String> data = new KeyedMessage<String,
> >> >            String>("topic1", messageStr, msg);
> >> >    producer.send(data);
> >> >
> >> >
> >> >If I comment out this line of code: props.put("compression.codec", "1");
> >> >then everything works fine. Did I miss something?
> >> >
> >> >thanks,
> >> >xlu
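
A possible way to follow up on the two suggestions in this thread (check the producer side for errors, and set request.required.acks=1) is to run the same test with producer.type switched from "async" to "sync". As far as I know, with the 0.8 producer a send that fails in sync mode is thrown back to the caller of producer.send(), whereas in async mode it is only logged by the background send thread, which would explain why send() "returned with no error". The sketch below only builds the Properties object with the JDK, so it runs without the Kafka jar. The class name DebugProducerProps and its build method are hypothetical and not from the thread; the async-only settings (queue.enqueue.timeout.ms, batch.num.messages) are left out on purpose.

```java
import java.util.Properties;

// Hypothetical helper (not from the thread): same settings as the original
// post, but in sync mode with acks enabled so failures are easier to see.
final class DebugProducerProps {

    // compressionCodec: "0" = none, "1" = gzip, as used in the thread.
    static Properties build(String compressionCodec) {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "127.0.0.1:9092");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        // Changed from the first post: wait for the leader's acknowledgement.
        props.put("request.required.acks", "1");
        // Changed from the thread: send on the calling thread, not via the queue.
        props.put("producer.type", "sync");
        props.put("compression.codec", compressionCodec);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("1");
        props.list(System.out);
        // With the Kafka 0.8 jar on the classpath, the props would be used
        // exactly as in the test code above:
        //   ProducerConfig config = new ProducerConfig(props);
        //   Producer<String, String> producer = new Producer<String, String>(config);
    }
}
```

Running the test once with build("1") and once with build("0") keeps everything else identical, so any exception or log difference can be attributed to the compression setting alone.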