M.P. Korstanje created KAFKA-10902:
--------------------------------------

             Summary: IllegalMonitorStateException in 
KafkaProducer.waitOnMetadata
                 Key: KAFKA-10902
                 URL: https://issues.apache.org/jira/browse/KAFKA-10902
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 2.5.1
            Reporter: M.P. Korstanje


{{}}

{{}}

{{}}

We observe the following exception while using 
{{org.apache.kafka:kafka-clients:jar:2.5.1}}  as part of a Spring Boot 
application running in a docker container on {{openjdk:13-jdk-alpine3.1}}. 

 
{code:java}
        j.l.IllegalMonitorStateException: null
        at java.lang.Object.wait(Object.java)
        at o.a.k.common.utils.SystemTime.waitObject(SystemTime.java:55)
        at o.a.k.c.p.i.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119)
        at 
o.a.k.c.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1029)
        at o.a.k.c.producer.KafkaProducer.doSend(KafkaProducer.java:883)
        at o.a.k.c.producer.KafkaProducer.send(KafkaProducer.java:862)
        at 
o.s.k.c.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:816)
        at b.k.clients.TracingProducer.send(TracingProducer.java:129)
        at o.s.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
        at o.s.kafka.core.KafkaTemplate.send(KafkaTemplate.java:401)
        < application specific> 
        at j.i.r.GeneratedMethodAccessor167.invoke(Unknown Source)
        at 
j.i.r.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:567)
        at 
o.s.w.m.s.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
        at 
o.s.w.m.s.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
        at 
o.s.w.s.m.m.a.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
        at 
o.s.w.s.m.m.a.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
        ... 87 frames truncated
{code}
 

The exception occurs when using a {{KafkaTemplate (}}ultimately invoking a 
{{KafkaProducer) }}to send a message. E..g:

 
{code:java}
@Service
public class Service {
    private final KafkaTemplate<String, UserChangedPinEvent> kafkaTemplate;

    // Constructor ommited

    public void publishEvent(final UUID userId) {
        final Event event = new Event(userId);

        final Message<Event> message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.TOPIC, "some-topic")
                .build();
        kafkaTemplate.send(message);
    }
}
{code}
 

I've not been able to reproduce this in isolation, we have observed this 
exception twice in the last six months. The system was not under any 
significant amount of load at the time.

{{}}

{{}}

Looking at the code this exception is unexpected because the 
{{SystemTime.waitObject}} correctly aquires a monitor before calling 
{{Object.wait}}.



 
{code:java}
@Override
public void waitObject(Object obj, Supplier<Boolean> condition, long 
deadlineMs) throws InterruptedException {
    synchronized (obj) {
        while (true) {
            if (condition.get())
                return;

            long currentTimeMs = milliseconds();
            if (currentTimeMs >= deadlineMs)
                throw new TimeoutException("Condition not satisfied before 
deadline");

            obj.wait(deadlineMs - currentTimeMs);
        }
    }
}
{code}

And in the caller, {{ProducerMetadata.awaitUpdate,}} a the monitor was also 
already aquired.

 
{code:java}
public synchronized void awaitUpdate(final int lastVersion, final long 
timeoutMs) throws InterruptedException {
    long currentTimeMs = time.milliseconds();
    long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : 
currentTimeMs + timeoutMs;
    time.waitObject(this, () -> {
        // Throw fatal exceptions, if there are any. Recoverable topic errors 
will be handled by the caller.
        maybeThrowFatalException();
        return updateVersion() > lastVersion || isClosed();
    }, deadlineMs);

    if (isClosed())
        throw new KafkaException("Requested metadata update after close");
}
{code}

So it is not clear to me how this exception can occur barring a JDK bug. 




 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to