No worries. I have figured that out already. Thanks, all.

Best regards,
Jack

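
For anyone landing on this thread with the same two questions (quoted below), here is a minimal sketch of one way to handle them with the new consumer API. It is not necessarily what was done here. The object and method names (`OffsetTools`, `offsetStatus`, `rewindTo`) are made up for illustration. The calls `committed`, `position`, `assign` and `seek` are from the `Consumer` interface as it stands in the 0.10.x client; later client versions changed some of these signatures.

```scala
import java.util.Collections
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper object, for illustration only.
object OffsetTools {

  // Question 1: report (last committed offset if any, next offset to be fetched)
  // for one partition. The partition must already be assigned to this consumer.
  def offsetStatus(consumer: Consumer[String, String],
                   tp: TopicPartition): (Option[Long], Long) = {
    // committed() returns null when the group has no committed offset yet
    val committed = Option(consumer.committed(tp)).map(_.offset())
    val position = consumer.position(tp)
    (committed, position)
  }

  // Question 2: take manual control of one partition and start reading
  // from an earlier offset, similar to what the old simple consumer allowed.
  def rewindTo(consumer: Consumer[String, String],
               tp: TopicPartition,
               offset: Long): Unit = {
    consumer.assign(Collections.singletonList(tp)) // manual assignment, no group rebalancing
    consumer.seek(tp, offset)                      // the next poll() fetches from `offset`
  }
}
```

With a real `KafkaConsumer`, usage might look like `OffsetTools.rewindTo(kafkaConsumer, new TopicPartition("test", 0), 0L)` followed by the usual `poll` calls. From the command line, `bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server myhostName:9092 --describe --group newconsumper` should also show the offsets of a group that uses the new consumer in 0.10.0.x.
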
-----Original Message-----
From: Jack Yang [mailto:j...@uow.edu.au]
Sent: Monday, 29 August 2016 10:13 AM
To: users@kafka.apache.org
Subject: RE: consumer with version 0.10.0

Hi there,

My fault. Once I produced messages, the consumer started to consume them.

Now my questions are:
1. Is there a way to check the offset status for a new consumer?
2. For the new consumer, is it possible to force it to consume messages starting from an earlier offset? For instance, with the old simple-level consumer, we can pick a previous offset and then ask the consumer to start from it. How about the new one?

Best regards,
Jack

-----Original Message-----
From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
Sent: Friday, 26 August 2016 11:30 PM
To: users@kafka.apache.org
Subject: Re: consumer with version 0.10.0

Is anyone producing any (new) messages to the topics you are subscribing to in that consumer?

-Jaikiran

On Friday 26 August 2016 10:14 AM, Jack Yang wrote:
> Hi all,
> I am using kafka 0.10.0.1, and I set up my listeners like:
>
> listeners=PLAINTEXT://myhostName:9092
>
> Then I have one consumer running that uses the new API. However, I did not
> see anything returned from the API.
> The log from kafka is:
>
> [2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Preparing to restabilize group newconsumper with old generation 0 (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Stabilized group newconsumper generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:28,555] INFO [GroupCoordinator 0]: Assignment received from leader for group newconsumper for generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Preparing to restabilize group newconsumper with old generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Group newconsumper generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)
>
>
> Here is the code:
> p.put("bootstrap.servers", config.getString("kafka.broker"))
> p.put("group.id", "newconsumper")
> p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
> p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
> p.put("enable.auto.commit", "true")
> p.put("heartbeat.interval.ms", "10000")
> p.put("session.timeout.ms", "30000");
>
> val kafkaConsumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](p)
> kafkaConsumer.subscribe(util.Arrays.asList("test"))
> try {
>   val timeout: Long = 1000
>   val records: ConsumerRecords[String, String] = kafkaConsumer.poll(timeout)
>
>   records.asScala.foreach(record => {
>     println(record.key() + ":" + record.value() + ":" + record.offset())
>   })
>
> } catch {
>   case e: Exception => {
>     e.printStackTrace
>   }
> } finally {
>   kafkaConsumer.close
> }
>
> Best regards,
> Jack
>
>
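
A note on the quoted code above: it calls `poll` once with a one-second timeout and then closes the consumer, which is consistent with the broker log showing the group removed less than a second after it stabilized. A new group with the default `auto.offset.reset` of `latest` also sees only messages produced after it joins. The sketch below polls repeatedly instead of once. `PollLoop` and `drain` are hypothetical names, and `poll(long)` is the 0.10.x signature (newer clients take a `Duration`).

```scala
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}
import scala.collection.JavaConverters._

// Hypothetical helper object, for illustration only.
object PollLoop {

  // Call poll() `rounds` times and collect every record returned.
  // A long-running service would loop until shutdown rather than a fixed count.
  def drain(consumer: Consumer[String, String],
            rounds: Int,
            timeoutMs: Long): Seq[ConsumerRecord[String, String]] =
    (1 to rounds).flatMap { _ =>
      // each poll() may return zero or more records across assigned partitions
      consumer.poll(timeoutMs).asScala.toList
    }
}
```

With the consumer from the quoted code, something like `PollLoop.drain(kafkaConsumer, 30, 1000L).foreach(r => println(r.key() + ":" + r.value() + ":" + r.offset()))` would keep the consumer in the group for about thirty seconds, leaving time for a producer to send messages to the `test` topic.
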