I hate email forums... they mess up your code!

Ok, so I finally managed to get it to run past the first one. Not sure what I did... just kept trying. Seems a bit flaky. When it ran all 6, it did 6 send() calls, then waited out the 5-second sleep I put in there before the close. Then I saw 6 of these:

java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:487)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:467)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:155)
    at java.lang.Thread.run(Thread.java:745)
The offset of the record we just sent is: null

So something is hanging up and then gets tied in knots when things finally shut down.

From: Greg Zoller <gwzol...@yahoo.com.INVALID>
To: "users@kafka.apache.org" <users@kafka.apache.org>
Sent: Wednesday, April 13, 2016 9:08 PM
Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing

I did and now it hangs on the first send. Here's my output (hopefully this won't destroy the formatting). A "::: Writing 1 :::" line should be printed for each of the 6 writes, but only the first one appears, and then it hangs until timeout.
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [192.168.99.100:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    interceptor.classes = null
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = producer-1
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 0
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.1.0-SNAPSHOT
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 319c6e7195143c1f
::: Writing 1 ::::
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 0 : {lowercaseStrings=LEADER_NOT_AVAILABLE}
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 59860 ms.

From: Ismael Juma <ism...@juma.me.uk>
To: users@kafka.apache.org; Greg Zoller <gwzol...@yahoo.com>
Sent: Wednesday, April 13, 2016 5:54 PM
Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing

Hi Greg,

It may be worth passing a callback to the `send` to see if there's any error (or calling `get` on the Future returned).

Ismael

On Wed, Apr 13, 2016 at 2:59 PM, Greg Zoller <gwzol...@yahoo.com.invalid> wrote:

> So I rebuilt last night with the latest from master branch.
> Unfortunately same problem--producer doesn't seem to commit/close().
> After inserting a few records (which *seem* to go ok), close() times out
> and no offsets are updated--still set to 0.
> Any ideas?
> Thanks,
> Greg
>
> From: Greg Zoller <gwzol...@yahoo.com.INVALID>
> To: "users@kafka.apache.org" <users@kafka.apache.org>; Greg Zoller <gwzol...@yahoo.com>
> Sent: Tuesday, April 12, 2016 11:49 AM
> Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
>
> Sorry the formatting was all messed up. I re-tested this code with 0.9.0.1
> and it worked fine--KafkaProducer closed and committed the number of
> records expected into the partitions.
> So this seems like a SNAPSHOT issue. Will continue looking.
>
> From: Greg Zoller <gwzol...@yahoo.com.INVALID>
> To: "users@kafka.apache.org" <users@kafka.apache.org>
> Sent: Tuesday, April 12, 2016 10:40 AM
> Subject: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
>
> Hello,
> I'm trying to run the latest master build in github. I've got producer
> code like below:
>
>     val props = Map(
>       "bootstrap.servers" -> host,
>       "key.serializer" -> "org.apache.kafka.common.serialization.ByteArraySerializer",
>       "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
>     )
>     val p = new KafkaProducer[Array[Byte], String](props)
>     (1 to num).foreach { i =>
>       p.send(new ProducerRecord[Array[Byte], String](topic, s"msg-$i"))
>     }
>     p.close(10000, java.util.concurrent.TimeUnit.MILLISECONDS)
>
> This code will wait 10 seconds then close the KafkaProducer. At that
> point if I check the offsets in my server (same build) I see all offsets
> set to 0...no data committed. If I put some kind of println in my loop I
> see the p.send() call seeming to work happily.
> Any ideas?
> Thanks,
> Greg
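
For anyone following along, Ismael's suggestion in this thread (pass a callback to `send`, or call `get` on the returned Future) might look roughly like the sketch below. This is not Greg's actual test code. The object name `SendWithCallback` and the helper `describe` are made up for illustration, the broker address and topic name are copied from the log output in this thread, and `java.util.Properties` with `flush()` plus an untimed `close()` are assumptions swapped in for the Scala `Map` and the timed close in the original snippet. It needs the kafka-clients jar on the classpath and a reachable broker to actually send anything.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object SendWithCallback {
  // Hypothetical helper: turn the callback arguments into one printable line.
  // A non-null exception means the send failed, so it is checked first.
  def describe(metadata: RecordMetadata, exception: Exception): String =
    if (exception != null) s"send failed: $exception"
    else s"sent to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}"

  def main(args: Array[String]): Unit = {
    // Assumed values, copied from the log output in this thread.
    val host  = "192.168.99.100:9092"
    val topic = "lowercaseStrings"
    val num   = 6

    val props = new Properties()
    props.setProperty("bootstrap.servers", host)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val p = new KafkaProducer[Array[Byte], String](props)
    try {
      (1 to num).foreach { i =>
        val record = new ProducerRecord[Array[Byte], String](topic, s"msg-$i")
        // The callback runs once the broker acknowledges the record or the send fails,
        // so errors that send() alone would hide get printed.
        p.send(record, new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            println(describe(metadata, exception))
        })
      }
      // Block until every buffered record has completed (successfully or not).
      p.flush()
    } finally {
      p.close()
    }
  }
}
```

The other option Ismael mentions, `p.send(record).get()`, blocks on each record in turn and should surface a failed send as an exception. That is simpler for a quick test, at the cost of sending the records one at a time.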