Hi all,
I am using Kafka 0.10.0.1, and I set up my listeners like this:

listeners=PLAINTEXT://myhostName:9092

Then I started one consumer using the new consumer API. However, I did not see anything returned by the API.
The Kafka broker log shows:

[2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Preparing to restabilize group newconsumper with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Stabilized group newconsumper generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:28,555] INFO [GroupCoordinator 0]: Assignment received from leader for group newconsumper for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Preparing to restabilize group newconsumper with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Group newconsumper generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)


Here is the code:
import java.util
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

// Consumer settings; `config` is my application's own configuration object
p.put("bootstrap.servers", config.getString("kafka.broker"))
p.put("group.id", "newconsumper")
p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
p.put("enable.auto.commit", "true")
p.put("heartbeat.interval.ms", "10000")
p.put("session.timeout.ms", "30000")

val kafkaConsumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](p)
kafkaConsumer.subscribe(util.Arrays.asList("test"))
try {
  // A single poll with a 1000 ms timeout
  val timeout: Long = 1000
  val records: ConsumerRecords[String, String] = kafkaConsumer.poll(timeout)

  // Print key, value and offset of every record returned by that poll
  records.asScala.foreach { record =>
    println(record.key() + ":" + record.value() + ":" + record.offset())
  }
} catch {
  case e: Exception => e.printStackTrace()
} finally {
  kafkaConsumer.close()
}
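
For comparison, here is a minimal sketch of the same consumer polling in a loop instead of once. It rests on two assumptions that nothing above confirms: that a single poll(1000) can return empty because joining the group and the first fetch take longer than the timeout, and that the messages in "test" were written before the group existed, in which case the default auto.offset.reset of latest would skip them. The object name PollLoopSketch, the hard-coded broker address myhostName:9092, the 30-iteration limit and the auto.offset.reset=earliest setting are illustrative choices, not part of my actual setup.

```scala
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical standalone object; needs the kafka-clients 0.10.x jar on the classpath.
object PollLoopSketch {
  def main(args: Array[String]): Unit = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "myhostName:9092") // assumed broker address
    p.setProperty("group.id", "newconsumper")
    p.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    p.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    p.setProperty("enable.auto.commit", "true")
    // Assumption: start from the beginning when the group has no committed offset.
    p.setProperty("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](p)
    consumer.subscribe(Collections.singletonList("test"))
    try {
      // Poll repeatedly so the group join and the first fetch have time to complete.
      for (_ <- 1 to 30) {
        val records = consumer.poll(1000L)
        for (record <- records.asScala) {
          println(record.key() + ":" + record.value() + ":" + record.offset())
        }
      }
    } finally {
      consumer.close()
    }
  }
}
```

Note that auto.offset.reset only applies while the group has no committed offset; once offsets are auto-committed, a restarted consumer in the same group resumes from them.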

Best regards,
Jack
