Xuguang zhan created KAFKA-15064:
------------------------------------

             Summary: Use KafkaTemplate to send message with below exception - 
IllegalMonitorStateException
                 Key: KAFKA-15064
                 URL: https://issues.apache.org/jira/browse/KAFKA-15064
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.1.2
            Reporter: Xuguang zhan


*Running env:*
1.openjdk-17.0.2-5.el7.x86_64
2.Spring-kafka :2.8.11
3. Kafka client: 3.1.2



Special case would be: one Tomcat have three web applications or we say context 
, Kafka client put into tomcat share lib.

 

*Java Stack:*

java.lang.IllegalMonitorStateException: current thread is not owner
at java.lang.Object.wait(Native Method) ~[?:?]
at org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55) 
~[kafka-clients-3.1.2.jar:?]
at 
org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119)
 ~[kafka-clients-3.1.2.jar:?]
at 
org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1088)
 ~[kafka-clients-3.1.2.jar:?]
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:935) 
~[kafka-clients-3.1.2.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) 
~[kafka-clients-3.1.2.jar:?]
at 
org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1087)
 ~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) 
~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429) 
~[spring-kafka-2.8.11.jar:2.8.11]

 

 

Below is low level code stack :

 
{code:java}
//  @Override
    public void waitObject(Object obj, Supplier<Boolean> condition, long 
deadlineMs) throws InterruptedException {
        synchronized (obj) {
            while (true) {
                if (condition.get())
                    return;

                long currentTimeMs = milliseconds();
                if (currentTimeMs >= deadlineMs)
                    throw new TimeoutException("Condition not satisfied before 
deadline");

                obj.wait(deadlineMs - currentTimeMs);
            }
        }
    } {code}
 

 
{code:java}
// code placeholder
/**
     * Wait for metadata update until the current version is larger than the 
last version we know of
     */
    public synchronized void awaitUpdate(final int lastVersion, final long 
timeoutMs) throws InterruptedException {
        long currentTimeMs = time.milliseconds();
        long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : 
currentTimeMs + timeoutMs;
        time.waitObject(this, () -> {
            // Throw fatal exceptions, if there are any. Recoverable topic 
errors will be handled by the caller.
            maybeThrowFatalException();
            return updateVersion() > lastVersion || isClosed();
        }, deadlineMs);

        if (isClosed())
            throw new KafkaException("Requested metadata update after close");
    }



{code}
 

I checked same issue check the jira which have been reported 
https://issues.apache.org/jira/browse/KAFKA-10902



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to