Hi all,

I'm trying to deploy a FlinkKafkaProducer in PyFlink on a remote cluster.
Unfortunately, I'm getting the following exception:

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
    ... 22 more
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
    ... 27 more

My Kafka (producer) configuration looks like this:

producer_config = {
    "bootstrap.servers": "URL",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_SSL",
    "sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        "username=\"USERNAME\" password=\"PASS\";"
    ),
    "ssl.endpoint.identification.algorithm": "https"
}
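
For reference, here is a minimal sketch of how the same properties could be assembled with a small helper, so that the login module class name is easy to swap while debugging. The helper name build_producer_config and the SHADED_PREFIX constant are hypothetical and not part of PyFlink. The second variant is an untested assumption based only on the package names in the trace above, where the Kafka client classes appear relocated under org.apache.flink.kafka.shaded; whether the relocated class name resolves the LoginException would need to be checked against the connector jar actually deployed.

```python
# Hypothetical helper (not a PyFlink API): builds the producer properties dict.
DEFAULT_LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"

# Assumption: prefix taken from the relocated package names in the stack trace.
SHADED_PREFIX = "org.apache.flink.kafka.shaded."


def build_producer_config(bootstrap_servers, username, password,
                          login_module=DEFAULT_LOGIN_MODULE):
    """Return Kafka producer properties for SASL_SSL with the PLAIN mechanism."""
    jaas_config = (
        f'{login_module} required '
        f'username="{username}" password="{password}";'
    )
    return {
        "bootstrap.servers": bootstrap_servers,
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_SSL",
        "sasl.jaas.config": jaas_config,
        "ssl.endpoint.identification.algorithm": "https",
    }


# Same values as the config above (placeholders, not real credentials).
plain_config = build_producer_config("URL", "USERNAME", "PASS")

# Variant to try: the relocated class name (untested assumption).
shaded_config = build_producer_config(
    "URL", "USERNAME", "PASS",
    login_module=SHADED_PREFIX + DEFAULT_LOGIN_MODULE,
)
```

Either dict could then be passed as the producer config in place of the literal above; only the "sasl.jaas.config" entry differs between the two.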

I configured my Flink cluster with:
    security.kerberos.login.contexts: Client,KafkaClient
    security.kerberos.login.use-ticket-cache: false
    security.kerberos.fetch.delegation-token: false

Any ideas?

Thanks in advance,
Wouter
