Hi Jaikiran

Below is the stack trace. For completeness I see in my log file that my
code has called
producer.flush();

producer.close();



I get the following error; however, I do not think this is the problem. I
found a bug report that said this was because I was connecting to a 0.8.x
server. I am able to consume my test messages using
kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh

Kind regards

Andy

ERROR 17:12:14 kafka-producer-network-thread | producer-1 o.a.k.c.p.i.Sender
run line:130 Uncaught error in kafka producer I/O thread:

org.apache.kafka.common.protocol.types.SchemaException: Error reading field
'throttle_time_ms': java.nio.BufferUnderflowException

at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)

at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)

at java.lang.Thread.run(Thread.java:745)
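
For what it is worth, a BufferUnderflowException while reading
'throttle_time_ms' is what java.nio raises when a reader asks for more bytes
than remain in the buffer. That would be consistent with the client expecting
a trailing field that the older broker did not send. A minimal JDK-only sketch
of that failure mode follows; the two response layouts and every name in it
are hypothetical simplifications, not Kafka's actual wire format or classes.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

final class ThrottleFieldSketch {

    // Hypothetical, simplified layouts (NOT Kafka's real wire format):
    //   old-style response: int16 errorCode, int64 offset
    //   new-style response: the same fields, followed by int32 throttleTimeMs

    static ByteBuffer oldStyleResponse() {
        ByteBuffer buf = ByteBuffer.allocate(2 + 8);
        buf.putShort((short) 0).putLong(42L);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    static ByteBuffer newStyleResponse(int throttleTimeMs) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 8 + 4);
        buf.putShort((short) 0).putLong(42L).putInt(throttleTimeMs);
        buf.flip();
        return buf;
    }

    /** Reads the trailing field, or returns -1 if the response is too short to contain it. */
    static int tryReadThrottleTimeMs(ByteBuffer buf) {
        buf.getShort(); // errorCode
        buf.getLong();  // offset
        try {
            return buf.getInt(); // throws if fewer than 4 bytes remain
        } catch (BufferUnderflowException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("new-style: " + tryReadThrottleTimeMs(newStyleResponse(7)));
        System.out.println("old-style: " + tryReadThrottleTimeMs(oldStyleResponse()));
    }
}
```

The sketch returns a sentinel instead of letting the exception escape only so
the two cases are easy to compare side by side.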



$ jstack 908

2016-05-20 10:16:25

Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):



"Attach Listener" #12 daemon prio=9 os_prio=31 tid=0x00007fe04291c800
nid=0x130b waiting on condition [0x0000000000000000]

   java.lang.Thread.State: RUNNABLE



"kafka-producer-network-thread | producer-1" #11 daemon prio=5 os_prio=31
tid=0x00007fe041116800 nid=0x5a0f runnable [0x00007000015d5000]

   java.lang.Thread.State: RUNNABLE

at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)

at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)

at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)

at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)

- locked <0x000000076b67ea88> (a sun.nio.ch.Util$2)

- locked <0x000000076b67ea00> (a java.util.Collections$UnmodifiableSet)

- locked <0x000000076b67e740> (a sun.nio.ch.KQueueSelectorImpl)

at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)

at org.apache.kafka.common.network.Selector.select(Selector.java:425)

at org.apache.kafka.common.network.Selector.poll(Selector.java:254)

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)

at java.lang.Thread.run(Thread.java:745)



"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007fe042015800
nid=0x5203 runnable [0x0000000000000000]

   java.lang.Thread.State: RUNNABLE



"C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007fe04285b000
nid=0x5003 waiting on condition [0x0000000000000000]

   java.lang.Thread.State: RUNNABLE



"C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007fe04282f000
nid=0x4e03 waiting on condition [0x0000000000000000]

   java.lang.Thread.State: RUNNABLE



"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007fe041830800
nid=0x4c03 waiting on condition [0x0000000000000000]

   java.lang.Thread.State: RUNNABLE



"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007fe04201c800
nid=0x4a03 waiting on condition [0x0000000000000000]

   java.lang.Thread.State: RUNNABLE



"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fe042015000
nid=0x3e0f runnable [0x0000000000000000]

   java.lang.Thread.State: RUNNABLE



"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fe04200d800 nid=0x3803 in
Object.wait() [0x0000700000d3a000]

   java.lang.Thread.State: WAITING (on object monitor)

at java.lang.Object.wait(Native Method)

- waiting on <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)

at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)

- locked <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)

at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)

at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)



"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fe04200d000
nid=0x3603 in Object.wait() [0x0000700000c37000]

   java.lang.Thread.State: WAITING (on object monitor)

at java.lang.Object.wait(Native Method)

- waiting on <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)

at java.lang.Object.wait(Object.java:502)

at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)

- locked <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)



"main" #1 prio=5 os_prio=31 tid=0x00007fe041010800 nid=0x1703 waiting on
condition [0x0000700000219000]

   java.lang.Thread.State: WAITING (parking)

at sun.misc.Unsafe.park(Native Method)

- parking to wait for  <0x000000076b89c8e0> (a java.util.concurrent.CountDownLatch$Sync)

at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)

at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)

at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)

at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)

at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)

at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)

at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:422)

at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546)

at com.pws.gnip.powertrack.HoseBirdClient.main(HoseBirdClient.java:56)



"VM Thread" os_prio=31 tid=0x00007fe04200a000 nid=0x3403 runnable



"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fe042005800 nid=0x2403
runnable 



"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fe042006000 nid=0x2603
runnable 



"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fe042006800 nid=0x2803
runnable 



"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fe042007000 nid=0x2a03
runnable 



"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fe042800800 nid=0x2c03
runnable 



"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fe042801000 nid=0x2e03
runnable 



"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fe042801800 nid=0x3003
runnable 



"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fe042802800 nid=0x3203
runnable 



"VM Periodic Task Thread" os_prio=31 tid=0x00007fe04202d800 nid=0x5403
waiting on condition



JNI global references: 341
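
In the dump above, "main" is parked in CountDownLatch.await() underneath
KafkaProducer.flush(), while the producer network thread is still polling its
selector. One possible reading, and it is only an assumption, is that the
response that failed to parse never marked its batch as done, so the latch is
never counted down and flush() waits forever. The JDK-only sketch below models
that pattern and shows how a bounded wait behaves differently. BatchResult and
sendAndFlush are hypothetical names for illustration, not Kafka classes or
methods.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class FlushHangSketch {

    /** Hypothetical stand-in for a per-batch result; not Kafka's class. */
    static final class BatchResult {
        private final CountDownLatch latch = new CountDownLatch(1);

        void done() {
            latch.countDown();
        }

        /** Bounded wait: true if done() was called in time, false on timeout or interrupt. */
        boolean await(long timeoutMs) {
            try {
                return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /**
     * Starts a daemon "I/O" thread that handles one response, then waits for the result.
     * If the response cannot be parsed, the thread logs the error and never calls done().
     */
    static boolean sendAndFlush(boolean responseParses, long timeoutMs) {
        BatchResult result = new BatchResult();
        Thread io = new Thread(() -> {
            try {
                if (!responseParses) {
                    throw new IllegalStateException("cannot parse response");
                }
                result.done();
            } catch (RuntimeException e) {
                // Logged and swallowed, so the waiter is never released.
                System.err.println("Uncaught error in I/O thread sketch: " + e.getMessage());
            }
        }, "io-thread-sketch");
        io.setDaemon(true);
        io.start();
        return result.await(timeoutMs);
    }

    public static void main(String[] args) {
        System.out.println("parsable response, completed: " + sendAndFlush(true, 2000));
        System.out.println("unparsable response, completed: " + sendAndFlush(false, 200));
    }
}
```

With an unbounded await() the second call would park exactly like "main" in
the dump; with the timeout it returns false and the caller can decide what to
do next.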



From:  Jaikiran Pai <jai.forums2...@gmail.com>
Reply-To:  <users@kafka.apache.org>
Date:  Friday, May 20, 2016 at 7:55 AM
To:  <users@kafka.apache.org>
Subject:  Re: newbie: kafka 0.9.0.0 producer does not terminate after
producer.close()

> You can take a thread dump (using "jstack <pid-of-your-program>") when
> the program doesn't terminate and post that output here. That will tell
> us which threads are causing the program to not terminate.
> 
> -Jaikiran
> 
> On Tuesday 17 May 2016 11:32 PM, Andy Davidson wrote:
>>  I wrote a little test client that reads from a file and publishes using the
>>  0.9.0.0 API. I am connecting to an older 0.8.x server. I am able to send
>>  messages; however, I noticed that once I am done reading the input file my
>>  test program hangs.
>> 
>>  Any idea what I am doing wrong?
>> 
>>  Kind regards
>> 
>>  Andy
>> 
>> 
>>  public static void main(String[] args) throws IOException {
>>  logger.warn("BEGIN");
>> 
>>  readFromFile(cmdLine, producer, topic);
>> 
>> 
>> 
>>  producer.flush();
>> 
>>  producer.close();
>> 
>> 
>> 
>>  logger.warn("END");
>> 
>>  }
>> 
>> 
>>  private static void readFromFile(CmdLine cmdLine, KafkaProducer<String,
>>  String> producer,
>> 
>>  String topic) throws IOException {
>> 
>> 
>> 
>>  logger.info("BEGIN");
>> 
>>  BufferedReader reader = cmdLine.getReader();
>> 
>>  String value = null;
>> 
>> 
>> 
>>  while ((value = reader.readLine()) != null) {
>> 
>>  logger.info("sending value: " + value);
>> 
>>  publish(producer, topic, value);
>> 
>>  }
>> 
>>  logger.info("END");
>> 
>>  }
>> 
>> 
>> 
>>  private static void publish(KafkaProducer<String, String> producer, String
>>  topic, String value) {
>> 
>>    Future<RecordMetadata> response = producer.send(new ProducerRecord<String,
>>  String>(topic, value));
>> 
>> 
>> 
>>  /* TODO
>>    send() will raise the following error.
>>    It is because we are using a 0.9.0.0 client with a 0.8 server. The 0.8
>>    consumer seems to work without problems.
>>  */
>> 
>>  }
>> 
>> 
>> 
>>  ...
>>  INFO  17:02:53 main c.p.g.p.KClient readFromFile line:79 BEGIN
>> 
>>  ...
>>  INFO  17:02:54 main c.p.g.p.KClient readFromFile line:85 sending value: dependencies {
>> 
>>  ...
>>  INFO  17:02:54 main c.p.g.p.KClient readFromFile line:89 END
>> 
>>  ...
>> 
>>  The following error appears to be because we are using the 0.9.0.0 API with
>>  a 0.8.x server. If I read from stdin instead of a file I would be able to
>>  continue sending messages. I do not think this is the reason my test code
>>  hangs.
>> 
>>  ERROR 17:02:54 kafka-producer-network-thread | producer-1 o.a.k.c.p.i.Sender
>>  run line:130 Uncaught error in kafka producer I/O thread:
>> 
>>  org.apache.kafka.common.protocol.types.SchemaException: Error reading field
>>  'throttle_time_ms': java.nio.BufferUnderflowException
>> 
>>  at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
>> 
>>  at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
>> 
>>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
>> 
>>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>> 
>>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>> 
>>  at java.lang.Thread.run(Thread.java:745)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
> 
> 

