[ https://issues.apache.org/jira/browse/KAFKA-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406737#comment-15406737 ]
Jun Rao commented on KAFKA-3875:
--------------------------------

I took a look at this and found a few things.

1. The IllegalStateException is actually thrown from testCloseWithZeroTimeoutFromSenderThread() due to a bug in the test. We call producer.close() in the callback. Once the first callback has run and closed the producer, producing records in any later callback hits the IllegalStateException. This only pollutes the output; it doesn't fail the test. I fixed this by only calling producer.send() in the first callback.

2. It's not clear which test throws the TimeoutException, and it's not reproducible locally. One issue is that the error message in TimeoutException is misleading, since the timeout is not necessarily due to metadata. I improved this by making the error message in TimeoutException clearer.

3. It's not clear what actually failed testSendNonCompressedMessageWithCreateTime(). One thing I found is that since we set the linger time to MAX_LONG and are sending small messages, the produced messages won't be drained until we call producer.close(10000L, TimeUnit.MILLISECONDS). Normally, 10 seconds should be enough for the records to be sent. My only hypothesis is that since SSL is more expensive, occasionally 10 seconds is still not enough. So I bumped up the timeout from 10 seconds to 20 seconds.

> Transient test failure: kafka.api.SslProducerSendTest.testSendNonCompressedMessageWithCreateTime
> ------------------------------------------------------------------------------------------------
>
> Key: KAFKA-3875
> URL: https://issues.apache.org/jira/browse/KAFKA-3875
> Project: Kafka
> Issue Type: Sub-task
> Components: unit tests
> Reporter: Ismael Juma
> Assignee: Jun Rao
> Labels: transient-unit-test-failure
>
> It failed in a couple of builds.
> {code}
> java.lang.AssertionError: Should have offset 100 but only successfully sent 0 expected:<100> but was:<0>
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotEquals(Assert.java:834)
>     at org.junit.Assert.assertEquals(Assert.java:645)
>     at kafka.api.BaseProducerSendTest.sendAndVerifyTimestamp(BaseProducerSendTest.scala:228)
>     at kafka.api.BaseProducerSendTest.testSendNonCompressedMessageWithCreateTime(BaseProducerSendTest.scala:170)
> {code}
> And standard out:
> {code}
> org.scalatest.junit.JUnitTestFailedError: Send callback returns the following exception
>     at org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:103)
>     at org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:79)
>     at org.scalatest.Assertions$class.fail(Assertions.scala:1348)
>     at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:79)
>     at kafka.api.BaseProducerSendTest$callback$4$.onCompletion(BaseProducerSendTest.scala:209)
>     at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
>     at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:155)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:211)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for topic-0
> java.lang.IllegalStateException: Cannot send after the producer is closed.
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:172)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:466)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
>     at kafka.api.BaseProducerSendTest$CloseCallback$1$$anonfun$onCompletion$1.apply(BaseProducerSendTest.scala:415)
>     at kafka.api.BaseProducerSendTest$CloseCallback$1$$anonfun$onCompletion$1.apply(BaseProducerSendTest.scala:415)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at kafka.api.BaseProducerSendTest$CloseCallback$1.onCompletion(BaseProducerSendTest.scala:415)
>     at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:107)
>     at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:318)
>     at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:278)
>     at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:57)
>     at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:364)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> https://jenkins.confluent.io/job/kafka-trunk/905/
> https://jenkins.confluent.io/job/kafka-trunk/919 (the output is similar to the first build)
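
For reference, the failure mode in point 1 of the comment (a callback that both sends and closes, so every callback after the first hits "Cannot send after the producer is closed") can be sketched without a broker. This is only a rough model under stated assumptions: FakeProducer, run() and the record strings are hypothetical stand-ins, not the Kafka client API and not the actual test code in BaseProducerSendTest.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseInCallbackSketch {

    /** Hypothetical stand-in for a producer; NOT the Kafka client API. */
    static final class FakeProducer {
        private boolean closed = false;
        final List<String> accepted = new ArrayList<>();

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("Cannot send after the producer is closed.");
            }
            accepted.add(record);
        }

        void close() {
            closed = true;
        }
    }

    /**
     * Simulates `callbacks` completion callbacks that each call close().
     * Returns how many callbacks hit IllegalStateException when sending.
     */
    static int run(boolean sendOnlyInFirstCallback, int callbacks) {
        FakeProducer producer = new FakeProducer();
        int failures = 0;
        boolean first = true;
        for (int i = 0; i < callbacks; i++) {
            try {
                // Guarded variant: only the first callback produces records.
                if (!sendOnlyInFirstCallback || first) {
                    producer.send("from-callback-" + i);
                }
                first = false;
                producer.close();
            } catch (IllegalStateException e) {
                failures++; // In the real test this only polluted the output.
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("unguarded failures: " + run(false, 3));
        System.out.println("guarded failures:   " + run(true, 3));
    }
}
```

In this model the unguarded variant fails in every callback after the first, while the guarded variant never sends on a closed producer, which matches the intent of the fix described in the comment.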