Andy, Kafka 0.9.0 server supports the previous versions of the clients (0.8.2, 0.8.1..). But, new clients won't work properly with the older version of Kafka server.
You should upgrade your server / broker first. --Kamal On Fri, May 20, 2016 at 10:58 PM, Andy Davidson < a...@santacruzintegration.com> wrote: > Hi Jaikiran > > Bellow is the stack trace. For completeness I see in my log file that my > code has called > > producer.flush(); > > producer.close(); > > > > I get the following error, how ever I do not think this is the problem. I > found a ??bug report?? That said this was because I was connecting to a > 0.8x > sever. I am able to consume my test messages using > kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh > > Kind regards > > Andy > > ERROR 17:12:14 kafka-producer-network-thread | producer-1 > o.a.k.c.p.i.Sender > run line:130 Uncaught error in kafka producer I/O thread: > > org.apache.kafka.common.protocol.types.SchemaException: Error reading field > 'throttle_time_ms': java.nio.BufferUnderflowException > > at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) > > at > > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient > .java:464) > > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) > > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) > > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) > > at java.lang.Thread.run(Thread.java:745) > > > > $ jstack 908 > > 2016-05-20 10:16:25 > > Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode): > > > > "Attach Listener" #12 daemon prio=9 os_prio=31 tid=0x00007fe04291c800 > nid=0x130b waiting on condition [0x0000000000000000] > > java.lang.Thread.State: RUNNABLE > > > > "kafka-producer-network-thread | producer-1" #11 daemon prio=5 os_prio=31 > tid=0x00007fe041116800 nid=0x5a0f runnable [0x00007000015d5000] > > java.lang.Thread.State: RUNNABLE > > at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) > > at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) > > at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103) > > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > > - locked <0x000000076b67ea88> (a sun.nio.ch.Util$2) > > - locked <0x000000076b67ea00> (a java.util.Collections$UnmodifiableSet) > > - locked <0x000000076b67e740> (a sun.nio.ch.KQueueSelectorImpl) > > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > > at org.apache.kafka.common.network.Selector.select(Selector.java:425) > > at org.apache.kafka.common.network.Selector.poll(Selector.java:254) > > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) > > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) > > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) > > at java.lang.Thread.run(Thread.java:745) > > > > "Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007fe042015800 > nid=0x5203 runnable [0x0000000000000000] > > java.lang.Thread.State: RUNNABLE > > > > "C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007fe04285b000 > nid=0x5003 waiting on condition [0x0000000000000000] > > java.lang.Thread.State: RUNNABLE > > > > "C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007fe04282f000 > nid=0x4e03 waiting on condition [0x0000000000000000] > > java.lang.Thread.State: RUNNABLE > > > > "C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007fe041830800 > nid=0x4c03 waiting on condition [0x0000000000000000] > > java.lang.Thread.State: RUNNABLE > > > > "C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007fe04201c800 > nid=0x4a03 waiting on condition [0x0000000000000000] > > java.lang.Thread.State: RUNNABLE > > > > "Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fe042015000 > nid=0x3e0f runnable [0x0000000000000000] > > java.lang.Thread.State: RUNNABLE > > > > "Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fe04200d800 nid=0x3803 > in > Object.wait() [0x0000700000d3a000] > > java.lang.Thread.State: WAITING (on object monitor) > > at java.lang.Object.wait(Native Method) > > - waiting on <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock) > > at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) > > - locked <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock) > > at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164) > > at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209) > > > > "Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fe04200d000 > nid=0x3603 in Object.wait() [0x0000700000c37000] > > java.lang.Thread.State: WAITING (on object monitor) > > at java.lang.Object.wait(Native Method) > > - waiting on <0x000000076ab06af8> (a java.lang.ref.Reference$Lock) > > at java.lang.Object.wait(Object.java:502) > > at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157) > > - locked <0x000000076ab06af8> (a java.lang.ref.Reference$Lock) > > > > "main" #1 prio=5 os_prio=31 tid=0x00007fe041010800 nid=0x1703 waiting on > condition [0x0000700000219000] > > java.lang.Thread.State: WAITING (parking) > > at sun.misc.Unsafe.park(Native Method) > > - parking to wait for <0x000000076b89c8e0> (a > java.util.concurrent.CountDownLatch$Sync) > > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > > at > > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt( > AbstractQueuedSynchronizer.java:836) > > at > > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterru > ptibly(AbstractQueuedSynchronizer.java:997) > > at > > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterrupt > ibly(AbstractQueuedSynchronizer.java:1304) > > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > > at > > org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(Produ > ceRequestResult.java:57) > > at > > org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushComp > letion(RecordAccumulator.java:422) > > at > > org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546 > ) > > at com.pws.gnip.powertrack.HoseBirdClient.main(HoseBirdClient.java:56) > > > > "VM Thread" os_prio=31 tid=0x00007fe04200a000 nid=0x3403 runnable > > > > "GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fe042005800 > nid=0x2403 > runnable > > > > "GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fe042006000 > nid=0x2603 > runnable > > > > "GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fe042006800 > nid=0x2803 > runnable > > > > "GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fe042007000 > nid=0x2a03 > runnable > > > > "GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fe042800800 > nid=0x2c03 > runnable > > > > "GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fe042801000 > nid=0x2e03 > runnable > > > > "GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fe042801800 > nid=0x3003 > runnable > > > > "GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fe042802800 > nid=0x3203 > runnable > > > > "VM Periodic Task Thread" os_prio=31 tid=0x00007fe04202d800 nid=0x5403 > waiting on condition > > > > JNI global references: 341 > > > > From: Jaikiran Pai <jai.forums2...@gmail.com> > Reply-To: <users@kafka.apache.org> > Date: Friday, May 20, 2016 at 7:55 AM > To: <users@kafka.apache.org> > Subject: Re: newbie: kafka 0.9.0.0 producer does not terminate after > producer.close() > > > You can take a thread dump (using "jstack <pid-of-your-program>") when > > the program doesn't terminate and post that output here. That will tell > > us which threads are causing the program to not terminate. > > > > -Jaikiran > > > > On Tuesday 17 May 2016 11:32 PM, Andy Davidson wrote: > >> I wrote a little test client that reads from a file an publishes using > the > >> 0.9.0.0 API. I am contacting to an older 0.8.x sever. I am able to send > >> messages how ever I noticed that once I am done reading the input file > my > >> test program hangs > >> > >> Any idea what I am doing wrong? > >> > >> Kind regards > >> > >> Andy > >> > >> > >> public static void main(String[] args) throws IOException { > >> logger.warn("BEGIN"); > >> > >> readFromFile(cmdLine, producer, topic); > >> > >> > >> > >> producer.flush(); > >> > >> producer.close(); > >> > >> > >> > >> logger.warn("END"); > >> > >> } > >> > >> > >> private static void readFromFile(CmdLine cmdLine, KafkaProducer<String, > >> String> producer, > >> > >> String topic) throws IOException { > >> > >> > >> > >> logger.info("BEGIN"); > >> > >> BufferedReader reader = cmdLine.getReader(); > >> > >> String value = null; > >> > >> > >> > >> while ((value = reader.readLine()) != null) { > >> > >> logger.info("sending value: " + value); > >> > >> publish(producer, topic, value); > >> > >> } > >> > >> logger.info("END"); > >> > >> } > >> > >> > >> > >> private static void publish(KafkaProducer<String, String> producer, > String > >> topic, String value) { > >> > >> Future<RecordMetadata> response = producer.send(new > ProducerRecord<String, > >> String>(topic, value)); > >> > >> > >> > >> /* TODO > >> > >> send() will raise following error. > >> > >> It is because we are using a 0.9.0.0 client with an 0.8 server. The > 0.8 > >> consumer seems > >> > >> to work with out problems > >> > >> } > >> > >> > >> > >> Š > >> INFO 17:02:53 main c.p.g.p.KClient readFromFile line:79 BEGIN > >> > >> Š > >> INFO 17:02:54 main c.p.g.p.KClient readFromFile line:85 sending value: > >> dependencies { > >> > >> Š > >> INFO 17:02:54 main c.p.g.p.KClient readFromFile line:89 END > >> > >> Š > >> > >> The following error appears to be because we are using 0.9.0.0 api > with an > >> 0.8.x sever. If I read from stdin instead of a file I would be able to > >> continue sending messages. I do not think this is the reason my test > code > >> hangs. > >> > >> ERROR 17:02:54 kafka-producer-network-thread | producer-1 > o.a.k.c.p.i.Sender > >> run line:130 Uncaught error in kafka producer I/O thread: > >> > >> org.apache.kafka.common.protocol.types.SchemaException: Error reading > field > >> 'throttle_time_ms': java.nio.BufferUnderflowException > >> > >> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) > >> > >> at > >> > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient > >> .java:464) > >> > >> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) > >> > >> at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) > >> > >> at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) > >> > >> at java.lang.Thread.run(Thread.java:745) > >> > >> > >> > >> > >> > >> > >> > > > > > > >