No worries.
I figured that out already.
Thanks all.

Best regards,
Jack 



-----Original Message-----
From: Jack Yang [mailto:j...@uow.edu.au] 
Sent: Monday, 29 August 2016 10:13 AM
To: users@kafka.apache.org
Subject: RE: consumer with version 0.10.0

Hi there,

My fault. When I produce messages, it starts to consume.

Now my question is:

1. Is there a way to check the committed offset status for a new-API consumer group?

2. For the new consumer, is it possible to force it to consume messages 
starting from an earlier offset?
For instance, with the old simple consumer we could pick a previous 
offset and ask the consumer to start from it.
How does the new one handle this?
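(For reference: the new consumer API does expose seek() for this. A minimal sketch follows; the broker address, topic name, partition number, and offset value are placeholders, not taken from this thread. Committed offsets for new-API groups can also be inspected with the kafka-consumer-groups.sh tool using --new-consumer --describe.)

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object SeekSketch extends App {
  val p = new Properties()
  p.put("bootstrap.servers", "localhost:9092") // placeholder broker
  p.put("group.id", "newconsumper")
  p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](p)
  val partition = new TopicPartition("test", 0)

  // Use assign() rather than subscribe() so we control the partition
  // directly and can seek before polling.
  consumer.assign(Collections.singletonList(partition))

  // Jump back to an explicit earlier offset (42 is a placeholder);
  // seekToBeginning(...) would replay the partition from the start.
  consumer.seek(partition, 42L)

  val records = consumer.poll(1000L)
  consumer.close()
}
```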


Best regards,
Jack 



-----Original Message-----
From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
Sent: Friday, 26 August 2016 11:30 PM
To: users@kafka.apache.org
Subject: Re: consumer with version 0.10.0

Is anyone producing any (new) messages to the topics you are subscribing to in 
that consumer?

-Jaikiran
On Friday 26 August 2016 10:14 AM, Jack Yang wrote:
> Hi all,
> I am using kafka 0.10.0.1, and I set up my listeners like:
>
> listeners=PLAINTEXT://myhostName:9092
>
> Then I have one consumer running that uses the new API. However, poll() 
> never returns any records.
> The log from kafka is:
>
> [2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Preparing to 
> restabilize group newconsumper with old generation 0
> (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Stabilized group 
> newconsumper generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:28,555] INFO [GroupCoordinator 0]: Assignment 
> received from leader for group newconsumper for generation 1
> (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Preparing to 
> restabilize group newconsumper with old generation 1
> (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Group 
> newconsumper generation 1 is dead and removed
> (kafka.coordinator.GroupCoordinator)
>
>
> Here is the code:
> p.put("bootstrap.servers", config.getString("kafka.broker"))
> p.put("group.id", "newconsumper")
> p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
> p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
> p.put("enable.auto.commit", "true")
> p.put("heartbeat.interval.ms", "10000")
> p.put("session.timeout.ms", "30000")
>
> val kafkaConsumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](p)
> kafkaConsumer.subscribe(util.Arrays.asList("test"))
> try {
>   val timeout: Long = 1000
>   val records: ConsumerRecords[String, String] = kafkaConsumer.poll(timeout)
>
>   records.asScala.foreach(record => {
>     println(record.key() + ":" + record.value() + ":" + record.offset())
>   })
>
> } catch {
>   case e: Exception =>
>     e.printStackTrace
> } finally {
>   kafkaConsumer.close
> }
>
> Best regards,
> Jack
>
>
