Hi all,

I am looking for guidance on an issue where a Structured Streaming
(Spark Streaming) query is blocked because a thread in
ExpiringCredentialRefreshingLogin is unable to shut down.

The streaming query cannot make progress because the Kafka admin client
is blocked when calling close:

"kafka-admin-client-thread | adminclient-8" Id=8761517 WAITING on
kafkashaded.org.apache.kafka.common.utils.KafkaThread@4b83f275 at
java.lang.Object.wait(Native Method) - waiting on
kafkashaded.org.apache.kafka.common.utils.KafkaThread@4b83f275 at
java.lang.Thread.join(Thread.java:1257) at
java.lang.Thread.join(Thread.java:1331) at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.close(ExpiringCredentialRefreshingLogin.java:251)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.close(OAuthBearerRefreshingLogin.java:134)
at
kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.release(LoginManager.java:153)
- locked java.lang.Class@76d43fbf at
kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.close(SaslChannelBuilder.java:246)
at
kafkashaded.org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1060)
at
kafkashaded.org.apache.kafka.common.network.Selector.close(Selector.java:375)
at
kafkashaded.org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:653)
at
kafkashaded.org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1039)
at
kafkashaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1361)
at java.lang.Thread.run(Thread.java:750)

which is blocked by

"kafka-expiring-relogin-thread-43d6772b-398a-45d5-ba6a-54c534411f09"
Id=8761516 BLOCKED on java.lang.Class@74f8d460 owned by
"kafka-expiring-relogin-thread-43d6772b-398a-45d5-ba6a-54c534411f09" at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.reLogin(ExpiringCredentialRefreshingLogin.java:365)
- blocked on java.lang.Class@74f8d460 at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.access$600(ExpiringCredentialRefreshingLogin.java:40)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin$Refresher.run(ExpiringCredentialRefreshingLogin.java:106)
at java.lang.Thread.run(Thread.java:750)

which is blocked by

"kafka-expiring-relogin-thread-43d6772b-398a-45d5-ba6a-54c534411f09"
Id=3203350 RUNNABLE (in native) at
java.net.SocketInputStream.socketRead0(Native Method) at
java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at
java.net.SocketInputStream.read(SocketInputStream.java:171) at
java.net.SocketInputStream.read(SocketInputStream.java:141) at
sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476) at
sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
at
sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160) at
sun.security.ssl.SSLTransport.decode(SSLTransport.java:110) at
sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1427) at
sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1333) at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:444) at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:415) at
sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:567) at
sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:197)
at
sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.handleInput(HttpAccessTokenRetriever.java:228)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.post(HttpAccessTokenRetriever.java:193)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.lambda$retrieve$0(HttpAccessTokenRetriever.java:169)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever$$Lambda$8409/628622755.call(Unknown
Source) at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.Retry.execute(Retry.java:70)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.retrieve(HttpAccessTokenRetriever.java:160)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.handleTokenCallback(OAuthBearerLoginCallbackHandler.java:244)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.handle(OAuthBearerLoginCallbackHandler.java:233)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
at sun.reflect.GeneratedMethodAccessor879.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
javax.security.auth.login.LoginContext.invoke(LoginContext.java:755) at
javax.security.auth.login.LoginContext.access$000(LoginContext.java:195) at
javax.security.auth.login.LoginContext$4.run(LoginContext.java:682) at
javax.security.auth.login.LoginContext$4.run(LoginContext.java:680) at
java.security.AccessController.doPrivileged(Native Method) at
javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680) at
javax.security.auth.login.LoginContext.login(LoginContext.java:587) at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.reLogin(ExpiringCredentialRefreshingLogin.java:390)
- locked java.lang.Class@74f8d460 at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.access$600(ExpiringCredentialRefreshingLogin.java:40)
at
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin$Refresher.run(ExpiringCredentialRefreshingLogin.java:106)
at java.lang.Thread.run(Thread.java:750)
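
To illustrate why close() can hang in the chain above, here is a minimal
sketch (not Kafka code; the class and thread names are hypothetical).
Thread.interrupt() does not release a thread that is BLOCKED waiting to
enter a synchronized block, so a join on that thread keeps waiting for as
long as the lock holder keeps the monitor. In the sketch the holder waits
on a latch, which stands in for the stalled TLS handshake in the third dump.

```java
import java.util.concurrent.CountDownLatch;

public class BlockedInterruptDemo {
    private static final Object LOCK = new Object();

    /** Returns the waiter's state after interrupt() and a bounded join. */
    public static Thread.State run() throws InterruptedException {
        CountDownLatch holderHasLock = new CountDownLatch(1);
        CountDownLatch releaseHolder = new CountDownLatch(1);

        // Holds the monitor and then stalls (stand-in for the hung login call).
        Thread holder = new Thread(() -> {
            synchronized (LOCK) {
                holderHasLock.countDown();
                try {
                    releaseHolder.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "holder");

        // Needs the same monitor (stand-in for the second relogin thread).
        Thread waiter = new Thread(() -> {
            synchronized (LOCK) {
                // nothing to do; entering the monitor is the point
            }
        }, "waiter");

        holder.start();
        holderHasLock.await();
        waiter.start();
        while (waiter.getState() != Thread.State.BLOCKED) {
            Thread.sleep(10);
        }

        waiter.interrupt();   // sets the interrupt flag only
        waiter.join(500);     // bounded join so the demo itself cannot hang
        Thread.State observed = waiter.getState();

        // Clean up: let the holder finish so both threads can exit.
        releaseHolder.countDown();
        holder.join();
        waiter.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waiter state after interrupt + join(500): " + run());
    }
}
```

With an unbounded join() in place of join(500), the calling thread would
wait until the holder released the lock, which matches the WAITING state of
the admin client thread in the first dump.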

Some questions:
1. Is it normal / expected to have more than one
`kafka-expiring-relogin-thread`?
2. Looking at the code, there may also be a race condition. The close
method of ExpiringCredentialRefreshingLogin:


kafka/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
at df04887ba597941674a28d54bd35202bbd21631e · apache/kafka
<https://github.com/apache/kafka/blob/df04887ba597941674a28d54bd35202bbd21631e/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L248>

The code attempts to interrupt the refresherThread by calling
refresherThread.interrupt():

kafka/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
at df04887ba597941674a28d54bd35202bbd21631e · apache/kafka
<https://github.com/apache/kafka/blob/df04887ba597941674a28d54bd35202bbd21631e/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L250>

Then the code joins the thread:

kafka/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
at df04887ba597941674a28d54bd35202bbd21631e · apache/kafka
<https://github.com/apache/kafka/blob/df04887ba597941674a28d54bd35202bbd21631e/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L252>



However, refresherThread is assigned here:

kafka/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java
at df04887ba597941674a28d54bd35202bbd21631e · apache/kafka
<https://github.com/apache/kafka/blob/df04887ba597941674a28d54bd35202bbd21631e/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L241>

When interrupt() is called, refresherThread could still be referencing the
previous execution of the refresh, while the subsequent join() is made on a
new execution/instance of the refresh, and that latest instance has not
been interrupted.
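
If that hypothesis holds, one way to rule it out is to read the field once
into a local variable, so that interrupt() and join() are guaranteed to act
on the same thread. The following is a minimal sketch under that
assumption, not the Kafka implementation: the class SnapshotClose, the
field worker (standing in for refresherThread), and the bounded join are
all hypothetical.

```java
public class SnapshotClose {
    // Hypothetical stand-in for refresherThread; may be reassigned by start().
    private volatile Thread worker;

    public void start() {
        Thread t = new Thread(() -> {
            try {
                // Interruptible wait; a real refresher would sleep until refresh time.
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Exit promptly when interrupted.
            }
        }, "worker");
        t.setDaemon(true);
        worker = t;
        t.start();
    }

    /** Interrupts and joins the same thread; returns true if it stopped in time. */
    public boolean close(long timeoutMs) throws InterruptedException {
        Thread t = worker;      // single read: later reassignments do not matter
        if (t == null) {
            return true;
        }
        t.interrupt();
        t.join(timeoutMs);      // bounded, so close() cannot wait forever
        return !t.isAlive();
    }

    public static boolean demo() throws InterruptedException {
        SnapshotClose c = new SnapshotClose();
        c.start();
        return c.close(2_000);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker stopped: " + demo());
    }
}
```

Note that this only addresses the interrupt/join mismatch. It would not
unblock a thread that is stuck on a monitor or in a blocking socket read,
which is why the join in the sketch is bounded.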


The Kafka client version used is 3.4.0.

Any help on this issue is appreciated!

Best,

Jerry
