Hi all,

I am looking for some guidance on an issue where a Spark Structured Streaming query is blocked because a thread in ExpiringCredentialRefreshingLogin is unable to shut down.
The streaming query cannot make progress because the Kafka admin client is blocked while calling close():

"kafka-admin-client-thread | adminclient-8" Id=8761517 WAITING on kafkashaded.org.apache.kafka.common.utils.KafkaThread@4b83f275
	at java.lang.Object.wait(Native Method)
	- waiting on kafkashaded.org.apache.kafka.common.utils.KafkaThread@4b83f275
	at java.lang.Thread.join(Thread.java:1257)
	at java.lang.Thread.join(Thread.java:1331)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.close(ExpiringCredentialRefreshingLogin.java:251)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.close(OAuthBearerRefreshingLogin.java:134)
	at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.release(LoginManager.java:153)
	- locked java.lang.Class@76d43fbf
	at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.close(SaslChannelBuilder.java:246)
	at kafkashaded.org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1060)
	at kafkashaded.org.apache.kafka.common.network.Selector.close(Selector.java:375)
	at kafkashaded.org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:653)
	at kafkashaded.org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1039)
	at kafkashaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1361)
	at java.lang.Thread.run(Thread.java:750)

which is blocked by:

"kafka-expiring-relogin-thread-43d6772b-398a-45d5-ba6a-54c534411f09" Id=8761516 BLOCKED on java.lang.Class@74f8d460 owned by "kafka-expiring-relogin-thread-43d6772b-398a-45d5-ba6a-54c534411f09"
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.reLogin(ExpiringCredentialRefreshingLogin.java:365)
	- blocked on java.lang.Class@74f8d460
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.access$600(ExpiringCredentialRefreshingLogin.java:40)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin$Refresher.run(ExpiringCredentialRefreshingLogin.java:106)
	at java.lang.Thread.run(Thread.java:750)

which is blocked by:

"kafka-expiring-relogin-thread-43d6772b-398a-45d5-ba6a-54c534411f09" Id=3203350 RUNNABLE (in native)
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476)
	at sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
	at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
	at sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
	at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1427)
	at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1333)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:444)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:415)
	at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:567)
	at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:197)
	at sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.handleInput(HttpAccessTokenRetriever.java:228)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.post(HttpAccessTokenRetriever.java:193)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.lambda$retrieve$0(HttpAccessTokenRetriever.java:169)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever$$Lambda$8409/628622755.call(Unknown Source)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.Retry.execute(Retry.java:70)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.retrieve(HttpAccessTokenRetriever.java:160)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.handleTokenCallback(OAuthBearerLoginCallbackHandler.java:244)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.handle(OAuthBearerLoginCallbackHandler.java:233)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
	at sun.reflect.GeneratedMethodAccessor879.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
	at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
	at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
	at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
	at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.reLogin(ExpiringCredentialRefreshingLogin.java:390)
	- locked java.lang.Class@74f8d460
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.access$600(ExpiringCredentialRefreshingLogin.java:40)
	at kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin$Refresher.run(ExpiringCredentialRefreshingLogin.java:106)
	at java.lang.Thread.run(Thread.java:750)

Some questions:

1. Is it normal/expected to have more than one `kafka-expiring-relogin-thread`? In the dump above, two threads with the same name (Id=8761516 and Id=3203350) are both running the refresher.

2. Looking at the code, there may also be a race condition in the close method of ExpiringCredentialRefreshingLogin:
   https://github.com/apache/kafka/blob/df04887ba597941674a28d54bd35202bbd21631e/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L248

   The code first attempts to interrupt the refresher thread (refresherThread.interrupt();):
   https://github.com/apache/kafka/blob/df04887ba597941674a28d54bd35202bbd21631e/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L250

   Then it joins the thread:
   https://github.com/apache/kafka/blob/df04887ba597941674a28d54bd35202bbd21631e/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L252

   However, refresherThread is assigned here:
   https://github.com/apache/kafka/blob/df04887ba597941674a28d54bd35202bbd21631e/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L241

   When interrupt() is called, refresherThread could still reference the previous refresher instance. join() could then be called on a newer instance that was never interrupted.

The Kafka client version used is 3.4.0.

Any help on this issue is appreciated!

Best,
Jerry
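P.S. Below is a minimal, self-contained Java sketch of the suspected race in question 2. It is a hypothetical, simplified model, not Kafka's ExpiringCredentialRefreshingLogin. RacyLogin, SaferLogin, startIdleThread and the concurrentWork hook are names made up for illustration. The hook stands in for another thread reassigning refresherThread between interrupt() and join(). That is the interleaving I am asking about, not something I have confirmed happens in 3.4.0. The sketch joins with a timeout so it terminates; a join() without a timeout, as in the trace, would hang instead.

```java
/**
 * Hypothetical, simplified model of the interrupt()/join() pattern discussed in question 2.
 * This is NOT Kafka's ExpiringCredentialRefreshingLogin; all names here are illustrative.
 */
public class RefresherRaceSketch {

    /** Starts a daemon thread that idles until interrupted, like a refresher waiting for its next refresh. */
    static Thread startIdleThread(String name) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // Interrupted: return so the thread terminates.
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Suspected racy shape: the shared field is read once for interrupt() and again for join(). */
    static final class RacyLogin {
        volatile Thread refresherThread;

        void login(String name) {
            refresherThread = startIdleThread(name); // reassigns the field on every call
        }

        /** Returns true if the thread that was actually joined has stopped. */
        boolean close(Runnable concurrentWork, long joinTimeoutMs) throws InterruptedException {
            refresherThread.interrupt();          // interrupts the current instance (A)
            concurrentWork.run();                 // stands in for a concurrent login() that reassigns the field
            refresherThread.join(joinTimeoutMs);  // may now join a newer instance (B) that was never interrupted
            return !refresherThread.isAlive();
        }
    }

    /** Safer shape: a closed flag blocks new refreshers, and one local reference is interrupted and joined. */
    static final class SaferLogin {
        private final Object lock = new Object();
        private boolean closed;          // guarded by lock
        private Thread refresherThread;  // guarded by lock

        boolean login(String name) {
            synchronized (lock) {
                if (closed) {
                    return false; // refuse to start a refresher once close() has begun
                }
                refresherThread = startIdleThread(name);
                return true;
            }
        }

        boolean close(Runnable concurrentWork, long joinTimeoutMs) throws InterruptedException {
            Thread toStop;
            synchronized (lock) {
                closed = true;
                toStop = refresherThread;
            }
            if (toStop == null) {
                return true; // nothing was ever started
            }
            toStop.interrupt();
            concurrentWork.run();         // a login() here now returns false instead of starting a thread
            toStop.join(joinTimeoutMs);   // joins the same instance that was interrupted
            return !toStop.isAlive();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RacyLogin racy = new RacyLogin();
        racy.login("refresher-A");
        boolean racyStopped = racy.close(() -> racy.login("refresher-B"), 500);
        System.out.println("racy close stopped the joined thread: " + racyStopped);

        SaferLogin safer = new SaferLogin();
        safer.login("refresher-A");
        boolean saferStopped = safer.close(() -> safer.login("refresher-B"), 500);
        System.out.println("safer close stopped the joined thread: " + saferStopped);
    }
}
```

With the racy shape, close() ends up joining refresher-B, which was never interrupted, so the first line should report false. With the safer shape, the closed flag keeps refresher-B from starting, and the joined thread is the one that was interrupted, so the second line should report true. Also worth noting: Thread.interrupt() does not wake a thread that is waiting to enter a synchronized block, or one blocked in a plain java.net socket read. So an interrupt alone may not stop the relogin threads in the dump above, even without the race.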