I hate email forums... mess up your code!
Ok, so I finally managed to get it to run past the first one.  Not sure what I 
did... just kept trying.  Seems a bit flakey.
When it ran all 6 it did 6 sends() then waited 5 seconds sleep I put in there 
before the close.  Then I saw 6 of these:
java.lang.IllegalStateException: Producer is closed forcefully. at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:487)
 at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:467)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:155) at 
java.lang.Thread.run(Thread.java:745)The offset of the record we just sent is: 
null
So something is hanging up and then gets tied in knots when things finally shut 
down.
      From: Greg Zoller <gwzol...@yahoo.com.INVALID>
 To: "users@kafka.apache.org" <users@kafka.apache.org> 
 Sent: Wednesday, April 13, 2016 9:08 PM
 Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
   
I did and now it hangs on the first send.  Here's my output: (hopefully won't 
destroy formatting).The "::: Writing 1 :::" should output a line like that for 
each of 6 writes, but only the first is called, then hangs until timeout.
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO 
org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
compression.type = none metric.reporters = [] metadata.max.age.ms = 300000 
metadata.fetch.timeout.ms = 60000 reconnect.backoff.ms = 50 
sasl.kerberos.ticket.renew.window.factor = 0.8 bootstrap.servers = 
[192.168.99.100:9092] retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = 
/usr/bin/kinit buffer.memory = 33554432 timeout.ms = 30000 key.serializer = 
class org.apache.kafka.common.serialization.ByteArraySerializer 
sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 
ssl.keystore.type = JKS ssl.trustmanager.algorithm = PKIX block.on.buffer.full 
= false ssl.key.password = null max.block.ms = 60000 interceptor.classes = null 
sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 
ssl.truststore.password = null max.in.flight.requests.per.connection = 5 
metrics.num.samples = 2 client.id = producer-1 
ssl.endpoint.identification.algorithm = null ssl.protocol = TLS 
request.timeout.ms = 30000 ssl.provider = null ssl.enabled.protocols = 
[TLSv1.2, TLSv1.1, TLSv1] acks = 1 batch.size = 16384 ssl.keystore.location = 
null receive.buffer.bytes = 32768 ssl.cipher.suites = null ssl.truststore.type 
= JKS security.protocol = PLAINTEXT retries = 0 max.request.size = 1048576 
value.serializer = class org.apache.kafka.common.serialization.StringSerializer 
ssl.truststore.location = null ssl.keystore.password = null 
ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner 
send.buffer.bytes = 131072 linger.ms = 0
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO 
org.apache.kafka.common.utils.AppInfoParser - Kafka version : 
0.10.1.0-SNAPSHOT[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO 
org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 319c6e7195143c1f 
 ::: Writing 1 ::::[kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.NetworkClient - Error while fetching metadata with 
correlation id 0 : 
{lowercaseStrings=LEADER_NOT_AVAILABLE}org.apache.kafka.common.errors.TimeoutException:
 Failed to update metadata after 59860 ms.




      From: Ismael Juma <ism...@juma.me.uk>
 To: users@kafka.apache.org; Greg Zoller <gwzol...@yahoo.com> 
 Sent: Wednesday, April 13, 2016 5:54 PM
 Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
  
Hi Greg,

It may be worth passing a callback to the `send` to see if there's any
error (or calling `get` on the Future returned).

Ismael

On Wed, Apr 13, 2016 at 2:59 PM, Greg Zoller <gwzol...@yahoo.com.invalid>
wrote:

> So I rebuilt last night with the latest from master branch.
> Unfortunately same problem--producer doesn't seem to commit/close().
> After inserting a few records (which *seem* to go ok), close() times out
> and no offsets are updated--still set to 0.
> Any ideas?Thanks,Greg
>
>      From: Greg Zoller <gwzol...@yahoo.com.INVALID>
>  To: "users@kafka.apache.org" <users@kafka.apache.org>; Greg Zoller <
> gwzol...@yahoo.com>
>  Sent: Tuesday, April 12, 2016 11:49 AM
>  Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
>
> Sorry the formatting was all messed up.I re-tested this code with 0.9.0.1
> and it worked fine--KafkaProducer closed and committed the number of
> records expected into the partitions.
> So this seems like a SNAPSHOT issue.  Will continue looking.
>
>      From: Greg Zoller <gwzol...@yahoo.com.INVALID>
>  To: "users@kafka.apache.org" <users@kafka.apache.org>
>  Sent: Tuesday, April 12, 2016 10:40 AM
>  Subject: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
>
> Hello,
> I'm trying to run the latest master build in github.  I've got producer
> code like below:
>    val props = Map(      "bootstrap.servers" -> host,
> "key.serializer" ->
> "org.apache.kafka.common.serialization.ByteArraySerializer",
> "value.serializer" ->
> "org.apache.kafka.common.serialization.StringSerializer"    )    val p =
> new KafkaProducer[Array[Byte], String](props)    (1 to num).foreach { i =>
>    p.send(new ProducerRecord[Array[Byte], String](topic, s"msg-$i"))    }
>  p.close(10000, java.util.concurrent.TimeUnit.MILLISECONDS)
>
>
> This code will wait 10 seconds then close the KafkaProducer.  At that
> point if I check the offsets in my server (same build) I see all offsets
> set to 0...no data committed.  If I put some kind of println in my loop I
> see the p.send() call seeming to work happily.
> Any ideas?
> Thanks,Greg
>
>
>
>
>




  

Reply via email to