The issue was in the ZooKeeper and Kafka configuration.

Kafka server.properties:

#advertised.host.name=10.179.165.7              # commented out at 20170621
#advertised.listeners=PLAINTEXT://0.0.0.0:9080  # commented out at 20170621
#port=9080                                      # commented out at 20170621
listeners=PLAINTEXT://10.179.165.7:9080         # changed from 0.0.0.0 to 10.179.165.7 at 20170621
delete.topic.enable=true                        # added at 20170621

ZooKeeper zoo.cfg:

server.0=10.179.165.7:2287:3387                 # added at 20170621
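With the listener now bound to 10.179.165.7:9080 instead of 0.0.0.0, the bootstrap.servers address used by the Java clients below matches what the broker advertises, and the console test from the original mail can be repeated against that address to double-check, e.g.:

./kafka-console-producer.sh --broker-list 10.179.165.7:9080 --topic testkun
./kafka-console-consumer.sh --bootstrap-server 10.179.165.7:9080 --topic testkun --from-beginning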
From: Caokun (Jack, Platform)
Sent: June 20, 2017, 23:07
To: 'users@kafka.apache.org'
Cc: Xukeke
Subject: kafka version 0.10.2.1 consumer can not get the message

Hello experts,

I wrote a Kafka demo in Java. The producer can send messages, but the consumer cannot get them. My Kafka setup itself seems fine; the console tools work:

./kafka-console-producer.sh --broker-list localhost:9080 --topic testkun
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkun --from-beginning

The following is the Java code of the consumer, producer, app and properties interface. Thanks a lot.

package com.huawei.business;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class hwKafkaConsumer extends Thread {

    private final String topic;

    public hwKafkaConsumer(final String topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.179.165.7:9080");
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("rebalance.backoff.ms", "2000");
        props.put("rebalance.max.retries", "10");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");  // consume from the earliest message

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(this.topic));
        System.out.printf("consumer: the topic is " + this.topic + " ");

        while (true) {
            System.out.printf("consumer: receive the message in while loop ");
            ConsumerRecords<String, String> records = consumer.poll(100);  // seems to hang here
            System.out.printf(" consumer after create records ");
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("for loop to parse the message.....");
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            System.out.printf(" consumer after for loop ");
        }
    }
}

Can anyone help find the problem?
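A note on the properties above: the 0.10.x KafkaConsumer only reads new-consumer settings, so the zookeeper.*, rebalance.* and *.serializer entries are ignored (they belong to the old consumer and to producers). A stripped-down configuration for the same consumer, assuming the same broker address, group and topic, would look roughly like this:

    // Minimal new-consumer setup; only these properties are actually used by KafkaConsumer.
    Properties props = new Properties();
    props.put("bootstrap.servers", "10.179.165.7:9080");   // broker address, not ZooKeeper
    props.put("group.id", KafkaProperties.groupId);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "earliest");            // read from the beginning for a new group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(this.topic));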
Thanks a lot. The producer test is as follows:

package com.huawei.business;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class hwKafkaProducer extends Thread {

    private final String topic;

    public hwKafkaProducer(final String topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.179.165.7:9080");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 1; i++) {
            System.out.printf("send the message: create record.");
            System.out.printf("send the message: the topic is " + this.topic);
            ProducerRecord<String, String> data =
                    new ProducerRecord<String, String>(this.topic, Integer.toString(i), "helloworld_ni_hao");
            System.out.printf("send the message: before send.");
            producer.send(data, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("The offset of the record we just sent is: " + metadata.offset());
                    }
                }
            });
            System.out.printf("send the message: the data value is " + data.value());
            System.out.printf("send the message: after send.");
        }
        producer.close();
    }
}
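Because the demo sends only one record and then closes the producer, it can be hard to tell from the console output whether the record actually reached the broker. A simple check (a hypothetical variant of the loop above, not part of the original demo) is to block on the Future returned by send():

    // Wait for the broker's acknowledgement instead of relying only on the callback,
    // so a connection problem surfaces as an exception rather than silence.
    try {
        RecordMetadata metadata = producer.send(data).get();
        System.out.println("stored at offset " + metadata.offset()
                + " in partition " + metadata.partition());
    } catch (Exception e) {
        e.printStackTrace();   // e.g. a timeout if the broker address is not reachable
    }

This assumes the producer and data objects from the class above.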
The app is:

package com.huawei.business;

public class app {

    public static void main(final String[] args) {
        final hwKafkaProducer producerThread = new hwKafkaProducer(KafkaProperties.topic);
        producerThread.start();

        final hwKafkaConsumer consumerThread = new hwKafkaConsumer(KafkaProperties.topic);
        consumerThread.start();
    }
}

The properties interface is:

package com.huawei.business;

/**
 * Created by x00343661 on 2016/6/12.
 */
public interface KafkaProperties {
    final static String zkConnect = "10.179.165.7:2181";
    final static String groupId = "group1";
    final static String topic = "yqf";
    final static String kafkaServerURL = "10.179.165.7";
    final static int kafkaServerPort = 9080;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;
    final static String topic2 = "topic2";
    final static String topic3 = "topic3";
    final static String clientId = "SimpleConsumerDemoClient";
}