Hi Ashish,

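In your stack trace I see it's invoking
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler,
which is the handler Kafka falls back to when sasl.login.callback.handler.class
isn't set. The trace comes from the kafka-topics admin client, so it looks like
the client configuration is missing that setting.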

If you can capture the AdminClientConfig output (with sensitive stuff redacted, 
obvs), that would be helpful.
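
For comparison, here is a rough sketch of the kind of client properties file I'd expect kafka-topics.sh to be given via --command-config. Treat it as an assumption-laden starting point rather than a known-good config: the file name client.properties is hypothetical, the placeholder values are copied from the broker config quoted below, and the handler class name is the one from the earlier message in this thread, so it may need adjusting for your Kafka version.

```properties
# client.properties (hypothetical file name), passed with --command-config
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
# Without this line the client falls back to the unsecured handler seen in the stack trace
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
# Placeholder copied from the broker config; replace with your provider's token endpoint
sasl.oauthbearer.token.endpoint.url=https://<xxxxxxxx>/oauth/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId="<xxxxxxxx>" \
    clientSecret="<xxxxxxxx>" \
    scope="kafka.read kafka.write";
```

If the AdminClientConfig output shows sasl.login.callback.handler.class = null, that would confirm the client never picked up a setting like the one above.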

Thanks,
Kirk 

On Thu, Mar 20, 2025, at 3:55 AM, ashish sood wrote:
> Hi Kirk,
> 
> Thanks for checking.
> 
> I am trying to set up a Kafka cluster with end-to-end OAuth (i.e. Kafka-to-Kafka
> communication within the cluster and clients to the Kafka broker). I was able
> to get my broker started without errors with the config below; however, I am now
> unable to create topics and get the error below.
> 
> *Current config*
> *jaas.config*
> 
> KafkaServer {
>     org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
> required
>     clientId="<xxxxxxxx>"
>     clientSecret="<xxxxxxxx>"
>     audience="https://myprovider.com";
>     token.endpoint.uri="https://xxxxxxxx/oauth/token";
>     scope="kafka.read kafka.write";
> };
> 
> *server.properties*
> listeners=SASL_PLAINTEXT://:9093
> advertised.listeners=SASL_PLAINTEXT://<>:9093
> sasl.enabled.mechanisms=OAUTHBEARER
> sasl.oauthbearer.expected.audience=https://myprovider.com
> oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> inter.broker.listener.name=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
> sasl.oauthbearer.token.endpoint.url=https://<xxxxxxxx>/oauth/token
> sasl.oauthbearer.jwks.endpoint.url=https://<xxxxxxxx>/.well-known/jwks.json
> 
> *ERROR WHILE CREATING TOPIC*
> 
> This is very strange because when I fetch the token manually via
> curl and inspect it, I clearly see the "sub" field populated with the value
> <clientid@clients>
> 
> 
> ERROR No principal name in JWT claim: sub (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
> java.io.IOException: No principal name in JWT claim: sub
>         at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
>         at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
>         at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
>         at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
>         at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
>         at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
>         at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
>         at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
>         at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
>         at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
>         at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
>         at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
>         at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
>         at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
>         at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
>         at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
>         at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:119)
>         at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:223)
>         at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:189)
>         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:525)
>         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:492)
>         at org.apache.kafka.clients.admin.Admin.create(Admin.java:137)
>         at org.apache.kafka.tools.TopicCommand$TopicService.createAdminClient(TopicCommand.java:437)
>         at org.apache.kafka.tools.TopicCommand$TopicService.<init>(TopicCommand.java:426)
>         at org.apache.kafka.tools.TopicCommand.execute(TopicCommand.java:98)
>         at org.apache.kafka.tools.TopicCommand.mainNoExit(TopicCommand.java:87)
>         at org.apache.kafka.tools.TopicCommand.main(TopicCommand.java:82)
> 
> 
> 
> 
> 
> Regards
> Ashish Sood
> 
> On Thu, Mar 20, 2025 at 12:15 AM Kirk True <k...@kirktrue.pro> wrote:
> 
> > Hi Ashish,
> >
> > Are you using OAuth for client->broker communication, inter-broker
> > communication, or both?
> >
> > Based on the server.properties configuration that was shared, it looks
> > like the configuration is attempting to set up inter-broker communication
> > using OAuth.
> >
> > For a broker to *retrieve* tokens, it needs to have this configuration:
> >
> >
> > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
> >
> > For a broker to *validate* tokens, it needs to have this configuration:
> >
> >
> > listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
> >
> > Then the SASL configs would need to be included too:
> >
> > listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
> >     clientId="XXXXXXXXXXXXXXXXXX" \
> >     clientSecret="XXXXXXXXXXXXXXXXXX" \
> >     audience="https://myprovider.com" \
> >     serviceName="kafka" \
> >     scope="kafka.read kafka.write";
> >
> > If possible, please share any non-sensitive logs.
> >
> > Thanks,
> > Kirk
> >
> > On Wed, Mar 19, 2025, at 3:41 AM, ashish sood wrote:
> > > Hi All,
> > >
> > > I am setting up OAuth for my Kafka broker. I have set up an account on
> > > Auth0 for this, along with an application and an API.
> > >
> > > With the config below in server.properties and the JAAS config file, I
> > > keep getting an invalid token error, although a token I generate manually
> > > via curl works fine. The Auth0 logs also show successful token generation,
> > > yet Kafka still reports an error. Any suggestions to resolve this issue
> > > would be appreciated.
> > >
> > > *Server.properties*
> > > listeners=SASL_PLAINTEXT://:9093
> > > advertised.listeners=SASL_PLAINTEXT://<XXXXXX>:9093
> > > sasl.enabled.mechanisms=OAUTHBEARER
> > > sasl.oauthbearer.jwks.endpoint.url=https://XXXXXXXXX/oauth/token
> > > sasl.oauthbearer.expected.audience=https://myprovider.com
> > > oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> > > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> > > confluent.oauth.groups.claim.name=groups
> > > inter.broker.listener.name=SASL_PLAINTEXT
> > > sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> > > super.users=User:<ClientID>
> > > sasl.oauthbearer.token.endpoint.url=<XXXXXXXXX>/oauth/token
> > > sasl.oauthbearer.audience=https://myprovider.com
> > > allow.everyone.if.no.acl.found=true
> > >
> > > *Jaas Config*
> > > KafkaServer {
> > >     org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> > >     clientId="XXXXXXXXXXXXXXXXXX"
> > >     clientSecret="XXXXXXXXXXXXXXXXXX"
> > >     audience="https://myprovider.com";
> > >     serviceName="kafka"
> > >     scope="kafka.read kafka.write";
> > > };
> > >
> > > *Error*
> > > [2025-03-19 16:05:43,465] INFO [Controller id=0, targetBrokerId=0] Failed authentication with localhost/127.0.0.1 (channelId=0) ({"status":"invalid_token"}) (org.apache.kafka.common.network.Selector)
> > >
> > > Thanks & Regards
> >
> 
