Xuechao,

Thanks for updating the wiki. Not setting up log4j properly seems to be a
general problem, not just one limited to this particular issue. Could you
reword the entry a bit to make it more general?
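
For reference, a minimal log4j.properties along these lines is usually
enough to surface the producer-side DEBUG output that uncovered the issue
(a sketch; the appender choice and pattern are assumptions, not from this
thread):

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

    # raise the Kafka client packages to DEBUG
    log4j.logger.kafka=DEBUG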

Jun


On Fri, Aug 30, 2013 at 2:02 AM, Lu Xuechao <lux...@gmail.com> wrote:

> Hi Joe, wiki updated. Hope it helps.
>
>
> On Fri, Aug 30, 2013 at 3:22 PM, Joe Stein <crypt...@gmail.com> wrote:
>
> > I feel like this may be a fairly common case, as we have heard about it
> > a few times before now.
> >
> > Lu Xuechao, would you mind updating the FAQ at
> > https://cwiki.apache.org/confluence/display/KAFKA/FAQ with what the
> > problem was and your solution, just to capture this thread in the wiki?
> > Thanks!
> >
> > /*******************************************
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
> >
> > On Fri, Aug 30, 2013 at 2:30 AM, Lu Xuechao <lux...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for your help. I finally found the cause by enabling the
> > > producer-side DEBUG output: the snappy jar was not included in the
> > > classpath. After adding it, everything worked.
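> > >
> > > In case anyone else hits this, a quick runtime check that the snappy
> > > classes are visible would be something like the following (hypothetical,
> > > not part of my original code):
> > >
> > >     try {
> > >         Class.forName("org.xerial.snappy.Snappy");
> > >         System.out.println("snappy-java is on the classpath");
> > >     } catch (ClassNotFoundException e) {
> > >         System.out.println("snappy-java is missing from the classpath");
> > >     }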
> > >
> > > Thanks again.
> > >
> > >
> > >
> > >
> > > On Fri, Aug 30, 2013 at 12:53 PM, Lu Xuechao <lux...@gmail.com> wrote:
> > >
> > > > No.
> > > >
> > > >
> > > > On Fri, Aug 30, 2013 at 11:57 AM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > >> These are the metadata requests. Do you see Producer requests from
> > > >> your client?
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >>
> > > >> On Thu, Aug 29, 2013 at 5:40 PM, Lu Xuechao <lux...@gmail.com> wrote:
> > > >>
> > > >> > After I sent 1,000 compressed events, I saw these messages in the
> > > >> > broker's log files:
> > > >> >
> > > >> > in kafka-request.log
> > > >> >
> > > >> > [2013-08-30 08:38:18,713] TRACE Processor 6 received request : Name:
> > > >> > TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ;
> > > >> > Topics: topic1 (kafka.network.RequestChannel$)
> > > >> > [2013-08-30 08:38:18,718] TRACE Completed request:Name:
> > > >> > TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ;
> > > >> > Topics: topic1 from client /127.0.0.1:64238
> > > >> > ;totalTime:5,queueTime:3,localTime:1,remoteTime:0,sendTime:1
> > > >> > (kafka.request.logger)
> > > >> >
> > > >> >
> > > >> > in server.log
> > > >> >
> > > >> > [2013-08-30 08:38:18,759] INFO Closing socket connection to
> > > >> > /127.0.0.1. (kafka.network.Processor)
> > > >> >
> > > >> >
> > > >> > Any ideas? Thanks.
> > > >> >
> > > >> >
> > > >> > On Thu, Aug 29, 2013 at 10:28 PM, Jun Rao <jun...@gmail.com> wrote:
> > > >> >
> > > >> > > Did you see any error in the producer log? Did the broker receive
> > > >> > > the produce request (you can look at the request log in the broker)?
> > > >> > >
> > > >> > > Thanks,
> > > >> > >
> > > >> > > Jun
> > > >> > >
> > > >> > >
> > > >> > > On Thu, Aug 29, 2013 at 6:29 AM, Lu Xuechao <lux...@gmail.com> wrote:
> > > >> > >
> > > >> > > > Let me post my test code here. I could see producer.send(data)
> > > >> > > > return with no error.
> > > >> > > >
> > > >> > > > import java.util.Date;
> > > >> > > > import java.util.Properties;
> > > >> > > >
> > > >> > > > import org.apache.commons.lang.StringUtils;
> > > >> > > >
> > > >> > > > import kafka.javaapi.producer.Producer;
> > > >> > > > import kafka.producer.KeyedMessage;
> > > >> > > > import kafka.producer.ProducerConfig;
> > > >> > > >
> > > >> > > > public class TestProducer extends Thread {
> > > >> > > >     private final Producer<String, String> producer;
> > > >> > > >
> > > >> > > >     private final int m_events;
> > > >> > > >     private final int m_threadNumber;
> > > >> > > >
> > > >> > > >     // 1 KB payload of '*' characters
> > > >> > > >     private static String msg = StringUtils.rightPad("", 1000, '*');
> > > >> > > >
> > > >> > > >     public TestProducer(int threadNumber, int events) {
> > > >> > > >         m_threadNumber = threadNumber;
> > > >> > > >         m_events = events;
> > > >> > > >
> > > >> > > >         Properties props = new Properties();
> > > >> > > >         props.put("serializer.class", KafkaProperties.p_serializer_class);
> > > >> > > >         props.put("metadata.broker.list", KafkaProperties.p_metadata_broker_list);
> > > >> > > >         props.put("partitioner.class", KafkaProperties.p_partitioner_class);
> > > >> > > >         props.put("queue.enqueue.timeout.ms", KafkaProperties.p_queue_enqueue_timeout);
> > > >> > > >         props.put("request.required.acks", KafkaProperties.p_request_required_acks);
> > > >> > > >         props.put("producer.type", KafkaProperties.p_producer_type);
> > > >> > > >         props.put("batch.num.messages", KafkaProperties.p_batch_num);
> > > >> > > >         props.put("compression.codec", KafkaProperties.p_compression_codec);
> > > >> > > >
> > > >> > > >         ProducerConfig config = new ProducerConfig(props);
> > > >> > > >         producer = new Producer<String, String>(config);
> > > >> > > >     }
> > > >> > > >
> > > >> > > >     @Override
> > > >> > > >     public void run() {
> > > >> > > >         long start;
> > > >> > > >         long num = 0;
> > > >> > > >         System.out.println(new Date() + " - Message sent thread "
> > > >> > > >                 + m_threadNumber + " started.");
> > > >> > > >         while (true) {
> > > >> > > >             start = System.currentTimeMillis();
> > > >> > > >             // key carries a sequence number, value carries the 1 KB payload
> > > >> > > >             String messageStr = num + "_" + start;
> > > >> > > >             KeyedMessage<String, String> data = new KeyedMessage<String, String>(
> > > >> > > >                     KafkaProperties.topic, messageStr, start + "_" + msg);
> > > >> > > >             producer.send(data);
> > > >> > > >             num++;
> > > >> > > >             if (num == m_events) {
> > > >> > > >                 break;
> > > >> > > >             }
> > > >> > > >         }
> > > >> > > >         producer.close();
> > > >> > > >         System.out.println(new Date() + " - Message sent thread "
> > > >> > > >                 + m_threadNumber + " end.   " + num + " messages sent.");
> > > >> > > >     }
> > > >> > > > }
> > > >> > > >
> > > >> > > >
> > > >> > > > public interface KafkaProperties {
> > > >> > > >     final static String zookeeper_connect = "127.0.0.1:2181";
> > > >> > > >     final static String group_id = "group1";
> > > >> > > >     final static String topic = "topic1";
> > > >> > > >
> > > >> > > >     final static String p_serializer_class = "kafka.serializer.StringEncoder";
> > > >> > > >     final static String p_metadata_broker_list = "127.0.0.1:9092";
> > > >> > > >     final static String p_partitioner_class = "kafka.producer.DefaultPartitioner";
> > > >> > > >
> > > >> > > >     final static String p_queue_enqueue_timeout = "-1";
> > > >> > > >     final static String p_request_required_acks = "1";
> > > >> > > >     final static String p_producer_type = "async";
> > > >> > > >     final static String p_batch_num = "100";
> > > >> > > >     final static String p_compression_codec = "1";
> > > >> > > >     final static String p_message_send_retries = "3";
> > > >> > > >     final static String p_retry_backoff_ms = "200";
> > > >> > > >     final static String p_topic_metadata_refresh = "600000";
> > > >> > > > }
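> > > >> > > >
> > > >> > > > For completeness, a small driver along these lines would exercise the
> > > >> > > > producer (a hypothetical sketch, not part of my original test; the
> > > >> > > > thread and event counts are made up):
> > > >> > > >
> > > >> > > > public class TestProducerMain {
> > > >> > > >     public static void main(String[] args) throws InterruptedException {
> > > >> > > >         // start two sender threads, 1,000 events each
> > > >> > > >         TestProducer[] senders = new TestProducer[2];
> > > >> > > >         for (int i = 0; i < senders.length; i++) {
> > > >> > > >             senders[i] = new TestProducer(i, 1000);
> > > >> > > >             senders[i].start();
> > > >> > > >         }
> > > >> > > >         // wait for all sends to finish before the JVM exits
> > > >> > > >         for (TestProducer sender : senders) {
> > > >> > > >             sender.join();
> > > >> > > >         }
> > > >> > > >     }
> > > >> > > > }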
> > > >> > > >
> > > >> > > >
> > > >> > > > On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <lux...@gmail.com> wrote:
> > > >> > > >
> > > >> > > > > Thanks Paul. Yes, I am using 0.8 beta1. I followed your suggestion
> > > >> > > > > to set request.required.acks=1 and got the same result. No error
> > > >> > > > > messages were seen in the broker logs. After sending 1,000,000
> > > >> > > > > events, each 1 KB in size, the partition files looked like this:
> > > >> > > > >
> > > >> > > > > 00000000000000000000.index  10240 KB
> > > >> > > > > 00000000000000000000.log  0KB
> > > >> > > > >
> > > >> > > > > The broker configurations:
> > > >> > > > >
> > > >> > > > > num.partitions=5
> > > >> > > > > log.flush.interval.messages=20000
> > > >> > > > > log.flush.interval.ms=5000
> > > >> > > > >
> > > >> > > > > log.flush.scheduler.interval.ms=1000
> > > >> > > > > log.retention.hours=1
> > > >> > > > > log.segment.bytes=1073741824
> > > >> > > > > log.cleanup.interval.mins=30
> > > >> > > > >
> > > >> > > > > queued.max.requests=16
> > > >> > > > > fetch.purgatory.purge.interval.requests=100
> > > >> > > > > producer.purgatory.purge.interval.requests=100
> > > >> > > > >
> > > >> > > > > It works if I change the code to props.put("compression.codec",
> > > >> > > > > "0"); i.e. with compression disabled.
> > > >> > > > >
> > > >> > > > > thanks,
> > > >> > > > > xlu
> > > >> > > > >
> > > >> > > > > On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmack...@adobe.com>
> > > >> > > > > wrote:
> > > >> > > > >
> > > >> > > > >> I assume this is kafka 0.8, right? Are there any corresponding
> > > >> > > > >> errors in the broker logs? With the configuration below, I don't
> > > >> > > > >> think any errors will be reported back to the producer, since
> > > >> > > > >> request.required.acks=0 means the producer never waits for an
> > > >> > > > >> acknowledgement.
> > > >> > > > >>
> > > >> > > > >> You could also try setting request.required.acks=1 to see if
> > > >> > > > >> errors are reported back to the client.
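> > > >> > > > >>
> > > >> > > > >> With request.required.acks=1 and producer.type=sync, a failed send
> > > >> > > > >> should surface as an exception the client can catch; a minimal
> > > >> > > > >> sketch (illustrative only, assuming the 0.8 producer API):
> > > >> > > > >>
> > > >> > > > >>     try {
> > > >> > > > >>         producer.send(data);
> > > >> > > > >>     } catch (kafka.common.FailedToSendMessageException e) {
> > > >> > > > >>         // thrown after all retries are exhausted
> > > >> > > > >>         e.printStackTrace();
> > > >> > > > >>     }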
> > > >> > > > >>
> > > >> > > > >> On 8/29/13 4:40 AM, "Lu Xuechao" <lux...@gmail.com> wrote:
> > > >> > > > >>
> > > >> > > > >> >Hi,
> > > >> > > > >> >
> > > >> > > > >> >I am trying to enable gzip compression for my events. But after I
> > > >> > > > >> >switched compression.codec to "1", I found that the produced events
> > > >> > > > >> >were not even persisted to the disk log file. Of course, the
> > > >> > > > >> >consumer could not receive any compressed events. I sent 10,000 or
> > > >> > > > >> >more events, but the broker's log file did not change. It seems no
> > > >> > > > >> >events were actually sent to the broker? Below is my producer's
> > > >> > > > >> >code:
> > > >> > > > >> >
> > > >> > > > >> >        Properties props = new Properties();
> > > >> > > > >> >        props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >> > > > >> >        props.put("metadata.broker.list", "127.0.0.1:9092");
> > > >> > > > >> >        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
> > > >> > > > >> >        props.put("queue.enqueue.timeout.ms", "-1");
> > > >> > > > >> >        props.put("request.required.acks", "0");
> > > >> > > > >> >        props.put("producer.type", "async");
> > > >> > > > >> >
> > > >> > > > >> >        props.put("batch.num.messages", "100");
> > > >> > > > >> >
> > > >> > > > >> >        props.put("compression.codec", "1");
> > > >> > > > >> >
> > > >> > > > >> >        ProducerConfig config = new ProducerConfig(props);
> > > >> > > > >> >        producer = new Producer<String, String>(config);
> > > >> > > > >> >
> > > >> > > > >> >        KeyedMessage<String, String> data = new KeyedMessage<String,
> > > >> > > > >> >                String>("topic1", messageStr, msg);
> > > >> > > > >> >        producer.send(data);
> > > >> > > > >> >
> > > >> > > > >> >
> > > >> > > > >> >If I comment out this line of code: props.put("compression.codec",
> > > >> > > > >> >"1"); then everything works fine. Did I miss something?
> > > >> > > > >> >
> > > >> > > > >> >thanks,
> > > >> > > > >> >xlu
> > > >> > > > >>
> > > >> > > > >>
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
