I'm not sure this is a bug. I think the issue is that log4j is not configured properly, so no log output shows up.

Thanks,

Jun


On Fri, Aug 30, 2013 at 9:32 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> This seems like more of a bug than a FAQ, no? We are swallowing the
> exception...
>
> -Jay
>
>
> On Thu, Aug 29, 2013 at 11:30 PM, Lu Xuechao <lux...@gmail.com> wrote:
>
>> Hi Jun,
>>
>> Thanks for your help. Finally, I found the reason by enabling producer-side
>> DEBUG output. The snappy jar was not included in the classpath. I added
>> it and it worked.
>>
>> Thanks again.
>>
>>
>> On Fri, Aug 30, 2013 at 12:53 PM, Lu Xuechao <lux...@gmail.com> wrote:
>>
>>> No.
>>>
>>>
>>> On Fri, Aug 30, 2013 at 11:57 AM, Jun Rao <jun...@gmail.com> wrote:
>>>
>>>> These are the metadata requests. Do you see Producer requests from your
>>>> client?
>>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>>
>>>> On Thu, Aug 29, 2013 at 5:40 PM, Lu Xuechao <lux...@gmail.com> wrote:
>>>>
>>>>> After I sent 1,000 compressed events, I saw these messages in the broker's log
>>>>> files:
>>>>>
>>>>> in kafka-request.log
>>>>>
>>>>> [2013-08-30 08:38:18,713] TRACE Processor 6 received request : Name: TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 (kafka.network.RequestChannel$)
>>>>> [2013-08-30 08:38:18,718] TRACE Completed request:Name: TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 from client /127.0.0.1:64238;totalTime:5,queueTime:3,localTime:1,remoteTime:0,sendTime:1 (kafka.request.logger)
>>>>>
>>>>> in server.log
>>>>>
>>>>> [2013-08-30 08:38:18,759] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>>>>>
>>>>> any ideas? Thanks.
>>>>>
>>>>>
>>>>> On Thu, Aug 29, 2013 at 10:28 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>
>>>>>> Did you see any error in the producer log?
>>>>>> Did the broker receive the
>>>>>> produce request (you can look at the request log in the broker)?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 29, 2013 at 6:29 AM, Lu Xuechao <lux...@gmail.com> wrote:
>>>>>>
>>>>>>> Let me post my test code here. I could see producer.send(data); returned
>>>>>>> with no error.
>>>>>>>
>>>>>>> public class TestProducer extends Thread {
>>>>>>>     private final Producer<String, String> producer;
>>>>>>>
>>>>>>>     private final int m_events;
>>>>>>>     private final int m_threadNumber;
>>>>>>>
>>>>>>>     private static String msg = StringUtils.rightPad("", 1000, '*');
>>>>>>>
>>>>>>>     public TestProducer(int threadNumber, int events) {
>>>>>>>         m_threadNumber = threadNumber;
>>>>>>>         m_events = events;
>>>>>>>
>>>>>>>         Properties props = new Properties();
>>>>>>>         props.put("serializer.class", KafkaProperties.p_serializer_class);
>>>>>>>         props.put("metadata.broker.list", KafkaProperties.p_metadata_broker_list);
>>>>>>>         props.put("partitioner.class", KafkaProperties.p_partitioner_class);
>>>>>>>         props.put("queue.enqueue.timeout.ms", KafkaProperties.p_queue_enqueue_timeout);
>>>>>>>         props.put("request.required.acks", KafkaProperties.p_request_required_acks);
>>>>>>>         props.put("producer.type", KafkaProperties.p_producer_type);
>>>>>>>
>>>>>>>         props.put("batch.num.messages", KafkaProperties.p_batch_num);
>>>>>>>
>>>>>>>         props.put("compression.codec", KafkaProperties.p_compression_codec);
>>>>>>>
>>>>>>>         ProducerConfig config = new ProducerConfig(props);
>>>>>>>         producer = new Producer<String, String>(config);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void run() {
>>>>>>>         long start;
>>>>>>>         long num = 0;
>>>>>>>         System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " started.");
>>>>>>>         while (true) {
>>>>>>>             start = System.currentTimeMillis();
>>>>>>>             String messageStr = new String(num + "_" + start);
>>>>>>>             KeyedMessage<String, String> data = new KeyedMessage<String, String>(KafkaProperties.topic, messageStr,
>>>>>>>                     start + "_" + msg);
>>>>>>>             producer.send(data);
>>>>>>>             num++;
>>>>>>>             if (num == m_events) {
>>>>>>>                 break;
>>>>>>>             }
>>>>>>>         }
>>>>>>>         producer.close();
>>>>>>>         System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " end. " + num
>>>>>>>                 + " messages sent.");
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> public interface KafkaProperties {
>>>>>>>     final static String zookeeper_connect = "127.0.0.1:2181";
>>>>>>>     final static String group_id = "group1";
>>>>>>>     final static String topic = "topic1";
>>>>>>>
>>>>>>>     final static String p_serializer_class = "kafka.serializer.StringEncoder";
>>>>>>>     final static String p_metadata_broker_list = "127.0.0.1:9092";
>>>>>>>     final static String p_partitioner_class = "kafka.producer.DefaultPartitioner";
>>>>>>>
>>>>>>>     final static String p_queue_enqueue_timeout = "-1";
>>>>>>>     final static String p_request_required_acks = "1";
>>>>>>>     final static String p_producer_type = "async";
>>>>>>>     final static String p_batch_num = "100";
>>>>>>>     final static String p_compression_codec = "1";
>>>>>>>     final static String p_message_send_retries = "3";
>>>>>>>     final static String p_retry_backoff_ms = "200";
>>>>>>>     final static String p_topic_metadata_refresh = "600000";
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <lux...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Paul. Yes, I am using 0.8 beta1. I followed your suggestion to set
>>>>>>>> request.required.acks=1 and got the same result. No error message was seen in
>>>>>>>> the broker logs. After sending 1,000,000 events of 1 KB each, the partition
>>>>>>>> file sizes were:
>>>>>>>>
>>>>>>>> 00000000000000000000.index 10240 KB
>>>>>>>> 00000000000000000000.log 0KB
>>>>>>>>
>>>>>>>> The broker configurations:
>>>>>>>>
>>>>>>>> num.partitions=5
>>>>>>>> log.flush.interval.messages=20000
>>>>>>>> log.flush.interval.ms=5000
>>>>>>>>
>>>>>>>> log.flush.scheduler.interval.ms=1000
>>>>>>>> log.retention.hours=1
>>>>>>>> log.segment.bytes=1073741824
>>>>>>>> log.cleanup.interval.mins=30
>>>>>>>>
>>>>>>>> queued.max.requests=16
>>>>>>>> fetch.purgatory.purge.interval.requests=100
>>>>>>>> producer.purgatory.purge.interval.requests=100
>>>>>>>>
>>>>>>>> It works if I change the code to props.put("compression.codec", "0");
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>> xlu
>>>>>>>>
>>>>>>>> On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmack...@adobe.com> wrote:
>>>>>>>>
>>>>>>>>> I assume this is kafka 0.8, right? Are there any corresponding errors in
>>>>>>>>> the broker logs? With the configuration below, I don't think any errors
>>>>>>>>> will be reported back to the producer.
>>>>>>>>>
>>>>>>>>> You could also try setting request.required.acks=1 to see if errors are
>>>>>>>>> reported back to the client.
>>>>>>>>>
>>>>>>>>> On 8/29/13 4:40 AM, "Lu Xuechao" <lux...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am trying to enable gzip compression for my events. But after I switched
>>>>>>>>>> compression.codec to "1" I found the produced events were not even
>>>>>>>>>> persisted to the disk log file. Of course, the consumer could not receive any
>>>>>>>>>> compressed events. I sent 10,000 or more events but the broker's log file
>>>>>>>>>> did not change. Seems no events were actually sent to the broker? Below is my
>>>>>>>>>> producer's code:
>>>>>>>>>>
>>>>>>>>>>     Properties props = new Properties();
>>>>>>>>>>     props.put("serializer.class", "kafka.serializer.StringEncoder");
>>>>>>>>>>     props.put("metadata.broker.list", "127.0.0.1:9092");
>>>>>>>>>>     props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
>>>>>>>>>>     props.put("queue.enqueue.timeout.ms", "-1");
>>>>>>>>>>     props.put("request.required.acks", "0");
>>>>>>>>>>     props.put("producer.type", "async");
>>>>>>>>>>
>>>>>>>>>>     props.put("batch.num.messages", "100");
>>>>>>>>>>
>>>>>>>>>>     props.put("compression.codec", "1");
>>>>>>>>>>
>>>>>>>>>>     ProducerConfig config = new ProducerConfig(props);
>>>>>>>>>>     producer = new Producer<String, String>(config);
>>>>>>>>>>
>>>>>>>>>>     KeyedMessage<String, String> data = new KeyedMessage<String, String>("topic1", messageStr, msg);
>>>>>>>>>>     producer.send(data);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> If I comment out this line of code: props.put("compression.codec", "1");
>>>>>>>>>> then everything works fine. Did I miss something?
>>>>>>>>>>
>>>>>>>>>> thanks,
>>>>>>>>>> xlu
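
Since the root cause in this thread was a jar missing from the producer's classpath, and the async producer did not surface the failure, a startup check along the following lines might have caught it earlier. This is only a sketch using the JDK, not anything from Kafka itself: the class name ClasspathCheck and the method isOnClasspath are hypothetical, and org.xerial.snappy.Snappy is assumed to be the entry class of the snappy jar mentioned above.

```java
public class ClasspathCheck {

    // Hypothetical helper: returns true if the named class can be found by
    // the class loader that loaded this class, false otherwise.
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: this is the main class shipped in the snappy jar.
        String snappyClass = "org.xerial.snappy.Snappy";
        if (!isOnClasspath(snappyClass)) {
            System.err.println("WARNING: " + snappyClass
                    + " not found on the classpath; compressed sends may fail silently.");
        }
    }
}
```

To be meaningful, the check has to run with the same classpath as the producer, for example by calling isOnClasspath at the start of the producer's own main method before any messages are sent.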