Hi all,

I am using Kafka 0.10.0.1, and I set up my listeners like this:

    listeners=PLAINTEXT://myhostName:9092

Then I have one consumer running with the new consumer API. However, I did not see any records returned from the API call.

The log from Kafka is:

    [2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Preparing to restabilize group newconsumper with old generation 0 (kafka.coordinator.GroupCoordinator)
    [2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Stabilized group newconsumper generation 1 (kafka.coordinator.GroupCoordinator)
    [2016-08-26 14:39:28,555] INFO [GroupCoordinator 0]: Assignment received from leader for group newconsumper for generation 1 (kafka.coordinator.GroupCoordinator)
    [2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Preparing to restabilize group newconsumper with old generation 1 (kafka.coordinator.GroupCoordinator)
    [2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Group newconsumper generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)

Here is the code:

    p.put("bootstrap.servers", config.getString("kafka.broker"))
    p.put("group.id", "newconsumper")
    p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    p.put("enable.auto.commit", "true")
    p.put("heartbeat.interval.ms", "10000")
    p.put("session.timeout.ms", "30000");

    val kafkaConsumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](p)
    kafkaConsumer.subscribe(util.Arrays.asList("test"))
    try {
      val timeout:Long = 1000
      val records: ConsumerRecords[String, String] = kafkaConsumer.poll(timeout)
      records.asScala.foreach(record => {
        println(record.key() + ":" + record.value() + ":" + record.offset())
      })
    } catch {
      case e: Exception => {
        e.printStackTrace
      }
    } finally {
      kafkaConsumer.close
    }

Best regards,
Jack