It's quite possible that the bootstrap server being used in your test
case is different (since you pull it out of some "details") from the one
being used in the standalone Java program. I don't mean the IP address
(since the logs do indicate it is localhost), but I think it might be
the port. Perhaps the test case uses a SSL backed port?
-Jaikiran
On 13/05/18 1:46 PM, James Yu wrote:
Hi,
Does anyone publish or subscribe kafka topic in TestNG?
I try to publish and subscribe kafka topic in my TestNG test case, and I
always get the following exception:
2018-05-13 15:33:58.540 WARN
o.a.kafka.common.network.Selector.pollSelectionKeys[531] - [Producer
clientId=producer-1] Unexpected error from localhost/127.0.0.1; closing
connection
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive
(size = -2062548992)
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:130)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:557)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:495)
at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
and this is how I publish a message to kafka topic:
Map<String, Object> producerInfo = (Map<String, Object>)
details.get("producer");
Properties props = new Properties();
props.put("bootstrap.servers", ""+details.get("bootstrapServers"));
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<String, String> producer = new
org.apache.kafka.clients.producer.KafkaProducer<String,
String>(props);
int nrPartition = (int) producerInfo.get("nrPartition");
for(KeeperAccessRecord rec : records) {
String msg = gson.toJson(rec);
int partition = msg.hashCode() % nrPartition;
producer.send(new ProducerRecord<String,
String>(""+producerInfo.get("topic"), partition, "a", "a"));
}
producer.close();
Same code snippet works fine if I put it into a simple java program like
the following:
public class SimpleProducer {
public static void main(String[] args) throws Exception{
String topicName = "MyTopic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String,
String>(props);
for(int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
}
System.out.println("Message sent successfully");
producer.close();
}
}
This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275
+8615618429976