I hate email forums... they mess up your code!
Ok, so I finally managed to get it to run past the first send. Not sure what I
did... just kept trying. Seems a bit flaky.
When it ran all 6, it did 6 send()s, then waited through the 5-second sleep I
put in before the close. Then I saw 6 of these:
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:487)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:467)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:155)
    at java.lang.Thread.run(Thread.java:745)
The offset of the record we just sent is: null
So something is hanging up and then gets tied in knots when things finally shut
down.
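From the stack trace, it looks like close(timeout) expired while batches were
still in flight, so the sender force-aborted them (that's where "Producer is
closed forcefully" comes from) and no valid offsets ever came back. A minimal
sketch of a workaround, reusing the `p` producer from the code further down
the thread: flush before closing, so there's nothing left to abort.

    // Block until every buffered record is acknowledged (or failed) by the broker,
    // so the subsequent close() has no in-flight batches to force-abort.
    p.flush()
    p.close()   // the no-arg close() waits for any remaining sends to finish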
From: Greg Zoller <[email protected]>
To: "[email protected]" <[email protected]>
Sent: Wednesday, April 13, 2016 9:08 PM
Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
I did, and now it hangs on the first send. Here's my output (hopefully it
won't destroy the formatting). The "::: Writing 1 :::" marker should print a
line like that for each of the 6 writes, but only the first send is called;
then it hangs until the timeout.
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [192.168.99.100:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    interceptor.classes = null
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = producer-1
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 0
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.1.0-SNAPSHOT
[pool-10-thread-1-ScalaTest-running-KafkaSpec] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 319c6e7195143c1f
::: Writing 1 ::::
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 0 : {lowercaseStrings=LEADER_NOT_AVAILABLE}
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 59860 ms.
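For what it's worth, the 59860 ms matches the max.block.ms = 60000 default in
the config dump above: send() blocks up to that long waiting for metadata, and
LEADER_NOT_AVAILABLE typically just means the topic's leader election (e.g.
after auto-creation) hadn't finished yet. A sketch of capping that wait so the
failure surfaces faster -- it reuses the `props` map from my snippet further
down the thread, and 5000 ms is purely illustrative, not a recommendation:

    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.producer.KafkaProducer

    // Cap how long send() may block waiting for cluster metadata
    // (the default is 60000 ms, which is why it hangs for a minute).
    val fastProps = props + ("max.block.ms" -> "5000")
    val p2 = new KafkaProducer[Array[Byte], String](fastProps.asJava)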
From: Ismael Juma <[email protected]>
To: [email protected]; Greg Zoller <[email protected]>
Sent: Wednesday, April 13, 2016 5:54 PM
Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
Hi Greg,
It may be worth passing a callback to the `send` to see if there's any
error (or calling `get` on the Future returned).
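For example, something along these lines (a sketch using the `p` and `topic`
names from your snippet; both variants surface any send error):

    import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

    // Variant 1: asynchronous callback, invoked once the broker acks or the send fails.
    p.send(new ProducerRecord[Array[Byte], String](topic, "msg-1"), new Callback {
      override def onCompletion(md: RecordMetadata, e: Exception): Unit =
        if (e != null) e.printStackTrace()
        else println(s"ack: ${md.topic}-${md.partition} @ offset ${md.offset}")
    })

    // Variant 2: block on the returned Future; a failed send throws here,
    // wrapped in java.util.concurrent.ExecutionException.
    val md = p.send(new ProducerRecord[Array[Byte], String](topic, "msg-2")).get()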
Ismael
On Wed, Apr 13, 2016 at 2:59 PM, Greg Zoller <[email protected]>
wrote:
> So I rebuilt last night with the latest from the master branch.
> Unfortunately, same problem--the producer doesn't seem to commit on close().
> After inserting a few records (which *seem* to go ok), close() times out
> and no offsets are updated--still set to 0.
> Any ideas?
> Thanks,
> Greg
>
> From: Greg Zoller <[email protected]>
> To: "[email protected]" <[email protected]>; Greg Zoller <
> [email protected]>
> Sent: Tuesday, April 12, 2016 11:49 AM
> Subject: Re: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
>
> Sorry, the formatting was all messed up. I re-tested this code with 0.9.0.1
> and it worked fine--KafkaProducer closed and committed the expected number
> of records into the partitions.
> So this seems like a SNAPSHOT issue. Will continue looking.
>
> From: Greg Zoller <[email protected]>
> To: "[email protected]" <[email protected]>
> Sent: Tuesday, April 12, 2016 10:40 AM
> Subject: [0.10.1.0-SNAPSHOT] KafkaProducer.close() not committing
>
> Hello,
> I'm trying to run the latest master build from GitHub. I've got producer
> code like the following:
> import scala.collection.JavaConverters._
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
>
> val props = Map[String, AnyRef](
>   "bootstrap.servers" -> host,
>   "key.serializer"   -> "org.apache.kafka.common.serialization.ByteArraySerializer",
>   "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
> )
> // the producer constructor takes a java.util.Map, hence the asJava conversion
> val p = new KafkaProducer[Array[Byte], String](props.asJava)
> (1 to num).foreach { i =>
>   p.send(new ProducerRecord[Array[Byte], String](topic, s"msg-$i"))
> }
> p.close(10000, java.util.concurrent.TimeUnit.MILLISECONDS)
>
> This code waits up to 10 seconds for outstanding sends, then closes the
> KafkaProducer. At that point, if I check the offsets on my server (same
> build), I see all offsets set to 0...no data committed. If I put some kind
> of println in my loop, I see the p.send() calls seeming to work happily.
> Any ideas?
> Thanks,
> Greg
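> One way to check those offsets from the same client -- a sketch that assumes
> the writes landed in partition 0 of the test topic, with a throwaway group.id:
>
>     import java.util.{Collections, Properties}
>     import org.apache.kafka.clients.consumer.KafkaConsumer
>     import org.apache.kafka.common.TopicPartition
>
>     val cProps = new Properties()
>     cProps.put("bootstrap.servers", host)
>     cProps.put("group.id", "offset-check")  // throwaway, just to satisfy the config
>     cProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>
>     val c = new KafkaConsumer[Array[Byte], String](cProps)
>     val tp = new TopicPartition(topic, 0)       // assumes partition 0
>     c.assign(Collections.singletonList(tp))
>     c.seekToEnd(Collections.singletonList(tp))  // jump to the log-end offset
>     println(s"end offset of $tp = ${c.position(tp)}")  // 0 means nothing committed
>     c.close()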