Hi Nicole,

It would help to see some log files. I cannot view the image you tried to include above.

I took a quick look at your code and noticed that your producer writes only 50 messages before it stops. Since you have not overridden auto.offset.reset in the consumer config, the default behavior is to start the consumer at the latest offset. My guess is that the producer finishes all of its writes before the consumer sets its initial offset. In that case, the consumer starts at offset 50 and never receives anything because nothing further is written. Try setting auto.offset.reset to "earliest" and see if that works. If not, attach some logs and we'll try to help.
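[Editor's note: a minimal sketch of the suggested change, using plain string keys (which match the ConsumerConfig constants in 0.9.0) so it compiles without the Kafka client on the classpath; the broker address and group id are placeholders:]

```java
import java.util.Properties;

public class EarliestOffsetConfig {
    // Builds consumer properties with auto.offset.reset overridden.
    // "earliest" makes a consumer with no committed offset start from the
    // beginning of the log instead of the default "latest", so messages
    // written before the consumer joined are still delivered.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9090"); // placeholder broker
        props.put("group.id", "test-group");              // placeholder group
        props.put("auto.offset.reset", "earliest");       // default is "latest"
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("auto.offset.reset"));
    }
}
```

Note that auto.offset.reset only applies when the group has no committed offset; an existing group id with committed offsets resumes from those offsets regardless.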
Thanks,
Jason

On Wed, May 25, 2016 at 6:37 PM, Shaolu Xu <sh...@tibco-support.com> wrote:

> Added the project.
>
> On Thu, May 26, 2016 at 9:25 AM, Shaolu Xu <sh...@tibco-support.com> wrote:
>
>> Hi Tom,
>>
>> The following is my producer and consumer configuration:
>>
>> - Producer:
>>
>>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
>>     props.put("key.serializer",
>>             "org.apache.kafka.common.serialization.IntegerSerializer");
>>     props.put("value.serializer",
>>             "org.apache.kafka.common.serialization.StringSerializer");
>>     props.put("batch.size", 16384);
>>     props.put("linger.ms", 10);
>>     props.put("compression.type", "snappy");
>>     KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
>>
>> - Consumer:
>>
>>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
>>     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
>>     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>>     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
>>     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>             "org.apache.kafka.common.serialization.IntegerDeserializer");
>>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>             "org.apache.kafka.common.serialization.StringDeserializer");
>>     KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
>>
>> This is the consumer debug snapshot:
>>
>> [image: Inline image 1]
>>
>> And the node IP is correct.
>> I have also attached my project. Thanks in advance.
>>
>> Thanks,
>> Nicole
>>
>> On Wed, May 25, 2016 at 6:24 PM, Tom Crayford <tcrayf...@heroku.com> wrote:
>>
>>> Which consumer are you using? Can you see it connecting to the broker in
>>> the broker logs? I'd recommend putting your configs for producer, consumer
>>> and broker in a reply to assist debugging. Also please attach any relevant
>>> code or log files.
>>>
>>> Thanks,
>>>
>>> Tom Crayford
>>> Heroku Kafka
>>>
>>> On Wednesday, 25 May 2016, Shaolu Xu <sh...@tibco-support.com> wrote:
>>>
>>> > Hi All,
>>> >
>>> > Does anyone have an idea about this? Please help me find the issue.
>>> >
>>> > Thanks,
>>> > Nicole
>>> >
>>> > On Wed, May 25, 2016 at 11:24 AM, Shaolu Xu <sh...@tibco-support.com> wrote:
>>> >
>>> > > Hi dev,
>>> > >
>>> > > Kafka version: 0.9.0, language: Java
>>> > >
>>> > > When using Kafka, I can set a codec by setting the
>>> > > *compression.type=snappy* property of my Kafka producer.
>>> > >
>>> > > Suppose I use snappy compression in my producer; I can see the
>>> > > messages using KafkaMonitor. But when I consume the messages from
>>> > > Kafka using a consumer, I cannot receive any messages.
>>> > > So should I do something to *decode the data from snappy or set some
>>> > > configuration when creating the consumer*?
>>> > >
>>> > > Thanks in advance.
>>> > >
>>> > > Thanks,
>>> > > Nicole
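[Editor's note: on the compression question in the original post, no consumer-side setting is needed for snappy. compression.type is a producer-only config; the Java consumer detects the codec from the message attributes and decompresses automatically. A minimal producer-side sketch, using string keys and a placeholder broker address so it compiles without the Kafka client on the classpath:]

```java
import java.util.Properties;

public class SnappyProducerConfig {
    // Compression is configured only on the producer. The broker stores the
    // compressed message set as-is, and the consumer decompresses it
    // transparently; no "decoder" config exists on the consumer side.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9090"); // placeholder broker
        props.put("compression.type", "snappy");          // or "gzip", "none"
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("compression.type"));
    }
}
```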