Hi all, I'm trying to deploy a FlinkKafkaProducer in PyFlink on a remote cluster. Unfortunately, I'm getting the following exception:

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
    ... 22 more
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
    ... 27 more

My Kafka (producer) configuration looks like this:

producer_config = {
    "bootstrap.servers": "URL",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_SSL",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASS\";",
    "ssl.endpoint.identification.algorithm": "https"
}

I configured my Flink cluster with:

security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.use-ticket-cache: false
security.kerberos.fetch.delegation-token: false

Any ideas?

Thanks in advance,
Wouter