Hi Jason,

Sorry for the late reply.
Your suggestion solved my issue. When I set auto.offset.reset to "earliest",
the consumer receives the messages.
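
For reference, the change amounts to one extra consumer property. Below is a minimal sketch, assuming the same settings as the consumer configuration quoted further down; the class name, method name, and group id are hypothetical. It uses plain string property keys and only java.util.Properties, so it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

public class EarliestConsumerConfig {

    // Builds consumer properties that mirror the configuration quoted in this
    // thread, plus auto.offset.reset=earliest. The method name and parameters
    // are illustrative assumptions, not part of the original project.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Without this, the consumer defaults to "latest" and skips messages
        // that were written before it established its initial position.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // "demo-group" is a placeholder group id.
        Properties props = consumerProps("127.0.0.1:9090", "demo-group");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

The resulting properties would then be passed to new KafkaConsumer<>(props). Note that auto.offset.reset only takes effect when the consumer group has no committed offset for a partition, so a group that has already committed offsets will keep resuming from them.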


Thanks a lot,
Nicole

On Thu, May 26, 2016 at 11:21 AM, Jason Gustafson <ja...@confluent.io>
wrote:

> Hi Nicole,
>
> It would help to see some log files. I cannot view the image you've tried
> to include above. I took a quick glance at your code and noticed that your
> producer only writes 50 messages before it stops. Since you have not
> overridden auto.offset.reset in the consumer config, the default behavior
> will be to start the consumer at the latest offset. My guess is that the
> producer might be finishing all of its writes before the consumer sets its
> initial offset. In that case, the consumer will start at offset 50 and
> won't receive anything because nothing further is written. Try using
> "earliest" for auto.offset.reset and see if that works. If not, attach some
> logs and we'll try to help.
>
> Thanks,
> Jason
>
> On Wed, May 25, 2016 at 6:37 PM, Shaolu Xu <sh...@tibco-support.com>
> wrote:
>
>> Added the project.
>>
>> On Thu, May 26, 2016 at 9:25 AM, Shaolu Xu <sh...@tibco-support.com>
>> wrote:
>>
>>> Hi Tom,
>>>
>>> The following is my producer and consumer configuration:
>>>
>>>    - Producer:
>>>
>>>        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
>>>        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
>>>        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>>        props.put("batch.size", 16384);
>>>        props.put("linger.ms", 10);
>>>        props.put("compression.type", "snappy");
>>>        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
>>>
>>>    - Consumer:
>>>
>>>        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
>>>        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
>>>        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>>>        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
>>>        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>>>        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
>>>        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>>>        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
>>>
>>> This is the consumer debug snapshot:
>>>
>>> [image: Inline image 1]
>>> And the node ip is correct.
>>> I have also attached my project. Thanks in advance.
>>>
>>> Thanks,
>>> Nicole
>>>
>>> On Wed, May 25, 2016 at 6:24 PM, Tom Crayford <tcrayf...@heroku.com>
>>> wrote:
>>>
>>>> Which consumer are you using? Can you see it connecting to the broker in
>>>> the broker logs? I'd recommend putting your configs for producer, consumer
>>>> and broker in a reply to assist debugging. Also please attach any relevant
>>>> code or log files.
>>>>
>>>> Thanks
>>>>
>>>> Tom Crayford
>>>> Heroku Kafka
>>>>
>>>> On Wednesday, 25 May 2016, Shaolu Xu <sh...@tibco-support.com> wrote:
>>>>
>>>> > Hi All,
>>>> >
>>>> > Does anyone have an idea about this? Please help me find the issue.
>>>> >
>>>> > Thanks,
>>>> > Nicole
>>>> >
>>>> > On Wed, May 25, 2016 at 11:24 AM, Shaolu Xu <sh...@tibco-support.com> wrote:
>>>> >
>>>> > > Hi dev,
>>>> > >
>>>> > > Kafka version: 0.9.0  language:  Java
>>>> > >
>>>> > > When using Kafka, I can set a codec by setting the
>>>> > > *compression.type=snappy* property of my Kafka producer.
>>>> > >
>>>> > > I use snappy compression in my producer, and I can see the messages
>>>> > > using kafkaMonitor. But when I consume the messages from Kafka with a
>>>> > > consumer, I cannot receive any messages.
>>>> > > So should I do something to *decode the data from snappy or set some
>>>> > > configuration when creating the consumer*?
>>>> > >
>>>> > >
>>>> > > Thanks in advance.
>>>> > >
>>>> > >
>>>> > > Thanks,
>>>> > > Nicole
>>>> > >
>>>> >
>>>>
>>>
>>>
>>
>
