Hi,

Has anyone published or subscribed to a Kafka topic from TestNG?
I am trying to publish and subscribe to a Kafka topic in my TestNG test case,
and I always get the following exception:

2018-05-13 15:33:58.540 WARN o.a.kafka.common.network.Selector.pollSelectionKeys[531] - [Producer clientId=producer-1] Unexpected error from localhost/127.0.0.1; closing connection
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = -2062548992)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:130)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:557)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:495)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
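
For reference, Kafka frames are length-prefixed, so the "size" in the exception is the first four bytes the client read from the socket, interpreted as a big-endian int. A small sketch (the class and method names are made up for illustration) that turns the reported size back into those raw bytes, which may help in guessing what actually answered on that port:

```java
import java.nio.ByteBuffer;

public class ReceiveSizeSketch {
    // Hypothetical helper: returns the four bytes that, read as a
    // big-endian int, produce the size reported in the exception.
    static String sizeAsHexBytes(int size) {
        byte[] raw = ByteBuffer.allocate(4).putInt(size).array(); // ByteBuffer is big-endian by default
        StringBuilder sb = new StringBuilder();
        for (byte b : raw) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Size taken from the log line above.
        System.out.println(sizeAsHexBytes(-2062548992)); // prints "85 10 00 00"
    }
}
```

A negative size only says that the leading bytes were not a valid Kafka length prefix. It does not by itself identify the peer, so treat any conclusion drawn from it as a guess.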

This is how I publish a message to the Kafka topic:

Map<String, Object> producerInfo = (Map<String, Object>) details.get("producer");
Properties props = new Properties();
props.put("bootstrap.servers", "" + details.get("bootstrapServers"));
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<String, String> producer =
    new org.apache.kafka.clients.producer.KafkaProducer<>(props);

int nrPartition = (int) producerInfo.get("nrPartition");
String topic = "" + producerInfo.get("topic");
for (KeeperAccessRecord rec : records) {
  String msg = gson.toJson(rec);
  // floorMod keeps the index in [0, nrPartition) even when hashCode() is negative
  int partition = Math.floorMod(msg.hashCode(), nrPartition);
  // the key and value sent here are the placeholder "a", not msg
  producer.send(new ProducerRecord<>(topic, partition, "a", "a"));
}
producer.close();
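
A side note on the partition calculation, separate from the InvalidReceiveException: String.hashCode() can be negative, and Java's % operator keeps the sign of the dividend, so hashCode() % nrPartition can produce a negative index. As far as I know, ProducerRecord rejects a negative partition with an IllegalArgumentException. A minimal sketch of the difference, where partitionFor is a hypothetical helper and not part of the Kafka API:

```java
public class PartitionSketch {
    // Hypothetical helper: maps a message to a partition index in [0, nrPartition).
    static int partitionFor(String msg, int nrPartition) {
        return Math.floorMod(msg.hashCode(), nrPartition);
    }

    public static void main(String[] args) {
        System.out.println(-7 % 3);               // prints -1
        System.out.println(Math.floorMod(-7, 3)); // prints 2
        System.out.println(partitionFor("{\"user\":\"a\"}", 3));
    }
}
```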


The same code works fine if I put it into a simple Java program like
the following:

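import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
  public static void main(String[] args) throws Exception {
    String topicName = "MyTopic";
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 10; i++) {
      // key and value are both the loop index as a string
      producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));
    }
    // close() flushes any records that are still buffered
    producer.close();
    System.out.println("Message sent successfully");
  }
}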
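
One visible difference between the two snippets is where bootstrap.servers comes from: the test builds it from details.get("bootstrapServers"), while the standalone program hard-codes "localhost:9092". A rough sketch for checking what the test actually resolves is below. The class and both helpers are hypothetical, and the idea that the port differs is only an assumption to verify, not a diagnosis:

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapCheckSketch {
    // Hypothetical helper: splits a bootstrap.servers value into trimmed host:port entries.
    static List<String> entries(String bootstrapServers) {
        List<String> out = new ArrayList<>();
        for (String part : bootstrapServers.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                out.add(trimmed);
            }
        }
        return out;
    }

    // Hypothetical helper: returns the port of a "host:port" entry, or -1 if none can be parsed.
    static int portOf(String entry) {
        int idx = entry.lastIndexOf(':');
        if (idx < 0) {
            return -1;
        }
        try {
            return Integer.parseInt(entry.substring(idx + 1));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        Object missing = null;
        String fromMissingKey = "" + missing; // string concatenation turns null into "null"
        for (String value : new String[] {"localhost:9092", fromMissingKey}) {
            for (String entry : entries(value)) {
                System.out.println(entry + " -> port " + portOf(entry));
            }
        }
    }
}
```

Logging the result of entries("" + details.get("bootstrapServers")) inside the test and comparing it with "localhost:9092" would show whether both runs talk to the same listener.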



-----------------------------------------------
James C.-C.Yu
+886988713275
+8615618429976