Hi Kirk,

The current setup, with Auth0 for authentication and ACLs for authorization,
works for me. However, I was exploring how best to leverage OAuth for both
authentication and authorization without writing custom classes.
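
For illustration only, the scope check discussed in the quoted thread below
(a kafka.read scope granting read access) might look roughly like this. It is
a minimal sketch, not Kafka code: ScopePermissions and its methods are
hypothetical names, and it assumes the token carries its scopes as a single
space-delimited string. Wiring such a check into a broker would still need the
custom validator callback handler that Kirk mentions.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Kafka: maps OAuth scopes to coarse permissions.
public final class ScopePermissions {
    private ScopePermissions() {}

    // Split a space-delimited scope claim (e.g. "kafka.read kafka.write") into a set.
    // Assumption: scopes arrive as one string; a null or blank claim means no scopes.
    public static Set<String> parseScopes(String scopeClaim) {
        if (scopeClaim == null || scopeClaim.isBlank()) {
            return Set.of();
        }
        return Arrays.stream(scopeClaim.trim().split("\\s+"))
                .collect(Collectors.toSet());
    }

    // True only if the exact scope "kafka.read" is present.
    public static boolean canRead(String scopeClaim) {
        return parseScopes(scopeClaim).contains("kafka.read");
    }

    // True only if the exact scope "kafka.write" is present.
    public static boolean canWrite(String scopeClaim) {
        return parseScopes(scopeClaim).contains("kafka.write");
    }
}
```

With this sketch, a token scoped to "kafka.read" alone would pass canRead but
fail canWrite; which topics that applies to is a separate decision that the
sketch does not cover.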

Regards
Ashish

On Wed, Apr 2, 2025 at 6:16 AM Kirk True <k...@kirktrue.pro> wrote:

> Hi Ashish,
>
> I'm glad you were able to get it working! :)
>
> Reply to your questions below...
>
> On Mon, Mar 31, 2025, at 10:19 PM, ashish sood wrote:
> > Hi Kirk,
> >
> > I managed to get it running finally.
> >
> >
> > *Server.properties*
> >
> > listeners=SASL_PLAINTEXT://:9093
> > advertised.listeners=SASL_PLAINTEXT://xxxxxx:9093
> > sasl.enabled.mechanisms=OAUTHBEARER
> > sasl.oauthbearer.expected.audience=https://myprovider.com
> > oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> > inter.broker.listener.name=SASL_PLAINTEXT
> > sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> > listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
> > sasl.oauthbearer.token.endpoint.url=https://xxxxxxxxxxx.auth0.com/oauth/token
> > sasl.oauthbearer.jwks.endpoint.url=https://xxxxxxxxxxxxx.auth0.com/.well-known/jwks.json
> >
> > *jaas.config*
> > KafkaServer {
> >     org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> >     clientId="xxxxxxxxxxxxxxxxxxxxxx"
> >     clientSecret="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
> >     token.endpoint.uri="xxxxxxxxxxxxxxxxxxxauth0.com/oauth/token";
> > };
> >
> > I do have a conceptual question, though, if you could please help. How can
> > we leverage the scope carried in the token? For example, if the token has
> > its scope set to *kafka.read*, the client should have read access to a
> > specific topic. Do we need to write a custom class for this, or can the
> > existing classes
> > (OAuthBearerValidatorCallbackHandler/OAuthBearerLoginCallbackHandler)
> > help achieve this?
>
> This would unfortunately require a custom validator callback handler, yes.
>
> > Currently I am using the client ID from the access token in the ACLs to
> > allow read/write permissions on topics.
>
> Are you wanting to use Auth0 for both authentication and authorization
> instead of authentication via Auth0 and authorization via ACLs?
>
> Thanks,
> Kirk
>
> >
> > Thanks & Regards
> > Ashish
> >
> > On Tue, Apr 1, 2025 at 6:07 AM Kirk True <k...@kirktrue.pro> wrote:
> >
> > > Hi Ashish,
> > >
> > > In your stack trace I see it's invoking
> > > org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler,
> > > so something in your configuration seems amiss.
> > >
> > > If you can capture the AdminClientConfig output (with sensitive stuff
> > > redacted, obvs), that would be helpful.
> > >
> > > Thanks,
> > > Kirk
> > >
> > > On Thu, Mar 20, 2025, at 3:55 AM, ashish sood wrote:
> > > > Hi Kirk,
> > > >
> > > > Thanks for checking.
> > > >
> > > > I am trying to set up a Kafka cluster with end-to-end OAuth (i.e.
> > > > Kafka-to-Kafka communication within the cluster and clients to the
> > > > Kafka broker). I was able to get my broker started without errors
> > > > with the config below; however, I am now unable to create topics and
> > > > get the error below.
> > > >
> > > > *Current config*
> > > > *jaas.config*
> > > >
> > > > KafkaServer {
> > > >     org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> > > >     clientId="<xxxxxxxx>"
> > > >     clientSecret="<xxxxxxxx>"
> > > >     audience="https://myprovider.com"
> > > >     token.endpoint.uri="https://xxxxxxxx/oauth/token"
> > > >     scope="kafka.read kafka.write";
> > > > };
> > > >
> > > > *server.properties*
> > > > listeners=SASL_PLAINTEXT://:9093
> > > > advertised.listeners=SASL_PLAINTEXT://<>:9093
> > > > sasl.enabled.mechanisms=OAUTHBEARER
> > > > sasl.oauthbearer.expected.audience=https://myprovider.com
> > > > oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> > > > inter.broker.listener.name=SASL_PLAINTEXT
> > > > sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> > > > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> > > > listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
> > > > sasl.oauthbearer.token.endpoint.url=https://<xxxxxxxx>/oauth/token
> > > > sasl.oauthbearer.jwks.endpoint.url=https://<xxxxxxxx>/.well-known/jwks.json
> > > >
> > > > *ERROR WHILE CREATING TOPIC*
> > > >
> > > > This is very strange, because when I fetch the token manually via
> > > > curl and inspect it, I clearly see the "sub" field populated with the
> > > > value <clientid@clients>.
> > > >
> > > > *ERROR No principal name in JWT claim: sub
> > > > (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
> > > > java.io.IOException: No principal name in JWT claim: sub*
> > > >         at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
> > > >         at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
> > > >         at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
> > > >         at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
> > > >         at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
> > > >         at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
> > > >         at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
> > > >         at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
> > > >         at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
> > > >         at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
> > > >         at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
> > > >         at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
> > > >         at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
> > > >         at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
> > > >         at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
> > > >         at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
> > > >         at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:119)
> > > >         at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:223)
> > > >         at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:189)
> > > >         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:525)
> > > >         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:492)
> > > >         at org.apache.kafka.clients.admin.Admin.create(Admin.java:137)
> > > >         at org.apache.kafka.tools.TopicCommand$TopicService.createAdminClient(TopicCommand.java:437)
> > > >         at org.apache.kafka.tools.TopicCommand$TopicService.<init>(TopicCommand.java:426)
> > > >         at org.apache.kafka.tools.TopicCommand.execute(TopicCommand.java:98)
> > > >         at org.apache.kafka.tools.TopicCommand.mainNoExit(TopicCommand.java:87)
> > > >         at org.apache.kafka.tools.TopicCommand.main(TopicCommand.java:82)
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Regards
> > > > Ashish Sood
> > > >
> > > > On Thu, Mar 20, 2025 at 12:15 AM Kirk True <k...@kirktrue.pro> wrote:
> > > >
> > > > > Hi Ashish,
> > > > >
> > > > > Are you using OAuth for client->broker communication, inter-broker
> > > > > communication, or both?
> > > > >
> > > > > Based on the server.properties configuration that was shared, it
> > > > > looks like the configuration is attempting to set up inter-broker
> > > > > communication using OAuth.
> > > > >
> > > > > For a broker to *retrieve* tokens, it needs to have this
> > > > > configuration:
> > > > >
> > > > > listener.name.SASL_PLAINTEXT.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
> > > > >
> > > > > For a broker to *validate* tokens, it needs to have this
> > > > > configuration:
> > > > >
> > > > > listener.name.SASL_PLAINTEXT.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
> > > > >
> > > > > Then the SASL configs would need to be included too:
> > > > >
> > > > > listener.name.SASL_PLAINTEXT.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
> > > > >     clientId="XXXXXXXXXXXXXXXXXX" \
> > > > >     clientSecret="XXXXXXXXXXXXXXXXXX" \
> > > > >     audience="https://myprovider.com" \
> > > > >     serviceName="kafka" \
> > > > >     scope="kafka.read kafka.write";
> > > > >
> > > > > If possible, please share any non-sensitive logs.
> > > > >
> > > > > Thanks,
> > > > > Kirk
> > > > >
> > > > > On Wed, Mar 19, 2025, at 3:41 AM, ashish sood wrote:
> > > > > > Hi All,
> > > > > >
> > > > > > I am setting up OAuth for my Kafka broker. I have set up an
> > > > > > account on Auth0 for this, along with an application and an API.
> > > > > >
> > > > > > With the config below in server.properties and the JAAS config
> > > > > > file, I keep getting an invalid token error, although a token
> > > > > > generated manually via curl works fine. The Auth0 logs also show
> > > > > > successful generation of the token, yet Kafka still shows the
> > > > > > error. Any suggestions to resolve this issue would be appreciated.
> > > > > >
> > > > > > *Server.properties*
> > > > > > listeners=SASL_PLAINTEXT://:9093
> > > > > > advertised.listeners=SASL_PLAINTEXT://<XXXXXX>:9093
> > > > > > sasl.enabled.mechanisms=OAUTHBEARER
> > > > > > sasl.oauthbearer.jwks.endpoint.url=https://XXXXXXXXX/oauth/token
> > > > > > sasl.oauthbearer.expected.audience=https://myprovider.com
> > > > > > oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> > > > > > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> > > > > > confluent.oauth.groups.claim.name=groups
> > > > > > inter.broker.listener.name=SASL_PLAINTEXT
> > > > > > sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> > > > > > super.users=User:<ClientID>
> > > > > > sasl.oauthbearer.token.endpoint.url=<XXXXXXXXX>/oauth/token
> > > > > > sasl.oauthbearer.audience=https://myprovider.com
> > > > > > allow.everyone.if.no.acl.found=true
> > > > > >
> > > > > > *Jaas Config*
> > > > > > KafkaServer {
> > > > > >     org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> > > > > >     clientId="XXXXXXXXXXXXXXXXXX"
> > > > > >     clientSecret="XXXXXXXXXXXXXXXXXX"
> > > > > >     audience="https://myprovider.com"
> > > > > >     serviceName="kafka"
> > > > > >     scope="kafka.read kafka.write";
> > > > > > };
> > > > > >
> > > > > > *Error*
> > > > > > [2025-03-19 16:05:43,465] INFO [Controller id=0, targetBrokerId=0] Failed authentication with localhost/127.0.0.1 (channelId=0) ({"status":"invalid_token"}) (org.apache.kafka.common.network.Selector)
> > > > > >
> > > > > > image.png
> > > > > >
> > > > > > image.png
> > > > > >
> > > > > > Thanks & Regards
> > > > > >
> > > > >
> > > >
> > >
> >
>
