Hi, Does anyone publish or subscribe kafka topic in TestNG? I try to publish and subscribe kafka topic in my TestNG test case, and I always get the following exception:
2018-05-13 15:33:58.540 WARN o.a.kafka.common.network.Selector.pollSelectionKeys[531] - [Producer clientId=producer-1] Unexpected error from localhost/127.0.0.1; closing connection org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = -2062548992) at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:130) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:557) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:495) at org.apache.kafka.common.network.Selector.poll(Selector.java:424) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at java.lang.Thread.run(Thread.java:748) and this is how I publish a message to kafka topic: Map<String, Object> producerInfo = (Map<String, Object>) details.get("producer"); Properties props = new Properties(); props.put("bootstrap.servers", ""+details.get("bootstrapServers")); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props); int nrPartition = (int) producerInfo.get("nrPartition"); for(KeeperAccessRecord rec : records) { String msg = gson.toJson(rec); int partition = msg.hashCode() % nrPartition; producer.send(new ProducerRecord<String, String>(""+producerInfo.get("topic"), partition, "a", "a")); } producer.close(); Same code snippet works fine if I put it into a simple java program like the following: public class SimpleProducer { public static void main(String[] args) throws Exception{ String topicName = "MyTopic"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); Producer<String, String> producer = new KafkaProducer<String, String>(props); for(int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); } System.out.println("Message sent successfully"); producer.close(); } } This is a UTF-8 formatted mail ----------------------------------------------- James C.-C.Yu +886988713275 +8615618429976