Hi Jason, I'm so sorry to reply your email later. Your solution solved my issue. When I use "earliest" for auto.offset.reset, the consumer can receiver messages.
Thanks a lot, Nicole On Thu, May 26, 2016 at 11:21 AM, Jason Gustafson <ja...@confluent.io> wrote: > Hi Nicole, > > It would help to see some log files. I cannot view the image you've tried > to include above. I took a quick glance at your code and noticed that your > producer only writes 50 messages before it stops. Since you have not > overridden auto.offset.reset in the consumer config, the default behavior > will be to start the consumer at the latest offset. My guess is that the > producer might be finishing all of its writes before the consumer sets its > initial offset. In that case, the consumer will start at offset 50 and > won't receive anything because nothing further is written. Try using > "earliest" for auto.offset.reset and see if that works. If not, attach some > logs and we'll try to help. > > Thanks, > Jason > > On Wed, May 25, 2016 at 6:37 PM, Shaolu Xu <sh...@tibco-support.com> > wrote: > >> Added the project. >> >> On Thu, May 26, 2016 at 9:25 AM, Shaolu Xu <sh...@tibco-support.com> >> wrote: >> >>> Hi Tom, >>> >>> The following is my producer and consumer configuration: >>> >>> - Producer: >>> >>> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " >>> 127.0.0.1:9090"); >>> props.put("key.serializer", >>> "org.apache.kafka.common.serialization.IntegerSerializer"); >>> props.put("value.serializer", >>> "org.apache.kafka.common.serialization.StringSerializer"); >>> props.put("batch.size",16384) >>> props.put("linger.ms",10); >>> props.put("compression.type","snappy"); >>> KafkaProducer<Integer, String> producer = new >>> KafkaProducer<Integer, String>(props); >>> >>> - Consumer: >>> >>> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " >>> 127.0.0.1:9090"); >>> props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); >>> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, >>> "true"); >>> >>> props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); >>> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, >>> "30000"); >>> >>> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, >>> "org.apache.kafka.common.serialization.IntegerDeserializer"); >>> >>> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >>> "org.apache.kafka.common.serialization.StringDeserializer"); >>> KafkaConsumer<Integer, String> consumer = new >>> KafkaConsumer<>(props); >>> >>> This is the consumer debug snapshot: >>> >>> [image: Inline image 1] >>> And the node ip is correct. >>> Also attachment my project. Thanks in advance. >>> >>> Thanks, >>> Nicole >>> >>> On Wed, May 25, 2016 at 6:24 PM, Tom Crayford <tcrayf...@heroku.com> >>> wrote: >>> >>>> Which consumer are you using? Can you see it connecting to the broker in >>>> the broker logs? I'd recommend putting your configs for producer, >>>> consumer >>>> and broker in a reply to assist debugging. Also please attach any >>>> relevant >>>> code or log files. >>>> >>>> Thanks >>>> >>>> Tom Crayford >>>> Heroku Kafka >>>> >>>> On Wednesday, 25 May 2016, Shaolu Xu <sh...@tibco-support.com> wrote: >>>> >>>> > Hi All, >>>> > >>>> > Anyone have idea about this, Please help me find the issue. >>>> > >>>> > Thanks, >>>> > Nicole >>>> > >>>> > On Wed, May 25, 2016 at 11:24 AM, Shaolu Xu <sh...@tibco-support.com >>>> > <javascript:;>> wrote: >>>> > >>>> > > Hi dev, >>>> > > >>>> > > Kafka version: 0.9.0 language: Java >>>> > > >>>> > > When using kafka, I can set a codec by setting the* >>>> > > compression.type=snappy *property of my kafka producer. >>>> > > >>>> > > Suppose I use snappy compression in my producer, and i can see the >>>> > message >>>> > > use kafkaMonitor. But when I consuming the messages from kafka using >>>> > > consumer, I cannot receive any messages. >>>> > > So should I do something to *decode the data from snappy or set some >>>> > > configuration when create consumer*? >>>> > > >>>> > > >>>> > > Thanks in advance. >>>> > > >>>> > > >>>> > > Thanks, >>>> > > Nicole >>>> > > >>>> > >>>> >>> >>> >> >