Andy,

A Kafka 0.9.0 broker supports clients from the previous releases (0.8.2,
0.8.1, ...).
Newer clients, however, won't work properly against an older Kafka broker.
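
As a rough illustration of that rule, here is a minimal Java sketch. The
class and method names are hypothetical, and comparing only the first three
version components is my own assumption about what counts as a release line.
It is not an official Kafka compatibility check, just the rule of thumb from
this thread written down as code.

```java
class VersionRuleSketch {
    // Hypothetical helper: keeps the first three numeric components,
    // e.g. "0.8.2.1" -> {0, 8, 2}. Missing components count as 0.
    static int[] releaseLine(String version) {
        String[] parts = version.split("\\.");
        int[] line = new int[3];
        for (int i = 0; i < 3 && i < parts.length; i++) {
            line[i] = Integer.parseInt(parts[i]);
        }
        return line;
    }

    // Rule of thumb from this thread: a client is expected to work only if
    // its release line is not newer than the broker's release line.
    static boolean clientSupported(String clientVersion, String brokerVersion) {
        int[] c = releaseLine(clientVersion);
        int[] b = releaseLine(brokerVersion);
        for (int i = 0; i < 3; i++) {
            if (c[i] != b[i]) {
                return c[i] < b[i];
            }
        }
        return true; // same release line
    }

    public static void main(String[] args) {
        // Old client against a new broker, then the reverse (the case in this thread).
        System.out.println(clientSupported("0.8.2.1", "0.9.0.0"));
        System.out.println(clientSupported("0.9.0.0", "0.8.2.1"));
    }
}
```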

You should upgrade your server / broker first.
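
On why the program never exits: in the thread dump quoted below, main is
parked in CountDownLatch.await() underneath KafkaProducer.flush(), waiting
for a completion that never arrives. The following JDK-only sketch
(hypothetical names, no Kafka classes involved) shows the same shape with a
bounded wait, so the caller gets a timeout instead of hanging forever. With
the real producer, a comparable bounded wait is available by calling
get(timeout, unit) on the Future returned by send().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BoundedWaitSketch {
    // Waits for the latch, but gives up after timeoutMs milliseconds.
    // Returns true if the latch was released in time, false on timeout.
    static boolean awaitCompletion(CountDownLatch done, long timeoutMs)
            throws InterruptedException {
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a request whose response is never processed, so
        // nothing ever calls countDown() on this latch.
        CountDownLatch neverCompleted = new CountDownLatch(1);

        if (!awaitCompletion(neverCompleted, 200)) {
            System.err.println("Timed out waiting for completion; giving up instead of hanging");
        }
    }
}
```

This only avoids the hang. It does not make the 0.9.0.0 client compatible
with the 0.8.x broker.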

--Kamal

On Fri, May 20, 2016 at 10:58 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Jaikiran
>
> Below is the stack trace. For completeness, my log file shows that my
> code has called
>
> producer.flush();
>
> producer.close();
>
>
>
> I get the following error; however, I do not think this is the problem. I
> found a "bug report" that said this was because I was connecting to a
> 0.8.x
> server. I am able to consume my test messages using
> kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh
>
> Kind regards
>
> Andy
>
> ERROR 17:12:14 kafka-producer-network-thread | producer-1 o.a.k.c.p.i.Sender run line:130 Uncaught error in kafka producer I/O thread:
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
>     at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
>     at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
>
> $ jstack 908
> 2016-05-20 10:16:25
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):
>
> "Attach Listener" #12 daemon prio=9 os_prio=31 tid=0x00007fe04291c800 nid=0x130b waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "kafka-producer-network-thread | producer-1" #11 daemon prio=5 os_prio=31 tid=0x00007fe041116800 nid=0x5a0f runnable [0x00007000015d5000]
>    java.lang.Thread.State: RUNNABLE
>     at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
>     at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
>     at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>     - locked <0x000000076b67ea88> (a sun.nio.ch.Util$2)
>     - locked <0x000000076b67ea00> (a java.util.Collections$UnmodifiableSet)
>     - locked <0x000000076b67e740> (a sun.nio.ch.KQueueSelectorImpl)
>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>     at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
>
> "Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007fe042015800 nid=0x5203 runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007fe04285b000 nid=0x5003 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007fe04282f000 nid=0x4e03 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007fe041830800 nid=0x4c03 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007fe04201c800 nid=0x4a03 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fe042015000 nid=0x3e0f runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fe04200d800 nid=0x3803 in Object.wait() [0x0000700000d3a000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
>     - locked <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
>     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
>
> "Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fe04200d000 nid=0x3603 in Object.wait() [0x0000700000c37000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)
>     at java.lang.Object.wait(Object.java:502)
>     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
>     - locked <0x000000076ab06af8> (a java.lang.ref.Reference$Lock)
>
> "main" #1 prio=5 os_prio=31 tid=0x00007fe041010800 nid=0x1703 waiting on condition [0x0000700000219000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x000000076b89c8e0> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:422)
>     at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546)
>     at com.pws.gnip.powertrack.HoseBirdClient.main(HoseBirdClient.java:56)
>
> "VM Thread" os_prio=31 tid=0x00007fe04200a000 nid=0x3403 runnable
>
> "GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fe042005800 nid=0x2403 runnable
>
> "GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fe042006000 nid=0x2603 runnable
>
> "GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fe042006800 nid=0x2803 runnable
>
> "GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fe042007000 nid=0x2a03 runnable
>
> "GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fe042800800 nid=0x2c03 runnable
>
> "GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fe042801000 nid=0x2e03 runnable
>
> "GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fe042801800 nid=0x3003 runnable
>
> "GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fe042802800 nid=0x3203 runnable
>
> "VM Periodic Task Thread" os_prio=31 tid=0x00007fe04202d800 nid=0x5403 waiting on condition
>
> JNI global references: 341
>
> From:  Jaikiran Pai <jai.forums2...@gmail.com>
> Reply-To:  <users@kafka.apache.org>
> Date:  Friday, May 20, 2016 at 7:55 AM
> To:  <users@kafka.apache.org>
> Subject:  Re: newbie: kafka 0.9.0.0 producer does not terminate after
> producer.close()
>
> > You can take a thread dump (using "jstack <pid-of-your-program>") when
> > the program doesn't terminate and post that output here. That will tell
> > us which threads are causing the program to not terminate.
> >
> > -Jaikiran
> >
> > On Tuesday 17 May 2016 11:32 PM, Andy Davidson wrote:
> >>  I wrote a little test client that reads from a file and publishes using
> >>  the 0.9.0.0 API. I am connecting to an older 0.8.x server. I am able to
> >>  send messages; however, I noticed that once I am done reading the input
> >>  file, my test program hangs.
> >>
> >>  Any idea what I am doing wrong?
> >>
> >>  Kind regards
> >>
> >>  Andy
> >>
> >>
> >>  public static void main(String[] args) throws IOException {
> >>      logger.warn("BEGIN");
> >>
> >>      readFromFile(cmdLine, producer, topic);
> >>
> >>      producer.flush();
> >>      producer.close();
> >>
> >>      logger.warn("END");
> >>  }
> >>
> >>  private static void readFromFile(CmdLine cmdLine, KafkaProducer<String, String> producer,
> >>          String topic) throws IOException {
> >>      logger.info("BEGIN");
> >>      BufferedReader reader = cmdLine.getReader();
> >>      String value = null;
> >>
> >>      while ((value = reader.readLine()) != null) {
> >>          logger.info("sending value: " + value);
> >>          publish(producer, topic, value);
> >>      }
> >>      logger.info("END");
> >>  }
> >>
> >>  private static void publish(KafkaProducer<String, String> producer, String topic, String value) {
> >>      Future<RecordMetadata> response = producer.send(new ProducerRecord<String, String>(topic, value));
> >>
> >>      /* TODO
> >>         send() will raise the following error.
> >>         It is because we are using a 0.9.0.0 client with a 0.8 server. The 0.8
> >>         consumer seems to work without problems.
> >>      */
> >>  }
> >>
> >>  ...
> >>  INFO  17:02:53 main c.p.g.p.KClient readFromFile line:79 BEGIN
> >>  ...
> >>  INFO  17:02:54 main c.p.g.p.KClient readFromFile line:85 sending value: dependencies {
> >>  ...
> >>  INFO  17:02:54 main c.p.g.p.KClient readFromFile line:89 END
> >>  ...
> >>
> >>  The following error appears to be because we are using the 0.9.0.0 API
> >>  with a 0.8.x server. If I read from stdin instead of a file, I would be
> >>  able to continue sending messages. I do not think this is the reason my
> >>  test code hangs.
> >>
> >>  ERROR 17:02:54 kafka-producer-network-thread | producer-1 o.a.k.c.p.i.Sender run line:130 Uncaught error in kafka producer I/O thread:
> >>  org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
> >>      at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
> >>      at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
> >>      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
> >>      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> >>      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> >>      at java.lang.Thread.run(Thread.java:745)
> >>
> >
> >
>
>
>
