Hi Jaikiran

Below is the stack trace. For completeness, I see in my log file that my code has called

producer.flush();
producer.close();

I get the following error; however, I do not think this is the problem. I found a "bug report" that said this was because I was connecting to a 0.8.x server. I am able to consume my test messages using kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh

Kind regards

Andy

ERROR 17:12:14 kafka-producer-network-thread | producer-1 o.a.k.c.p.i.Sender run line:130 Uncaught error in kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:745)

$ jstack 908
2016-05-20 10:16:25
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):

"Attach Listener" #12 daemon prio=9 os_prio=31 tid=0x00007fe04291c800 nid=0x130b waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"kafka-producer-network-thread | producer-1" #11 daemon prio=5 os_prio=31 tid=0x00007fe041116800 nid=0x5a0f runnable [0x00007000015d5000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
        at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
        at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x000000076b67ea88> (a sun.nio.ch.Util$2)
        - locked <0x000000076b67ea00> (a java.util.Collections$UnmodifiableSet)
        - locked <0x000000076b67e740> (a sun.nio.ch.KQueueSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:425)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:745)

"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007fe042015800 nid=0x5203 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007fe04285b000 nid=0x5003 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007fe04282f000 nid=0x4e03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007fe041830800 nid=0x4c03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007fe04201c800 nid=0x4a03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fe042015000 nid=0x3e0f runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fe04200d800 nid=0x3803 in Object.wait() [0x0000700000d3a000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        - locked <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fe04200d000 nid=0x3603 in Object.wait() [0x0000700000c37000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
        - locked <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)

"main" #1 prio=5 os_prio=31 tid=0x00007fe041010800 nid=0x1703 waiting on condition [0x0000700000219000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x000000076b89c8e0> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:422)
        at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546)
        at com.pws.gnip.powertrack.HoseBirdClient.main(HoseBirdClient.java:56)

"VM Thread" os_prio=31 tid=0x00007fe04200a000 nid=0x3403 runnable

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fe042005800 nid=0x2403 runnable

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fe042006000 nid=0x2603 runnable

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fe042006800 nid=0x2803 runnable

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fe042007000 nid=0x2a03 runnable

"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fe042800800 nid=0x2c03 runnable

"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fe042801000 nid=0x2e03 runnable

"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fe042801800 nid=0x3003 runnable

"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fe042802800 nid=0x3203 runnable

"VM Periodic Task Thread" os_prio=31 tid=0x00007fe04202d800 nid=0x5403 waiting on condition

JNI global references: 341

From: Jaikiran Pai <jai.forums2...@gmail.com>
Reply-To: <users@kafka.apache.org>
Date: Friday, May 20, 2016 at 7:55 AM
To: <users@kafka.apache.org>
Subject: Re: newbie: kafka 0.9.0.0 producer does not terminate after producer.close()

> You can take a thread dump (using "jstack <pid-of-your-program>") when
> the program doesn't terminate and post that output here. That will tell
> us which threads are causing the program to not terminate.
>
> -Jaikiran
>
> On Tuesday 17 May 2016 11:32 PM, Andy Davidson wrote:
>> I wrote a little test client that reads from a file and publishes using the
>> 0.9.0.0 API. I am connecting to an older 0.8.x server. I am able to send
>> messages; however, I noticed that once I am done reading the input file my
>> test program hangs.
>>
>> Any idea what I am doing wrong?
>>
>> Kind regards
>>
>> Andy
>>
>>
>> public static void main(String[] args) throws IOException {
>>     logger.warn("BEGIN");
>>     readFromFile(cmdLine, producer, topic);
>>
>>     producer.flush();
>>     producer.close();
>>
>>     logger.warn("END");
>> }
>>
>> private static void readFromFile(CmdLine cmdLine, KafkaProducer<String, String> producer,
>>         String topic) throws IOException {
>>     logger.info("BEGIN");
>>     BufferedReader reader = cmdLine.getReader();
>>     String value = null;
>>
>>     while ((value = reader.readLine()) != null) {
>>         logger.info("sending value: " + value);
>>         publish(producer, topic, value);
>>     }
>>     logger.info("END");
>> }
>>
>> private static void publish(KafkaProducer<String, String> producer, String topic, String value) {
>>     Future<RecordMetadata> response = producer.send(new ProducerRecord<String, String>(topic, value));
>>
>>     /* TODO
>>      * send() will raise the following error.
>>      * It is because we are using a 0.9.0.0 client with a 0.8 server. The 0.8
>>      * consumer seems to work without problems.
>>      */
>> }
>>
>> ...
>> INFO 17:02:53 main c.p.g.p.KClient readFromFile line:79 BEGIN
>> ...
>> INFO 17:02:54 main c.p.g.p.KClient readFromFile line:85 sending value: dependencies {
>> ...
>> INFO 17:02:54 main c.p.g.p.KClient readFromFile line:89 END
>> ...
>>
>> The following error appears to be because we are using the 0.9.0.0 API with a
>> 0.8.x server. If I read from stdin instead of a file I would be able to
>> continue sending messages. I do not think this is the reason my test code
>> hangs.
>>
>> ERROR 17:02:54 kafka-producer-network-thread | producer-1 o.a.k.c.p.i.Sender run line:130 Uncaught error in kafka producer I/O thread:
>> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
>>         at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
>>         at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
>>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
>>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>>         at java.lang.Thread.run(Thread.java:745)
>>
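
A note on the thread dump: main is parked in CountDownLatch.await() under KafkaProducer.flush(), while the producer network thread is still alive in select(). One possible reading, and it is only an assumption, is that the I/O thread logged the SchemaException and carried on without ever completing the pending request, so the latch is never released. The sketch below reproduces that general pattern with plain JDK classes. It is not Kafka code, and LatchHangSketch, awaitCompletion and failWhileHandling are made-up names for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchHangSketch {

    /**
     * Starts a daemon worker that should release the latch once it has
     * handled a "response". If failWhileHandling is true, the worker throws
     * first, logs the error, and never counts the latch down.
     * Returns true if the latch was released within timeoutMs.
     */
    public static boolean awaitCompletion(boolean failWhileHandling, long timeoutMs)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                if (failWhileHandling) {
                    // Stand-in for a response the worker cannot parse (hypothetical).
                    throw new IllegalStateException("cannot parse response");
                }
                done.countDown();
            } catch (RuntimeException e) {
                // The error is logged and swallowed, so nobody releases the latch.
                System.err.println("Uncaught error in worker: " + e.getMessage());
            }
        }, "worker");
        worker.setDaemon(true);
        worker.start();
        // Bounded wait: an unbounded done.await() would park forever on failure.
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("healthy worker released latch: " + awaitCompletion(false, 2000));
        System.out.println("failing worker released latch: " + awaitCompletion(true, 200));
    }
}
```

With an unbounded done.await() the second call would never return, which looks like what main is doing inside flush() in the dump. A bounded wait, for example Future.get(timeout, unit) on the Future returned by send(), would at least let the program notice the stall and exit.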