[ https://issues.apache.org/jira/browse/KAFKA-4787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879019#comment-15879019 ]
Steven Schlansker commented on KAFKA-4787:
------------------------------------------

A simple solution would be for subsequent close() calls to skip the inner synchronized close overload and instead join on the close request that is already in progress.

> KafkaStreams close() is not reentrant
> -------------------------------------
>
>                 Key: KAFKA-4787
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4787
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Steven Schlansker
>
> While building a simple application, I tried to implement a failure policy
> where any uncaught exception terminates the application until an
> administrator can evaluate and intervene:
> {code}
> /** Handle any uncaught exception by shutting down the program. */
> private void handleStreamException(Thread thread, Throwable t) {
>     LOG.error("stream exception in thread {}", thread, t);
>     streams.close();
> }
>
> streams.setUncaughtExceptionHandler(this::handleStreamException);
> streams.start();
> {code}
> Unfortunately, because the KafkaStreams#close() method takes a lock, this is
> prone to what looks like a deadlock:
> {code}
> "StreamThread-1" #80 prio=5 os_prio=0 tid=0x00007f56096f4000 nid=0x40c8 waiting for monitor entry [0x00007f54f03ee000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java)
>         - waiting to lock <0x00000000f171cda8> (a org.apache.kafka.streams.KafkaStreams)
>         at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
>         at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source)
>         at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
>         at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
>         at com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541)
>         at com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown Source)
>         at java.lang.Thread.dispatchUncaughtException(Thread.java:1956)
>
> "main" #1 prio=5 os_prio=0 tid=0x00007f5608011000 nid=0x3f76 in Object.wait() [0x00007f5610f04000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         at java.lang.Thread.join(Thread.java:1249)
>         - locked <0x00000000fd302bf0> (a java.lang.Thread)
>         at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494)
>         - locked <0x00000000f171cda8> (a org.apache.kafka.streams.KafkaStreams)
>         at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
>         at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source)
>         at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
>         at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
> {code}
> Note how the main thread calls close(), which encounters an exception.
> StreamThread-1 dispatches that exception to the handler, which calls close()
> again. Once the handler tries to take the monitor, main is joined on
> StreamThread-1 while StreamThread-1 waits for main to release that monitor.
> Arguably it's a bit abusive to call close() in this way (it certainly wasn't
> intentional), but to make Kafka Streams robust, it should handle any
> sequence of close() invocations gracefully.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
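The comment's proposal, that later close() calls wait on the shutdown already in progress instead of re-entering the synchronized overload, can be sketched outside Kafka. This is a minimal illustration under stated assumptions, not the KafkaStreams implementation: the class ReentrantCloseSketch, its single "worker-1" thread, and isClosed() are hypothetical. One caveat follows from the stack traces above: if the repeated close() comes from a thread that the first close() is joining (StreamThread-1 in the report), that caller cannot block until shutdown completes without recreating the deadlock, so the sketch lets such a caller return immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical stand-in for a client that owns a worker thread.
 * Illustrative only; this is not the KafkaStreams API.
 */
public class ReentrantCloseSketch {
    // Set by whichever close() call arrives first.
    private final AtomicBoolean closeRequested = new AtomicBoolean(false);
    // Released once the first close() has finished its shutdown work.
    private final CountDownLatch closed = new CountDownLatch(1);
    private final Thread worker;

    public ReentrantCloseSketch(Runnable work) {
        this.worker = new Thread(work, "worker-1");
    }

    public void start() {
        worker.start();
    }

    public boolean isClosed() {
        return closed.getCount() == 0;
    }

    /** Safe to call repeatedly and from any thread, including the worker; no monitor is held. */
    public void close() throws InterruptedException {
        boolean self = Thread.currentThread() == worker;
        if (closeRequested.compareAndSet(false, true)) {
            // First caller performs the shutdown: signal the worker,
            // then join it unless the caller is the worker itself.
            worker.interrupt();
            if (!self) {
                worker.join();
            }
            closed.countDown();
        } else if (!self) {
            // Later callers join on the close already in progress
            // instead of re-entering the shutdown path.
            closed.await();
        }
        // A repeated call from the worker returns at once: the first
        // caller may be blocked in worker.join(), waiting for this thread.
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantCloseSketch[] ref = new ReentrantCloseSketch[1];
        ref[0] = new ReentrantCloseSketch(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE); // stand-in for stream processing
            } catch (InterruptedException e) {
                // Mimics the handler in the report: close() from the worker.
                try {
                    ref[0].close();
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        ref[0].start();
        ref[0].close(); // plays the role of "main" in the report; returns instead of deadlocking
        ref[0].close(); // waits on the already finished close and returns
        System.out.println("closed: " + ref[0].isClosed()); // prints "closed: true"
    }
}
```

The main method replays the reported sequence: the main thread's close() interrupts and joins the worker, the worker's "handler" calls close() a second time, and both calls return. Using an atomic flag plus a latch rather than a synchronized method is what keeps the second caller from queuing on a monitor that the first caller holds while it joins.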