In the internal network environment, bootstrap.server fills in floating IP, which can be used for normal production and consumption. After the local configuration of bootstrap.server as the public network IP and port on the server, my producer can produce very very very slowly, and consumers cannot consume it.
OS: SUSE 12 SP3 kafka node1 : 172.20.1.10 port:9092 hostname: kafka01 kafka node2 : 172.20.1.11 port:9092 hostname: kafka02 kafka node3 : 172.20.1.12 port:9092 hostname: kafka03 kafka VIP : 172.20.1.110 zk node1 : 172.20.1.13 zk node2 : 172.20.1.14 zk node3 : 172.20.1.15 kafka version: kafka_2.11-1.1.0 kafka server.properties: broker.id=0 advertised.listeners=PLAINTEXT://172.20.1.10:9092 (I tried to change advertised.listeners to public ip and port.But it's still not feasible ) (I tried to change advertised.listeners to kafka01 and port,and change my code to props.put("bootstrap.servers", "kafka01:20001"); then add "172.20.1.11 kafka01" to /etc/hosts, add "$PUBLICIP kafka01" to C:\Windows\System32\drivers\etc\hosts. But it's still not feasible) num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/kafkalogs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=172.20.1.13:2181,172.20.1.13:2181,172.20.1.13:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true this is my code, Port 20001 of the public network IP is mapped to port 9092 of the internal network IP. (By VMware vCloud Director. we can't telnet public ip and port on kafka01/02/03) Producer: public class MyProducer extends Thread { private String topic; private static Producer producer; private int i; public MyProducer(String topic) { this.topic = topic; } @Override public void run() { while (true) { producer.send(new ProducerRecord(topic, i++ + "")); System.out.println("create :" + i); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } private static Producer createProducer() { Properties props = new Properties(); props.put("bootstrap.servers", "xx.xx.xx.xx:20001"); // public ip () // props.put("bootstrap.servers", "172.20.1.110:9092"); //private ip (floating ip/VIP) props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return new KafkaProducer(props); } public static void main(String[] args) { if (args.length < 1) { System.out.println("need topicName"); } else { producer = createProducer(); new MyProducer(args[0]).start(); } } } Consumer: public class MyCustomer { public static void main(String[] args) { if (args.length < 2) { System.out.println("java -jar $JAR_NAME.jar $group.id $topicName"); } else { System.out.println(args[0]+" "+args[1]); Properties props = new Properties(); // props.put("bootstrap.servers", "172.21.1.110:9092"); props.put("bootstrap.servers", "xx.xx.xx.xx:20001"); props.put("group.id", args[0]+""); props.put("enable.auto.commit", "true"); props.put("auto.offset.reset", "earliest"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(args[1])); while (true) { // System.out.println("poll数据"); ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }