[ https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
RivenSun updated KAFKA-13751:
-----------------------------
    Reviewer: Ismael Juma  (was: Luke Chen)
> On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
> ----------------------------------------------------------------------------
>
> Key: KAFKA-13751
> URL: https://issues.apache.org/jira/browse/KAFKA-13751
> Project: Kafka
> Issue Type: Bug
> Components: security
> Affects Versions: 3.0.1
> Reporter: RivenSun
> Priority: Critical
>
> h1. Phenomenon:
> SASL/OAUTHBEARER, whether implemented by default or customized by the user, is not compatible with other SASL mechanisms.
> h3. case1:
> kafka_server_jaas_oauth.conf
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="admin"
>   user_admin="admin"
>   user_alice="alice";
>   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin"
>   password="admin_scram";
> }; {code}
> server.properties
> {code:java}
> advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
>
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
> Error when starting kafka:
> server.log
> {code:java}
> [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>     at kafka.network.Processor.<init>(SocketServer.scala:724)
>     at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>     at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>     at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>     at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>     at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>     at kafka.network.SocketServer.startup(SocketServer.scala:122)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>     at kafka.Kafka$.main(Kafka.scala:82)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
>     at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
>     ... 17 more
> [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
> [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket server request processors (kafka.network.SocketServer) {code}
> The default implementation class of oauthbearer's `sasl.server.callback.handler.class` is OAuthBearerUnsecuredValidatorCallbackHandler.
> In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, the jaasConfigEntries parameter is verified.
> What I want to say is that {*}the verification logic here is completely reasonable{*}, but the jaasConfigEntries passed in from the upper layer should not contain the AppConfigurationEntry of other loginModules. Several other places in the code perform the same check, with the same message *"Must supply exactly 1 non-null JAAS mechanism configuration".*
> The root cause is elaborated later.
> By the way, at present, KafkaServer allows {*}the same LoginModule to be configured multiple times in kafkaJaasConfigFile{*}, which also leads to the phenomenon of case1.
> kafka_server_jaas_oauth.conf eg:
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
>   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
>
> }; {code}
>
> h3. case2:
> On the basis of case1, modify the default implementation of oauthbearer's `sasl.server.callback.handler.class`.
> Add the following configuration to server.properties:
> {code:java}
> listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> {code}
> The specific implementation class code is omitted here.
> When we start kafka again, we still encounter exceptions:
> server.log
> {code:java}
> [2022-03-16 14:47:46,037] ERROR Unrecognized SASL Login callback (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
> javax.security.auth.callback.UnsupportedCallbackException: Unrecognized SASL Login callback
>     at org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler.handle(AbstractLogin.java:105)
>     at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
>     at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
>     at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
>     at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
>     at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
>     at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
>     at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
>     at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
>     at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
>     at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
>     at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
>     at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
>     at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
>     at kafka.network.Processor.<init>(SocketServer.scala:724)
>     at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>     at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>     at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>     at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>     at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>     at kafka.network.SocketServer.startup(SocketServer.scala:122)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>     at kafka.Kafka$.main(Kafka.scala:82)
>     at kafka.Kafka.main(Kafka.scala)
> [2022-03-16 14:47:46,048] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>     at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
>     at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
>     at kafka.network.Processor.<init>(SocketServer.scala:724)
>     at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>     at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>     at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>     at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>     at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>     at kafka.network.SocketServer.startup(SocketServer.scala:122)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>     at kafka.Kafka$.main(Kafka.scala:82)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
>     at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
>     at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
>     at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
>     at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
>     at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
>     at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
>     at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
>     at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
>     at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
>     at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
>     at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
>     ... 17 more
> [2022-03-16 14:47:46,049] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
> [2022-03-16 14:47:46,052] INFO [SocketServer brokerId=1] Stopping socket server request processors (kafka.network.SocketServer) {code}
> By analyzing the stack, it is easy to see the problem:
> 1) AbstractLogin.login should not call the OAuthBearerLoginModule.login method; only OAuthBearerRefreshingLogin.login should call it.
> 2) OAuthBearerLoginModule.login should not call the AbstractLogin$DefaultLoginCallbackHandler.handle method; it should call the OAuthBearerUnsecuredLoginCallbackHandler.handle method.
>
> h1. Summary
> From the stack traces and the source code, the root cause is:
> In the ChannelBuilders#create method, if the corresponding JaasContext of each SASL mechanism is returned by the *JaasContext#defaultContext()* method, the *JaasContext.configuration* and *JaasContext.configurationEntries* fields both contain the AppConfigurationEntry of other mechanisms. Later in the call stack, this results in case1 or case2.
> In fact, parts of Kafka's SASL source code show that Kafka has this {*}original intention: JaasContext per saslMechanism should be pure{*}.
> 1) In the ChannelBuilders#create(...) method, when constructing the SaslChannelBuilder, `Map<String, JaasContext> jaasContexts` is passed in.
> 2) In the SaslChannelBuilder#configure(...) method, after constructing LoginManager, LoginManager is also stored with `Map<String, LoginManager> loginManagers`.
> The keys of these two maps are {*}saslMechanism{*}, but their values both contain all saslMechanism information. Affected by this behavior, jaasConfigEntries in OAuthBearerUnsecuredValidatorCallbackHandler also contains other saslMechanism information; other implementation classes of sasl.server.callback.handler.class have the same problem.
> I reported a related issue a long time ago: KAFKA-13422 has the same RC and Solution as this issue.
> https://issues.apache.org/jira/browse/KAFKA-13422
> For more detailed RC analysis and solutions, you can read the last two parts of the description of KAFKA-13422: Root Cause and Suggestion & Solutions.
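To illustrate the "JaasContext per saslMechanism should be pure" intention from the Summary, here is a minimal Java sketch. It is not Kafka's code and not necessarily the solution proposed in KAFKA-13422. It only shows the idea of narrowing a shared list of AppConfigurationEntry objects to the entries that belong to one mechanism, so that a check such as "Must supply exactly 1 non-null JAAS mechanism configuration" would see a single entry. The class PerMechanismJaasFilter, the LOGIN_MODULE_BY_MECHANISM mapping and both method names are hypothetical; javax.security.auth.login.AppConfigurationEntry is the real JDK class, and the login module class names are the ones from the case1 JAAS file.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

final class PerMechanismJaasFilter {

    // Hypothetical mapping for illustration only; Kafka does not necessarily
    // keep such a table. The class names are copied from the case1 JAAS file.
    static final Map<String, String> LOGIN_MODULE_BY_MECHANISM = Map.of(
        "PLAIN", "org.apache.kafka.common.security.plain.PlainLoginModule",
        "OAUTHBEARER", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule",
        "SCRAM-SHA-256", "org.apache.kafka.common.security.scram.ScramLoginModule",
        "SCRAM-SHA-512", "org.apache.kafka.common.security.scram.ScramLoginModule");

    // Keep only the entries whose login module matches the given mechanism.
    // An unknown mechanism yields an empty list.
    static List<AppConfigurationEntry> entriesFor(String mechanism,
                                                  List<AppConfigurationEntry> all) {
        String module = LOGIN_MODULE_BY_MECHANISM.get(mechanism);
        List<AppConfigurationEntry> result = new ArrayList<>();
        for (AppConfigurationEntry entry : all) {
            if (entry.getLoginModuleName().equals(module)) {
                result.add(entry);
            }
        }
        return result;
    }

    // Mirrors the three login modules of the case1 KafkaServer section.
    // Options (usernames, passwords) are left out because they do not matter here.
    static List<AppConfigurationEntry> case1Entries() {
        List<AppConfigurationEntry> entries = new ArrayList<>();
        for (String mechanism : List.of("PLAIN", "OAUTHBEARER", "SCRAM-SHA-256")) {
            entries.add(new AppConfigurationEntry(
                LOGIN_MODULE_BY_MECHANISM.get(mechanism),
                LoginModuleControlFlag.REQUIRED,
                Collections.<String, Object>emptyMap()));
        }
        return entries;
    }

    public static void main(String[] args) {
        List<AppConfigurationEntry> all = case1Entries();
        System.out.println("shared entries: " + all.size());
        for (String mechanism : List.of("PLAIN", "OAUTHBEARER", "SCRAM-SHA-512")) {
            System.out.println(mechanism + " entries: " + entriesFor(mechanism, all).size());
        }
    }
}
```

With the three case1 entries, each enabled mechanism ends up with exactly one entry after filtering, whereas the unfiltered list has three, which matches the "size was 3" in the case1 error. Note that this filter would not help with the duplicated OAuthBearerLoginModule example, where two entries share the same login module.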