[ https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826229#comment-16826229 ]

koert kuipers commented on KAFKA-7631:
--------------------------------------

I think I ran into this. The brokers are Kafka 2.2.0.

My brokers use GSSAPI/Kerberos, but I also have SCRAM enabled for clients that use delegation tokens:
 sasl.mechanism.inter.broker.protocol=GSSAPI
 sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512

My jaas.conf for the brokers has com.sun.security.auth.module.Krb5LoginModule for KafkaClient.
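
For reference, a broker jaas.conf that enables both GSSAPI and SCRAM would typically look something like the sketch below. This is only an illustration: the keytab path and principal are placeholders, and the ScramLoginModule line is exactly the entry this ticket is about being absent:
{code}
// hypothetical broker jaas.conf sketch; keytab path and principal are placeholders
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/path/to/kafka.keytab"
        principal="kafka/broker.example.com@EXAMPLE.COM";

    // the entry that was missing in the failing setup described by this ticket
    org.apache.kafka.common.security.scram.ScramLoginModule required;
};
{code}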

kafka server log shows:
{code}
[2019-04-25 12:23:48,108] WARN [SocketServer brokerId=xx] Unexpected error from /x.x.x.x; closing connection (org.apache.kafka.common.network.Selector)
java.lang.NullPointerException
    at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:450)
    at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:290)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:536)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:472)
    at kafka.network.Processor.poll(SocketServer.scala:830)
    at kafka.network.Processor.run(SocketServer.scala:730)
    at java.lang.Thread.run(Thread.java:748)
{code}
My client is a Spark Structured Streaming driver; Spark 3 has Kafka delegation token support, which is what I am testing. On the client side I see:
{code}
2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: Set SASL client state to SEND_HANDSHAKE_REQUEST
2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: Set SASL client state to INITIAL
2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.scram.internals.ScramSaslClient: Setting SASL/SCRAM_SHA_512 client state to RECEIVE_SERVER_FIRST_MESSAGE
2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: Set SASL client state to INTERMEDIATE
2019-04-25 12:23:48 DEBUG org.apache.kafka.common.network.Selector: [Consumer clientId=x, groupId=x] Connection with x/x.x.x.x disconnected
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:407)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:497)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveToken(SaslClientAuthenticator.java:435)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:259)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:536)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:472)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1195)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$2(KafkaOffsetReader.scala:217)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:358)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:357)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$1(KafkaOffsetReader.scala:215)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:325)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:215)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:198)
        at scala.Option.getOrElse(Option.scala:138)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:193)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:81)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$6(MicroBatchExecution.scala:365)
        at scala.Option.getOrElse(Option.scala:138)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:365)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:327)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:325)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:362)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.map(TraversableLike.scala:237)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:354)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:577)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:350)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:195)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:327)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:325)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:178)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:172)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:331)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:243)
{code}
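
For context, when the delegation token path is taken the consumer ends up with a SASL setup roughly like the sketch below. This is an approximation: Spark builds the configuration internally, the token id and HMAC are placeholders, and security.protocol depends on the cluster (SASL_PLAINTEXT vs SASL_SSL):
{code}
# sketch of the effective client SASL properties; values are placeholders
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    tokenauth=true \
    username="<delegation token id>" \
    password="<delegation token HMAC>";
{code}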

> NullPointerException when SCRAM is allowed but ScramLoginModule is not in 
> broker's jaas.conf
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7631
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7631
>             Project: Kafka
>          Issue Type: Improvement
>          Components: security
>    Affects Versions: 2.0.0
>            Reporter: Andras Beni
>            Assignee: Viktor Somogyi-Vass
>            Priority: Minor
>
> When a user wants to use delegation tokens and lists {{SCRAM}} in 
> {{sasl.enabled.mechanisms}}, but does not add {{ScramLoginModule}} to the 
> broker's JAAS configuration, a NullPointerException is thrown on the broker 
> side and the connection is closed.
> A meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
>         at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
>         at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
>         at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
>         at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
>         at kafka.network.Processor.poll(SocketServer.scala:679)
>         at kafka.network.Processor.run(SocketServer.scala:584)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
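
A minimal, standalone sketch of the kind of guard the description asks for. This is not the actual Kafka patch: the class, helper method, and the assumption that the NPE stems from a SaslServer that was never created for the requested mechanism are purely illustrative.
{code}
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

// Illustrative only: fail fast with a descriptive error instead of an NPE when a
// SASL mechanism is enabled but no matching login module was configured.
public class SaslTokenGuardSketch {

    static byte[] evaluateWithGuard(SaslServer saslServer, String mechanism, byte[] token)
            throws SaslException {
        if (saslServer == null) {
            // hypothetical guard: surface a meaningful message to log and return to the client
            throw new SaslException("SASL mechanism " + mechanism + " is listed in "
                    + "sasl.enabled.mechanisms, but no matching login module (e.g. "
                    + "ScramLoginModule) is present in the broker's JAAS configuration");
        }
        return saslServer.evaluateResponse(token);
    }
}
{code}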


