These are the metadata requests. Do you see Producer requests from your client?

Thanks,

Jun

On Thu, Aug 29, 2013 at 5:40 PM, Lu Xuechao <lux...@gmail.com> wrote:

> After I sent 1,000 compressed events, I saw these messages in broker's log
> files:
>
> in kafka-request.log
>
> [2013-08-30 08:38:18,713] TRACE Processor 6 received request : Name: TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 (kafka.network.RequestChannel$)
> [2013-08-30 08:38:18,718] TRACE Completed request:Name: TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 from client /127.0.0.1:64238;totalTime:5,queueTime:3,localTime:1,remoteTime:0,sendTime:1 (kafka.request.logger)
>
> in server.log
>
> [2013-08-30 08:38:18,759] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>
> any ideas? Thanks.
>
> On Thu, Aug 29, 2013 at 10:28 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Did you see any error in the producer log? Did the broker receive the
> > produce request (you can look at the request log in the broker)?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Aug 29, 2013 at 6:29 AM, Lu Xuechao <lux...@gmail.com> wrote:
> >
> > > Let me post my test code here. I could see producer.send(data); returned
> > > with no error.
> > >
> > > public class TestProducer extends Thread {
> > >     private final Producer<String, String> producer;
> > >
> > >     private final int m_events;
> > >     private final int m_threadNumber;
> > >
> > >     private static String msg = StringUtils.rightPad("", 1000, '*');
> > >
> > >     public TestProducer(int threadNumber, int events) {
> > >         m_threadNumber = threadNumber;
> > >         m_events = events;
> > >
> > >         Properties props = new Properties();
> > >         props.put("serializer.class", KafkaProperties.p_serializer_class);
> > >         props.put("metadata.broker.list", KafkaProperties.p_metadata_broker_list);
> > >         props.put("partitioner.class", KafkaProperties.p_partitioner_class);
> > >         props.put("queue.enqueue.timeout.ms", KafkaProperties.p_queue_enqueue_timeout);
> > >         props.put("request.required.acks", KafkaProperties.p_request_required_acks);
> > >         props.put("producer.type", KafkaProperties.p_producer_type);
> > >
> > >         props.put("batch.num.messages", KafkaProperties.p_batch_num);
> > >
> > >         props.put("compression.codec", KafkaProperties.p_compression_codec);
> > >
> > >         ProducerConfig config = new ProducerConfig(props);
> > >         producer = new Producer<String, String>(config);
> > >     }
> > >
> > >     @Override
> > >     public void run() {
> > >         long start;
> > >         long num = 0;
> > >         System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " started.");
> > >         while (true) {
> > >             start = System.currentTimeMillis();
> > >             String messageStr = new String(num + "_" + start);
> > >             KeyedMessage<String, String> data = new KeyedMessage<String, String>(
> > >                     KafkaProperties.topic, messageStr, start + "_" + msg);
> > >             producer.send(data);
> > >             num++;
> > >             if (num == m_events) {
> > >                 break;
> > >             }
> > >         }
> > >         producer.close();
> > >         System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " end. " + num
> > >                 + " messages sent.");
> > >     }
> > > }
> > >
> > >
> > > public interface KafkaProperties {
> > >     final static String zookeeper_connect = "127.0.0.1:2181";
> > >     final static String group_id = "group1";
> > >     final static String topic = "topic1";
> > >
> > >     final static String p_serializer_class = "kafka.serializer.StringEncoder";
> > >     final static String p_metadata_broker_list = "127.0.0.1:9092";
> > >     final static String p_partitioner_class = "kafka.producer.DefaultPartitioner";
> > >
> > >     final static String p_queue_enqueue_timeout = "-1";
> > >     final static String p_request_required_acks = "1";
> > >     final static String p_producer_type = "async";
> > >     final static String p_batch_num = "100";
> > >     final static String p_compression_codec = "1";
> > >     final static String p_message_send_retries = "3";
> > >     final static String p_retry_backoff_ms = "200";
> > >     final static String p_topic_metadata_refresh = "600000";
> > > }
> > >
> > >
> > > On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <lux...@gmail.com> wrote:
> > >
> > > > Thanks Paul. Yes, I am using 0.8 beta1. I followed your suggestion to set
> > > > request.required.acks=1 and got the same result.
> > > > No error message seen in broker logs. After sending 1,000,000 events
> > > > of 1 KB each, the sizes of the partition files were:
> > > >
> > > > 00000000000000000000.index 10240 KB
> > > > 00000000000000000000.log 0 KB
> > > >
> > > > The broker configurations:
> > > >
> > > > num.partitions=5
> > > > log.flush.interval.messages=20000
> > > > log.flush.interval.ms=5000
> > > >
> > > > log.flush.scheduler.interval.ms=1000
> > > > log.retention.hours=1
> > > > log.segment.bytes=1073741824
> > > > log.cleanup.interval.mins=30
> > > >
> > > > queued.max.requests=16
> > > > fetch.purgatory.purge.interval.requests=100
> > > > producer.purgatory.purge.interval.requests=100
> > > >
> > > > It works if I change the code to props.put("compression.codec", "0");
> > > >
> > > > thanks,
> > > > xlu
> > > >
> > > > On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmack...@adobe.com> wrote:
> > > >
> > > >> I assume this is kafka 0.8, right? Are there any corresponding errors in
> > > >> the broker logs? With the configuration below, I don't think any errors
> > > >> will be reported back to the producer.
> > > >>
> > > >> You could also try setting request.required.acks=1 to see if errors are
> > > >> reported back to the client.
> > > >>
> > > >> On 8/29/13 4:40 AM, "Lu Xuechao" <lux...@gmail.com> wrote:
> > > >>
> > > >> >Hi,
> > > >> >
> > > >> >I am trying to enable gzip compression for my events. But after I switched
> > > >> >compression.codec to "1" I found the produced events were not even
> > > >> >persisted to the disk log file. Of course, the consumer could not receive any
> > > >> >compressed events. I sent 10,000 or more events but the broker's log file
> > > >> >did not change. It seems no events were actually sent to the broker?
> > > >> >Below is my producer's code:
> > > >> >
> > > >> >        Properties props = new Properties();
> > > >> >        props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >> >        props.put("metadata.broker.list", "127.0.0.1:9092");
> > > >> >        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
> > > >> >        props.put("queue.enqueue.timeout.ms", "-1");
> > > >> >        props.put("request.required.acks", "0");
> > > >> >        props.put("producer.type", "async");
> > > >> >
> > > >> >        props.put("batch.num.messages", "100");
> > > >> >
> > > >> >        props.put("compression.codec", "1");
> > > >> >
> > > >> >        ProducerConfig config = new ProducerConfig(props);
> > > >> >        producer = new Producer<String, String>(config);
> > > >> >
> > > >> >        KeyedMessage<String, String> data = new KeyedMessage<String, String>("topic1", messageStr, msg);
> > > >> >        producer.send(data);
> > > >> >
> > > >> >
> > > >> >If I comment out this line of code : props.put("compression.codec", "1");
> > > >> >then everything works fine. Did I miss something?
> > > >> >
> > > >> >thanks,
> > > >> >xlu
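
The question at the top of the thread, whether the broker's request log shows anything besides metadata requests, can be checked mechanically. The following is a minimal sketch, assuming the request log keeps the "Name: <RequestType>;" field seen in the TRACE lines quoted above. RequestLogCounter is a hypothetical helper, not part of Kafka, and the expectation that produce requests would appear under their own name (something like ProducerRequest) is an assumption, since no such line appears in the excerpt.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Kafka): tallies request names found in request-log lines.
class RequestLogCounter {
    // Matches the "Name: <RequestType>;" field from the TRACE lines quoted in this thread.
    // \s* tolerates both "request : Name: X;" and "request:Name: X;".
    private static final Pattern NAME = Pattern.compile("Name:\\s*(\\w+);");

    // Returns a sorted map of request name -> number of log lines mentioning it.
    static Map<String, Integer> countByName(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String line : lines) {
            Matcher m = NAME.matcher(line);
            if (m.find()) {
                String name = m.group(1);
                Integer old = counts.get(name);
                counts.put(name, old == null ? 1 : old + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample input: the two kafka-request.log lines quoted in the thread, unwrapped.
        List<String> sample = Arrays.asList(
            "[2013-08-30 08:38:18,713] TRACE Processor 6 received request : Name: TopicMetadataRequest; "
                + "Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 (kafka.network.RequestChannel$)",
            "[2013-08-30 08:38:18,718] TRACE Completed request:Name: TopicMetadataRequest; "
                + "Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 from client /127.0.0.1:64238"
                + ";totalTime:5,queueTime:3,localTime:1,remoteTime:0,sendTime:1 (kafka.request.logger)");
        System.out.println(countByName(sample)); // prints {TopicMetadataRequest=2}
    }
}
```

If a tally over the full kafka-request.log contains only TopicMetadataRequest, that would be consistent with the produce requests never reaching the broker, which points back at the producer side.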