Here is my test code. I can see that producer.send(data) returns without
any error.

public class TestProducer extends Thread {
    private final Producer<String, String> producer;

    private final int m_events;
    private final int m_threadNumber;

    private static String msg = StringUtils.rightPad("", 1000, '*');

    public TestProducer(int threadNumber, int events) {
        m_threadNumber = threadNumber;
        m_events = events;

        Properties props = new Properties();
        props.put("serializer.class", KafkaProperties.p_serializer_class);
        props.put("metadata.broker.list", KafkaProperties.p_metadata_broker_list);
        props.put("partitioner.class", KafkaProperties.p_partitioner_class);
        props.put("queue.enqueue.timeout.ms", KafkaProperties.p_queue_enqueue_timeout);
        props.put("request.required.acks", KafkaProperties.p_request_required_acks);
        props.put("producer.type", KafkaProperties.p_producer_type);

        props.put("batch.num.messages", KafkaProperties.p_batch_num);

        props.put("compression.codec", KafkaProperties.p_compression_codec);

        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    @Override
    public void run() {
        long start;
        long num = 0;
        System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " started.");
        while (true) {
            start = System.currentTimeMillis();
            String messageStr = num + "_" + start;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                    KafkaProperties.topic, messageStr, start + "_" + msg);
            producer.send(data);
            num++;
            if (num == m_events) {
                break;
            }
        }
        producer.close();
        System.out.println(new Date() + " - Message sent thread " + m_threadNumber + " end.   " + num + " messages sent.");
    }
}


public interface KafkaProperties {
    final static String zookeeper_connect = "127.0.0.1:2181";
    final static String group_id = "group1";
    final static String topic = "topic1";

    final static String p_serializer_class = "kafka.serializer.StringEncoder";
    final static String p_metadata_broker_list = "127.0.0.1:9092";
    final static String p_partitioner_class = "kafka.producer.DefaultPartitioner";

    final static String p_queue_enqueue_timeout = "-1";
    final static String p_request_required_acks = "1";
    final static String p_producer_type = "async";
    final static String p_batch_num = "100";
    final static String p_compression_codec = "1";
    final static String p_message_send_retries = "3";
    final static String p_retry_backoff_ms = "200";
    final static String p_topic_metadata_refresh = "600000";
}
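
One more thing that could be tried to make a failure visible: a minimal
sketch of a debugging configuration, assuming (not verified here) that a
synchronous producer with request.required.acks=1 reports a failed send to
the caller instead of dropping it in the background. The class name
DebugProducerProps is hypothetical; the property keys are the same ones used
in the test code above. The sketch only builds the Properties object, which
would then be passed to ProducerConfig as in TestProducer.

```java
import java.util.Properties;

// Hypothetical helper: builds producer properties meant for debugging,
// using the same keys as the TestProducer constructor.
public class DebugProducerProps {

    public static Properties build(String brokerList, String compressionCodec) {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", brokerList);
        // Ask the broker to acknowledge each request.
        props.put("request.required.acks", "1");
        // Assumption: in sync mode a failed send is raised in the calling thread.
        props.put("producer.type", "sync");
        // "1" is the codec value from the original report; "0" is the value that works.
        props.put("compression.codec", compressionCodec);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        build("127.0.0.1:9092", "1").list(System.out);
    }
}
```

Running the same test once with codec "1" and once with "0" under this
configuration would at least show whether the send call itself fails.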


On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <lux...@gmail.com> wrote:

> Thanks Paul. Yes, I am using 0.8 beta1. I followed your suggestion to set
> request.required.acks=1 and got the same result. No error messages appear in
> the broker logs. After sending 1,000,000 events of 1 KB each, the partition
> files had these sizes:
>
> 00000000000000000000.index  10240 KB
> 00000000000000000000.log  0KB
>
> The broker configurations:
>
> num.partitions=5
> log.flush.interval.messages=20000
> log.flush.interval.ms=5000
>
> log.flush.scheduler.interval.ms=1000
> log.retention.hours=1
> log.segment.bytes=1073741824
> log.cleanup.interval.mins=30
>
> queued.max.requests=16
> fetch.purgatory.purge.interval.requests=100
> producer.purgatory.purge.interval.requests=100
>
> It works if I change the code to props.put("compression.codec", "0");
>
> thanks,
> xlu
>
> On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmack...@adobe.com> wrote:
>
>> I assume this is kafka 0.8, right? Are there any corresponding errors in
>> the broker logs? With the configuration below, I don't think any errors
>> will be reported back to the producer.
>>
>> You could also try setting request.required.acks=1 to see if errors are
>> reported back to the client.
>>
>> On 8/29/13 4:40 AM, "Lu Xuechao" <lux...@gmail.com> wrote:
>>
>> >Hi ,
>> >
>> >I am trying to enable gzip compression for my events, but after I switched
>> >compression.codec to "1", the produced events were not even persisted to
>> >the on-disk log file, and of course the consumer did not receive any
>> >compressed events. I sent 10,000 or more events, but the broker's log file
>> >did not change. It seems no events were actually sent to the broker. Below
>> >is my producer code:
>> >
>> >        Properties props = new Properties();
>> >        props.put("serializer.class", "kafka.serializer.StringEncoder");
>> >        props.put("metadata.broker.list", "127.0.0.1:9092");
>> >        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
>> >        props.put("queue.enqueue.timeout.ms", "-1");
>> >        props.put("request.required.acks", "0");
>> >        props.put("producer.type", "async");
>> >
>> >        props.put("batch.num.messages", "100");
>> >
>> >        props.put("compression.codec", "1");
>> >
>> >        ProducerConfig config = new ProducerConfig(props);
>> >        producer = new Producer<String, String>(config);
>> >
>> >        KeyedMessage<String, String> data = new KeyedMessage<String, String>("topic1", messageStr, msg);
>> >        producer.send(data);
>> >
>> >
>> >If I comment out this line of code : props.put("compression.codec", "1");
>> >then everything works fine. Did I miss something?
>> >
>> >thanks,
>> >xlu
>>
>>
>
