[ 
https://issues.apache.org/jira/browse/KAFKA-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406737#comment-15406737
 ] 

Jun Rao commented on KAFKA-3875:
--------------------------------

I took a look at this and found a few things.

1. The IllegalStateException is actually thrown from 
testCloseWithZeroTimeoutFromSenderThread() due to a bug. We call 
producer.close() in the callback. Once the first callback is called, producing 
records in the callback will hit the IllegalStateException. This only pollutes 
the output,  but doesn't fail the test. I fixed this by only calling 
producer.send() in the first callback.

2. It's not clear which test throws TimeoutException and it's not reproducible 
locally. One thing is that the error message in TimeoutException is mis-leading 
since the timeout is not necessarily due to metadata. Improved this by making 
the error message in TimeoutException clearer.

3. It's not clear what actually failed 
testSendNonCompressedMessageWithCreateTime(). One thing I found is that since 
we set the linger time to MAX_LONG and are sending small messages, those 
produced messages won't be drained until we call producer.close(10000L, 
TimeUnit.MILLISECONDS). Normally, 10 secs should be enough for the records to 
be sent. My only hypothesis is that since SSL is more expensive, occasionally, 
10 secs is still not enough. So, I bumped up the timeout from 10 secs to 20 
secs.

> Transient test failure: 
> kafka.api.SslProducerSendTest.testSendNonCompressedMessageWithCreateTime
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3875
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3875
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: unit tests
>            Reporter: Ismael Juma
>            Assignee: Jun Rao
>              Labels: transient-unit-test-failure
>
> It failed in a couple of builds.
> {code}
> java.lang.AssertionError: Should have offset 100 but only successfully sent 0 
> expected:<100> but was:<0>
>       at org.junit.Assert.fail(Assert.java:88)
>       at org.junit.Assert.failNotEquals(Assert.java:834)
>       at org.junit.Assert.assertEquals(Assert.java:645)
>       at 
> kafka.api.BaseProducerSendTest.sendAndVerifyTimestamp(BaseProducerSendTest.scala:228)
>       at 
> kafka.api.BaseProducerSendTest.testSendNonCompressedMessageWithCreateTime(BaseProducerSendTest.scala:170)
> {code}
> And standard out:
> {code}
> org.scalatest.junit.JUnitTestFailedError: Send callback returns the following 
> exception
>       at 
> org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:103)
>       at 
> org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:79)
>       at org.scalatest.Assertions$class.fail(Assertions.scala:1348)
>       at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:79)
>       at 
> kafka.api.BaseProducerSendTest$callback$4$.onCompletion(BaseProducerSendTest.scala:209)
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:155)
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:211)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 
> 1 record(s) expired due to timeout while requesting metadata from brokers for 
> topic-0
> java.lang.IllegalStateException: Cannot send after the producer is closed.
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:172)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:466)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
>       at 
> kafka.api.BaseProducerSendTest$CloseCallback$1$$anonfun$onCompletion$1.apply(BaseProducerSendTest.scala:415)
>       at 
> kafka.api.BaseProducerSendTest$CloseCallback$1$$anonfun$onCompletion$1.apply(BaseProducerSendTest.scala:415)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.immutable.Range.foreach(Range.scala:160)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> kafka.api.BaseProducerSendTest$CloseCallback$1.onCompletion(BaseProducerSendTest.scala:415)
>       at 
> org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:107)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:318)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:278)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:57)
>       at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:364)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> https://jenkins.confluent.io/job/kafka-trunk/905/
> https://jenkins.confluent.io/job/kafka-trunk/919 (the output is similar to 
> the first build)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to