My fault. When I produce messages, it starts to consume.

Now my question is:

1. Is there a way to check the offset status for a new consumer?

2. for the new consumer, is that possible to force it to consumer messages 
given an earlier offset?
For instance, in the old simple-level consumer, we can decide a previous 
offset, and then ask the consumer to start from it.
How about the new one?

Is anyone producing any (new) messages to the topics you are subscribing to in 
that consumer?

> I am using kafka, and I set up my listeners like:
> listeners=PLAINTEXT://myhostName:9092
> then I have one consumer going using the new api. However, I did not see 
> anything return for the api.
> The log from kafka is:
> [2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Preparing to 
> restabilize group newconsumper with old generation 0 
> (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Stabilized group 
> newconsumper generation 1 (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:28,555] INFO [GroupCoordinator 0]: Assignment 
> received from leader for group newconsumper for generation 1 
> (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Preparing to 
> restabilize group newconsumper with old generation 1 
> (kafka.coordinator.GroupCoordinator)
> [2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Group 
> newconsumper generation 1 is dead and removed 
> (kafka.coordinator.GroupCoordinator)
> Here is the code:
> p.put("bootstrap.servers", config.getString("kafka.broker")) 
> p.put("group.id", "newconsumper") p.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer")
> p.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer")
> p.put("enable.auto.commit", "true")
> p.put("heartbeat.interval.ms", "10000") p.put("session.timeout.ms", 
> "30000");
>      val kafkaConsumer: KafkaConsumer[String, String] = new 
> KafkaConsumer[String, String](p)
>      kafkaConsumer.subscribe(util.Arrays.asList("test"))
>      try {
>        val timeout:Long = 1000
>        val records: ConsumerRecords[String, String] = 
> kafkaConsumer.poll(timeout)
>        records.asScala.foreach(record => {
>            println(record.key() + ":" + record.value() + ":" + 
> record.offset())
>            })
>      } catch {
>        case e: Exception => {
>          e.printStackTrace
>        }
>      } finally {
>        kafkaConsumer.close
>      }
