Hi Kirk,

The current setup of Auth0 for authentication and ACLs for authorization works for me. However, I was exploring how best to leverage OAuth for both authentication and authorization without writing custom classes.

Regards
Ashish

On Wed, Apr 2, 2025 at 6:16 AM Kirk True <k...@kirktrue.pro> wrote:

> Hi Ashish,
>
> I'm glad you were able to get it working! :)
>
> Reply to your questions below...
>
> On Mon, Mar 31, 2025, at 10:19 PM, ashish sood wrote:
> > Hi Kirk,
> >
> > I managed to get it running finally.
> >
> > *Server.properties*
> >
> > listeners=SASL_PLAINTEXT://:9093
> > advertised.listeners=SASL_PLAINTEXT://xxxxxx:9093
> > sasl.enabled.mechanisms=OAUTHBEARER
> > sasl.oauthbearer.expected.audience=https://myprovider.com
> > oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> > inter.broker.listener.name=SASL_PLAINTEXT
> > sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> > listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
> > sasl.oauthbearer.token.endpoint.url=https://xxxxxxxxxxx.auth0.com/oauth/token
> > sasl.oauthbearer.jwks.endpoint.url=https://xxxxxxxxxxxxx.auth0.com/.well-known/jwks.json
> >
> > *jaas.config*
> > KafkaServer {
> > org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> > clientId="xxxxxxxxxxxxxxxxxxxxxx"
> > clientSecret="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
> > token.endpoint.uri="xxxxxxxxxxxxxxxxxxxauth0.com/oauth/token";
> > };
> >
> > Although I have a conceptual doubt if you could please help. How can we
> > leverage the scope coming in the token ? For example - If the token has
> > scope set to *kafka.read*, the client should have read access to a specific
> > topic. Do we need to write a custom class for this OR the existing classes
> > (OAuthBearerValidatorCallbackHandler/OAuthBearerLoginCallbackHandler) can
> > help achieve this ?
>
> This would unfortunately require a custom validator callback handler, yes.
>
> > Currently I am using the Client ID in the access token in the ACL to allow
> > read/write permissions on topics.
>
> Are you wanting to use Auth0 for both authentication and authorization
> instead of authentication via Auth0 and authorization via ACLs?
>
> Thanks,
> Kirk
>
> > Thanks & Regards
> > Ashish
> >
> > On Tue, Apr 1, 2025 at 6:07 AM Kirk True <k...@kirktrue.pro> wrote:
> >
> > > Hi Ashish,
> > >
> > > In your stack trace I see it's invoking
> > > org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler,
> > > so something in your configuration seems amiss.
> > >
> > > If you can capture the AdminClientConfig output (with sensitive stuff
> > > redacted, obvs), that would be helpful.
> > >
> > > Thanks,
> > > Kirk
> > >
> > > On Thu, Mar 20, 2025, at 3:55 AM, ashish sood wrote:
> > > > Hi Kirk,
> > > >
> > > > Thanks for checking.
> > > >
> > > > I am trying to setup a Kafka cluster with end-to-end oauth (i.e. Kafka -
> > > > Kafka communication within a cluster & clients to Kafka broker). I was able
> > > > to get my broker started without errors with below config however I am now
> > > > unable to create topics with below error.
> > > >
> > > > *Current config*
> > > > *jaas.config*
> > > >
> > > > KafkaServer {
> > > > org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> > > > clientId="<xxxxxxxx>"
> > > > clientSecret="<xxxxxxxx>"
> > > > audience="https://myprovider.com"
> > > > token.endpoint.uri="https://xxxxxxxx/oauth/token"
> > > > scope="kafka.read kafka.write";
> > > > };
> > > >
> > > > *server.properties*
> > > > listeners=SASL_PLAINTEXT://:9093
> > > > advertised.listeners=SASL_PLAINTEXT://<>:9093
> > > > sasl.enabled.mechanisms=OAUTHBEARER
> > > > sasl.oauthbearer.expected.audience=https://myprovider.com
> > > > oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> > > > inter.broker.listener.name=SASL_PLAINTEXT
> > > > sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> > > > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> > > > listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
> > > > sasl.oauthbearer.token.endpoint.url=https://<xxxxxxxx>/oauth/token
> > > > sasl.oauthbearer.jwks.endpoint.url=https://<xxxxxxxx>/.well-known/jwks.json
> > > >
> > > > *ERROR WHILE CREATING TOPIC*
> > > >
> > > > This is very strange because when I check the fetch the token manually via
> > > > curl and check it , I clearly see the "sub" field populated with value
> > > > <clientid@clients>
> > > >
> > > > *ERROR No principal name in JWT claim: sub
> > > > (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)java.io.IOException:
> > > > No principal name in JWT claim: sub*
> > > > at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
> > > > at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
> > > > at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
> > > > at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
> > > > at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
> > > > at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
> > > > at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
> > > > at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
> > > > at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
> > > > at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
> > > > at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
> > > > at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
> > > > at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
> > > > at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
> > > > at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
> > > > at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
> > > > at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:119)
> > > > at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:223)
> > > > at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:189)
> > > > at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:525)
> > > > at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:492)
> > > > at org.apache.kafka.clients.admin.Admin.create(Admin.java:137)
> > > > at org.apache.kafka.tools.TopicCommand$TopicService.createAdminClient(TopicCommand.java:437)
> > > > at org.apache.kafka.tools.TopicCommand$TopicService.<init>(TopicCommand.java:426)
> > > > at org.apache.kafka.tools.TopicCommand.execute(TopicCommand.java:98)
> > > > at org.apache.kafka.tools.TopicCommand.mainNoExit(TopicCommand.java:87)
> > > > at org.apache.kafka.tools.TopicCommand.main(TopicCommand.java:82)
> > > >
> > > > Regards
> > > > Ashish Sood
> > > >
> > > > On Thu, Mar 20, 2025 at 12:15 AM Kirk True <k...@kirktrue.pro> wrote:
> > > >
> > > > > Hi Ashish,
> > > > >
> > > > > Are you using OAuth for client->broker communication, inter-broker
> > > > > communication, or both?
> > > > >
> > > > > Based on the server.properties configuration that was shared, it looks
> > > > > like the configuration is attempting to set up inter-broker communication
> > > > > using OAuth.
> > > > >
> > > > > For a broker to *retrieve* tokens , it needs to have this configuration:
> > > > >
> > > > > listener.name.SASL_PLAINTEXT.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
> > > > >
> > > > > For a broker to *validate* tokens, it needs to have this configuration:
> > > > >
> > > > > listener.name.SASL_PLAINTEXT.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
> > > > >
> > > > > Then the SASL configs would need to be included too:
> > > > >
> > > > > listener.name.SASL_PLAINTEXT.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
> > > > > clientId="XXXXXXXXXXXXXXXXXX"
> > > > > clientSecret="XXXXXXXXXXXXXXXXXX"
> > > > > audience="https://myprovider.com"
> > > > > serviceName="kafka"
> > > > > scope="kafka.read kafka.write";
> > > > >
> > > > > If possible, please share any non-sensitive logs.
> > > > >
> > > > > Thanks,
> > > > > Kirk
> > > > >
> > > > > On Wed, Mar 19, 2025, at 3:41 AM, ashish sood wrote:
> > > > > > Hi All,
> > > > > >
> > > > > > I am setting up oauth for my Kafka broker. I have set up an account on
> > > > > > Auth0 for the same and set up an application and API.
> > > > > >
> > > > > > With the below config in the server.properties and Jaas.config file I
> > > > > > keep getting invalid token. Although if I generate a manual token via curl
> > > > > > it works fine. Also Auth0 logs show successful generation of the token,
> > > > > > still the Kafka shows error. Any suggestions to resolve this issue would be
> > > > > > appreciated.
> > > > > >
> > > > > > *Server.properties*
> > > > > > listeners=SASL_PLAINTEXT://:9093
> > > > > > advertised.listeners=SASL_PLAINTEXT://<XXXXXX>:9093
> > > > > > sasl.enabled.mechanisms=OAUTHBEARER
> > > > > > sasl.oauthbearer.jwks.endpoint.url=https://XXXXXXXXX/oauth/token
> > > > > > sasl.oauthbearer.expected.audience=https://myprovider.com
> > > > > > oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> > > > > > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> > > > > > confluent.oauth.groups.claim.name=groups
> > > > > > inter.broker.listener.name=SASL_PLAINTEXT
> > > > > > sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> > > > > > super.users=User:<ClientID>
> > > > > > sasl.oauthbearer.token.endpoint.url=<XXXXXXXXX>/oauth/token
> > > > > > sasl.oauthbearer.audience=https://myprovider.com
> > > > > > allow.everyone.if.no.acl.found=true
> > > > > >
> > > > > > *Jaas Config*
> > > > > > KafkaServer {
> > > > > > org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> > > > > > clientId="XXXXXXXXXXXXXXXXXX"
> > > > > > clientSecret="XXXXXXXXXXXXXXXXXX"
> > > > > > audience="https://myprovider.com"
> > > > > > serviceName="kafka"
> > > > > > scope="kafka.read kafka.write";
> > > > > > };
> > > > > >
> > > > > > *Error*
> > > > > > [2025-03-19 16:05:43,465] INFO [Controller id=0, targetBrokerId=0] Failed authentication with localhost/127.0.0.1 (channelId=0) ({"status":"invalid_token"}) (org.apache.kafka.common.network.Selector)
> > > > > >
> > > > > > image.png
> > > > > >
> > > > > > image.png
> > > > > >
> > > > > > Thanks & Regards
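On the scope question raised in the thread: Kirk's reply says that acting on the token's scope would require a custom validator callback handler. As a rough illustration only, the decision logic such a custom class would need could look something like the sketch below. It does not use any Kafka API. The class name `ScopeAccess`, the `Operation` enum, and the mapping of `kafka.read` to read access and `kafka.write` to write access are all assumptions made for this example; only the two scope names come from the configs quoted in the thread.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Kafka: decides whether a token's scope
// claim permits an operation. A real integration would call logic like this
// from a custom handler or authorizer.
public class ScopeAccess {

    // Assumed operation types for this sketch only.
    public enum Operation { READ, WRITE }

    // Splits the space-delimited OAuth "scope" claim into individual values.
    public static Set<String> parseScopes(String scopeClaim) {
        if (scopeClaim == null || scopeClaim.trim().isEmpty()) {
            return Set.of();
        }
        return Arrays.stream(scopeClaim.trim().split("\\s+"))
                .collect(Collectors.toSet());
    }

    // Assumed mapping: "kafka.read" grants READ, "kafka.write" grants WRITE.
    public static boolean isAllowed(String scopeClaim, Operation op) {
        Set<String> scopes = parseScopes(scopeClaim);
        switch (op) {
            case READ:
                return scopes.contains("kafka.read");
            case WRITE:
                return scopes.contains("kafka.write");
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        // A token carrying only kafka.read: check both operations.
        System.out.println(isAllowed("kafka.read", Operation.READ));
        System.out.println(isAllowed("kafka.read", Operation.WRITE));
    }
}
```

This only covers the yes/no decision from the scope string. How the scope is extracted from the validated token and how the decision is tied to a specific topic would depend on the custom handler or authorizer that is written around it.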