[ 
https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13751:
-----------------------------
    Reviewer: Ismael Juma  (was: Luke Chen)

> On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13751
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13751
>             Project: Kafka
>          Issue Type: Bug
>          Components: security
>    Affects Versions: 3.0.1
>            Reporter: RivenSun
>            Priority: Critical
>
> h1. Phenomenon:
>  SASL/OAUTHBEARER, whether implemented by default or customized by the user, 
> is not compatible with other SASL mechanisms.
> h3.  
> case1:
> kafka_server_jaas_oauth.conf
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="admin"
>   user_admin="admin"
>   user_alice="alice"; 
>    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
> required;
>    org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin"
>   password="admin_scram";
> }; {code}
>  server.properties
> {code:java}
> advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
>  
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
> Error when starting kafka:
> server.log
> {code:java}
> [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>         at kafka.network.Processor.<init>(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at 
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at 
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null 
> JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
>         ... 17 more
> [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down 
> (kafka.server.KafkaServer)
> [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket 
> server request processors (kafka.network.SocketServer) {code}
> The default implementation class of oauthbearer's 
> `sasl.server.callback.handler.class` is 
> OAuthBearerUnsecuredValidatorCallbackHandler. 
> In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, 
> the jaasConfigEntries parameter is verified.
> What I want to say is that {*}the verification logic here is completely 
> reasonable{*}, but the jaasConfigEntries passed in from the upper layer 
> should not contain the AppConfigurationEntry of other loginModules. There are 
> several other codes for the check of the same keyword *"Must supply exactly 1 
> non-null JAAS mechanism configuration".*
> Rootcause elaborates later.
> By the way, at present, KafkaServer allows {*}the same LoginModule to be 
> configured multiple times in kafkaJaasConfigFile{*}, which will also lead to 
> the phenomenon of case1.
> kafka_server_jaas_oauth.conf eg:
> {code:java}
> KafkaServer {
>    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
> required;
>   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
> required;
>  
> }; {code}
>  
> h3. case2:
> On the basis of case1, modify the default implementation of oauthbearer's 
> `sasl.server.callback.handler.class`
>  server.properties add new configuration
> {code:java}
> listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
>  {code}
> The specific implementation class code is omitted here.
> When we start kafka again, we still encounter exceptions
> server.log
> {code:java}
> [2022-03-16 14:47:46,037] ERROR Unrecognized SASL Login callback 
> (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
> javax.security.auth.callback.UnsupportedCallbackException: Unrecognized SASL 
> Login callback
>         at 
> org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler.handle(AbstractLogin.java:105)
>         at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
>         at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
>         at 
> java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
>         at 
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
>         at 
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
>         at 
> java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
>         at 
> java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
>         at 
> java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
>         at 
> org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
>         at 
> org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
>         at 
> org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
>         at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
>         at 
> org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
>         at kafka.network.Processor.<init>(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at 
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at 
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> [2022-03-16 14:47:46,048] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: 
> javax.security.auth.login.LoginException: An internal error occurred while 
> retrieving token from callback handler
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>         at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
>         at 
> org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
>         at kafka.network.Processor.<init>(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at 
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at 
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: javax.security.auth.login.LoginException: An internal error 
> occurred while retrieving token from callback handler
>         at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
>         at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
>         at 
> java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
>         at 
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
>         at 
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
>         at 
> java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
>         at 
> java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
>         at 
> java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
>         at 
> org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
>         at 
> org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
>         at 
> org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
>         ... 17 more
> [2022-03-16 14:47:46,049] INFO [KafkaServer id=1] shutting down 
> (kafka.server.KafkaServer)
> [2022-03-16 14:47:46,052] INFO [SocketServer brokerId=1] Stopping socket 
> server request processors (kafka.network.SocketServer) {code}
> By analyzing the stack, it is easy to see the problem:
> 1) AbstractLogin.login should not call the OAuthBearerLoginModule.login 
> method, only OAuthBearerRefreshingLogin.login can call the 
> OAuthBearerLoginModule.login method.
> 2) OAuthBearerLoginModule.login should not call the 
> AbstractLogin$DefaultLoginCallbackHandler.handle method, it should call the 
> OAuthBearerUnsecuredLoginCallbackHandler.handle method.
>  
> h1. Summary
> Analyze the stack, read the source code, the root cause is:
> In the ChannelBuilders#create method, if the corresponding JaasContext of 
> each SASL mechanism is returned by the *JaasContext#defaultContext()* method, 
> the *JaasContext.configuration* and *JaasContext.configurationEntries* fields 
> both contain the AppConfigurationEntry of other mechanisms. And then in the 
> subsequent code stack, resulting in case1 or case2.
> In fact, you can find from some source codes of Kafka saslModule that Kafka 
> has this {*}original intention: JaasContext per saslMechanism should be 
> pure{*}.
> 1) In the ChannelBuilders#create(...) method, when constructing the 
> SaslChannelBuilder, `Map<String, JaasContext> jaasContexts` is passed in
> 2) In the SaslChannelBuilder#configure(...) method, after constructing 
> LoginManager, LoginManager is also stored with `Map<String, LoginManager> 
> loginManagers`.
> The keys of these two maps are {*}saslMechanism{*}, but their values ​​both 
> contain all saslMechanism information. Affected by this behavior, 
> jaasConfigEntries in OAuthBearerUnsecuredValidatorCallbackHandler also 
> contains other saslMechanism information, other implementation classes of 
> sasl.server.callback.handler.class have the same problem.
> In fact, I have reported an issue a long time ago, and KAFKA-13422 has the 
> same RC and Solution as this issue. 
> https://issues.apache.org/jira/browse/KAFKA-13422
> For more detailed RC analysis and solutions you can read the last two parts 
> of the description of KAFKA-13422: Root Cause and Suggestion & Solutions.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to