I wrote a little test client that reads from a file and publishes using the 0.9.0.0 API. I am connecting to an older 0.8.x server. I am able to send messages; however, I noticed that once I am done reading the input file, my test program hangs.
Any idea what I am doing wrong?

Kind regards

Andy

    public static void main(String[] args) throws IOException {
        logger.warn("BEGIN");
        readFromFile(cmdLine, producer, topic);
        producer.flush();
        producer.close();
        logger.warn("END");
    }

    private static void readFromFile(CmdLine cmdLine,
            KafkaProducer<String, String> producer, String topic) throws IOException {
        logger.info("BEGIN");
        BufferedReader reader = cmdLine.getReader();
        String value = null;
        while ((value = reader.readLine()) != null) {
            logger.info("sending value: " + value);
            publish(producer, topic, value);
        }
        logger.info("END");
    }

    private static void publish(KafkaProducer<String, String> producer,
            String topic, String value) {
        Future<RecordMetadata> response =
                producer.send(new ProducerRecord<String, String>(topic, value));
        /* TODO send() will raise following error. It is because we are
           using a 0.9.0.0 client with an 0.8 server. The 0.8 consumer
           seems to work without problems */
    }

INFO 17:02:53 main c.p.g.p.KClient readFromFile line:79 BEGIN
INFO 17:02:54 main c.p.g.p.KClient readFromFile line:85 sending value: dependencies {
INFO 17:02:54 main c.p.g.p.KClient readFromFile line:89 END

The following error appears to be because we are using the 0.9.0.0 API with a 0.8.x server. If I read from stdin instead of a file, I would be able to continue sending messages. I do not think this is the reason my test code hangs.
ERROR 17:02:54 kafka-producer-network-thread | producer-1 o.a.k.c.p.i.Sender run line:130 Uncaught error in kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745)
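
One way to narrow down where the hang occurs is to wait on each send's Future with a timeout instead of relying on an unbounded flush(). The sketch below uses only the JDK and is not a confirmed diagnosis: the class name BoundedWaitSketch and the helper waitForAck are hypothetical, and a CompletableFuture that is never completed stands in for a send whose response is never processed. That such a send is what blocks flush() here is an assumption, not something the logs above prove.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {

    /**
     * Hypothetical helper: waits up to timeoutMs for an acknowledgement.
     * Returns true if the future completed normally, false if it timed out,
     * failed, or the waiting thread was interrupted.
     */
    static boolean waitForAck(Future<?> ack, long timeoutMs) {
        try {
            ack.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // no acknowledgement arrived within the timeout
        } catch (ExecutionException e) {
            return false; // the operation itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send that was acknowledged.
        Future<String> acked = CompletableFuture.completedFuture("ok");
        // Stand-in (assumption) for a send whose response is never handled.
        Future<String> neverAcked = new CompletableFuture<>();

        System.out.println("acked: " + waitForAck(acked, 100));
        System.out.println("neverAcked: " + waitForAck(neverAcked, 100));
    }
}
```

In the test client, the Future<RecordMetadata> returned by producer.send(...) could be passed to a helper like this, so that a timeout is logged per record rather than the program blocking at the end. This only shows where the wait happens; it does not make a 0.9.0.0 client compatible with a 0.8.x server.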