Xuechao,

Thanks for updating the wiki. Not setting up log4j properly seems to be a general problem, not one limited to this particular issue. Could you reword it a bit to make it more general?

Jun

On Fri, Aug 30, 2013 at 2:02 AM, Lu Xuechao <lux...@gmail.com> wrote:

> Hi, Joe. wiki updated. Hope it helps.
>
> On Fri, Aug 30, 2013 at 3:22 PM, Joe Stein <crypt...@gmail.com> wrote:
>
>> I feel like this is maybe a usual case as we have heard it before now a
>> few bits
>>
>> Lu Xuechao would you mind updating the FAQ
>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ with what the
>> problem was and your solution just to capture this thread in the wiki
>> please, thanx!
>>
>> /*******************************************
>>  Joe Stein
>>  Founder, Principal Consultant
>>  Big Data Open Source Security LLC
>>  http://www.stealth.ly
>>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> ********************************************/
>>
>> On Fri, Aug 30, 2013 at 2:30 AM, Lu Xuechao <lux...@gmail.com> wrote:
>>
>>> Hi Jun,
>>>
>>> Thanks for your help. Finally, I found the reason by enabling
>>> producer-side DEBUG output. The snappy jar was not included in the
>>> classpath. I added it and it worked.
>>>
>>> Thanks again.
>>>
>>> On Fri, Aug 30, 2013 at 12:53 PM, Lu Xuechao <lux...@gmail.com> wrote:
>>>
>>>> No.
>>>>
>>>> On Fri, Aug 30, 2013 at 11:57 AM, Jun Rao <jun...@gmail.com> wrote:
>>>>
>>>>> These are the metadata requests. Do you see Producer requests from
>>>>> your client?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jun
>>>>>
>>>>> On Thu, Aug 29, 2013 at 5:40 PM, Lu Xuechao <lux...@gmail.com> wrote:
>>>>>
>>>>>> After I sent 1,000 compressed events, I saw these messages in the
>>>>>> broker's log files:
>>>>>>
>>>>>> in kafka-request.log
>>>>>>
>>>>>> [2013-08-30 08:38:18,713] TRACE Processor 6 received request : Name: TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 (kafka.network.RequestChannel$)
>>>>>> [2013-08-30 08:38:18,718] TRACE Completed request:Name: TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 from client /127.0.0.1:64238;totalTime:5,queueTime:3,localTime:1,remoteTime:0,sendTime:1 (kafka.request.logger)
>>>>>>
>>>>>> in server.log
>>>>>>
>>>>>> [2013-08-30 08:38:18,759] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>>>>>>
>>>>>> Any ideas? Thanks.
>>>>>>
>>>>>> On Thu, Aug 29, 2013 at 10:28 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>
>>>>>>> Did you see any error in the producer log? Did the broker receive
>>>>>>> the produce request (you can look at the request log in the broker)?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jun
>>>>>>>
>>>>>>> On Thu, Aug 29, 2013 at 6:29 AM, Lu Xuechao <lux...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Let me post my test code here. I could see producer.send(data);
>>>>>>>> returned with no error.
>>>>>>>>
>>>>>>>> public class TestProducer extends Thread {
>>>>>>>>     private final Producer<String, String> producer;
>>>>>>>>
>>>>>>>>     private final int m_events;
>>>>>>>>     private final int m_threadNumber;
>>>>>>>>
>>>>>>>>     private static String msg = StringUtils.rightPad("", 1000, '*');
>>>>>>>>
>>>>>>>>     public TestProducer(int threadNumber, int events) {
>>>>>>>>         m_threadNumber = threadNumber;
>>>>>>>>         m_events = events;
>>>>>>>>
>>>>>>>>         Properties props = new Properties();
>>>>>>>>         props.put("serializer.class", KafkaProperties.p_serializer_class);
>>>>>>>>         props.put("metadata.broker.list", KafkaProperties.p_metadata_broker_list);
>>>>>>>>         props.put("partitioner.class", KafkaProperties.p_partitioner_class);
>>>>>>>>         props.put("queue.enqueue.timeout.ms", KafkaProperties.p_queue_enqueue_timeout);
>>>>>>>>         props.put("request.required.acks", KafkaProperties.p_request_required_acks);
>>>>>>>>         props.put("producer.type", KafkaProperties.p_producer_type);
>>>>>>>>
>>>>>>>>         props.put("batch.num.messages", KafkaProperties.p_batch_num);
>>>>>>>>
>>>>>>>>         props.put("compression.codec", KafkaProperties.p_compression_codec);
>>>>>>>>
>>>>>>>>         ProducerConfig config = new ProducerConfig(props);
>>>>>>>>         producer = new Producer<String, String>(config);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void run() {
>>>>>>>>         long start;
>>>>>>>>         long num = 0;
>>>>>>>>         System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " started.");
>>>>>>>>         while (true) {
>>>>>>>>             start = System.currentTimeMillis();
>>>>>>>>             String messageStr = new String(num + "_" + start);
>>>>>>>>             KeyedMessage<String, String> data = new KeyedMessage<String, String>(KafkaProperties.topic, messageStr,
>>>>>>>>                     start + "_" + msg);
>>>>>>>>             producer.send(data);
>>>>>>>>             num++;
>>>>>>>>             if (num == m_events) {
>>>>>>>>                 break;
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>         producer.close();
>>>>>>>>         System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " end. " + num
>>>>>>>>                 + " messages sent.");
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> public interface KafkaProperties {
>>>>>>>>     final static String zookeeper_connect = "127.0.0.1:2181";
>>>>>>>>     final static String group_id = "group1";
>>>>>>>>     final static String topic = "topic1";
>>>>>>>>
>>>>>>>>     final static String p_serializer_class = "kafka.serializer.StringEncoder";
>>>>>>>>     final static String p_metadata_broker_list = "127.0.0.1:9092";
>>>>>>>>     final static String p_partitioner_class = "kafka.producer.DefaultPartitioner";
>>>>>>>>
>>>>>>>>     final static String p_queue_enqueue_timeout = "-1";
>>>>>>>>     final static String p_request_required_acks = "1";
>>>>>>>>     final static String p_producer_type = "async";
>>>>>>>>     final static String p_batch_num = "100";
>>>>>>>>     final static String p_compression_codec = "1";
>>>>>>>>     final static String p_message_send_retries = "3";
>>>>>>>>     final static String p_retry_backoff_ms = "200";
>>>>>>>>     final static String p_topic_metadata_refresh = "600000";
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <lux...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Paul. Yes, I am using 0.8 beta1. I followed your suggestion
>>>>>>>>> to set request.required.acks=1 and got the same result. No error
>>>>>>>>> message was seen in the broker logs. After sending 1,000,000 events
>>>>>>>>> of 1 KB each, the sizes of the partition files were:
>>>>>>>>>
>>>>>>>>> 00000000000000000000.index 10240 KB
>>>>>>>>> 00000000000000000000.log 0 KB
>>>>>>>>>
>>>>>>>>> The broker configurations:
>>>>>>>>>
>>>>>>>>> num.partitions=5
>>>>>>>>> log.flush.interval.messages=20000
>>>>>>>>> log.flush.interval.ms=5000
>>>>>>>>>
>>>>>>>>> log.flush.scheduler.interval.ms=1000
>>>>>>>>> log.retention.hours=1
>>>>>>>>> log.segment.bytes=1073741824
>>>>>>>>> log.cleanup.interval.mins=30
>>>>>>>>>
>>>>>>>>> queued.max.requests=16
>>>>>>>>> fetch.purgatory.purge.interval.requests=100
>>>>>>>>> producer.purgatory.purge.interval.requests=100
>>>>>>>>>
>>>>>>>>> It works if I change the code to props.put("compression.codec", "0");
>>>>>>>>>
>>>>>>>>> thanks,
>>>>>>>>> xlu
>>>>>>>>>
>>>>>>>>> On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmack...@adobe.com> wrote:
>>>>>>>>>
>>>>>>>>>> I assume this is kafka 0.8, right? Are there any corresponding
>>>>>>>>>> errors in the broker logs? With the configuration below, I don't
>>>>>>>>>> think any errors will be reported back to the producer.
>>>>>>>>>>
>>>>>>>>>> You could also try setting request.required.acks=1 to see if
>>>>>>>>>> errors are reported back to the client.
>>>>>>>>>>
>>>>>>>>>> On 8/29/13 4:40 AM, "Lu Xuechao" <lux...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am trying to enable gzip compression for my events. But after I
>>>>>>>>>>> switched compression.codec to "1" I found the produced events were
>>>>>>>>>>> not even persisted to the on-disk log file. Of course, the consumer
>>>>>>>>>>> could not receive any compressed events. I sent 10,000 or more
>>>>>>>>>>> events but the broker's log file did not change. It seems no events
>>>>>>>>>>> were actually sent to the broker? Below is my producer's code:
>>>>>>>>>>>
>>>>>>>>>>>     Properties props = new Properties();
>>>>>>>>>>>     props.put("serializer.class", "kafka.serializer.StringEncoder");
>>>>>>>>>>>     props.put("metadata.broker.list", "127.0.0.1:9092");
>>>>>>>>>>>     props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
>>>>>>>>>>>     props.put("queue.enqueue.timeout.ms", "-1");
>>>>>>>>>>>     props.put("request.required.acks", "0");
>>>>>>>>>>>     props.put("producer.type", "async");
>>>>>>>>>>>
>>>>>>>>>>>     props.put("batch.num.messages", "100");
>>>>>>>>>>>
>>>>>>>>>>>     props.put("compression.codec", "1");
>>>>>>>>>>>
>>>>>>>>>>>     ProducerConfig config = new ProducerConfig(props);
>>>>>>>>>>>     producer = new Producer<String, String>(config);
>>>>>>>>>>>
>>>>>>>>>>>     KeyedMessage<String, String> data = new KeyedMessage<String, String>("topic1", messageStr, msg);
>>>>>>>>>>>     producer.send(data);
>>>>>>>>>>>
>>>>>>>>>>> If I comment out this line of code:
>>>>>>>>>>> props.put("compression.codec", "1");
>>>>>>>>>>> then everything works fine. Did I miss something?
>>>>>>>>>>>
>>>>>>>>>>> thanks,
>>>>>>>>>>> xlu
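
For anyone who lands on this thread with the same symptom: the cause reported above was a snappy jar missing from the producer's classpath, and it only became visible once producer-side DEBUG logging was enabled. A minimal sketch of a pre-flight check that could run before the producer is created is below. It assumes the snappy-java entry class is named org.xerial.snappy.Snappy; the ClasspathCheck class and its isClassAvailable helper are hypothetical names used for illustration and are not part of Kafka.

```java
public class ClasspathCheck {

    // Hypothetical helper (not a Kafka API): returns true if the named class
    // can be located by this class's class loader.
    public static boolean isClassAvailable(String className) {
        try {
            // initialize=false: only locate and load the class, do not run
            // its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: this is the class name shipped in the snappy-java jar.
        String snappyClass = "org.xerial.snappy.Snappy";
        if (isClassAvailable(snappyClass)) {
            System.out.println(snappyClass + " found on the classpath.");
        } else {
            System.err.println("WARNING: " + snappyClass
                    + " not found; check that the snappy jar is on the classpath.");
        }
    }
}
```

Failing loudly at startup like this may be easier to notice than a silent drop in an async producer, but it does not replace configuring log4j so that producer errors are actually written somewhere.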