我们改了权限确实解决了这个问题。但我现在想了解的是为什么Flink在1.16的时候需要创建producerID的权限,以及这个权限是不是针对新老Kafka API都需要的。针对新的Kafka API在精准一次的时候需要管理ProducerID在源码中有体现,但是老的API没看见相关的,只使用了一个ProducerID也需要由Flink内部自己管理吗?
在 2023-02-20 08:45:18,"Shammon FY" <zjur...@gmail.com> 写道: >Hi > >从`Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: >Cluster authorization failed.`这个错误看起来像是权限错误,可以你检查下是否有权限问题 > >Best, >Shammon > >On Fri, Feb 17, 2023 at 6:29 PM lxk <lxk7...@163.com> wrote: > >> Flink版本:1.16 >> 目前公司针对Flink版本进行升级,从Flink1.14升级到Flink1.16,代码没做任何调整,但是在写入kafka的时候报错: >> 2023-02-17 15:03:19 >> org.apache.kafka.common.KafkaException: Cannot execute transactional >> method because we are in an error state >> at >> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125) >> at >> org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442) >> at >> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998) >> at >> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912) >> at >> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197) >> at >> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160) >> at >> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) >> at org.apache.flink.streaming.runtime.io >> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >> at org.apache.flink.streaming.runtime.io >> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) >> at org.apache.flink.streaming.runtime.io >> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) >> at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) >> at >> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) >> at >> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: >> Cluster authorization failed. >> >> >> 在了解了相关源码之后,知道KafkaSink这种新的kafka >> api在实现精准一次的时候,分为了两个阶段,一个是writer,一个是commiter,其中在writer中维护了一个producerpool,因此需要权限创建producer,这块能理解。 >> 但是在使用老的kafka >> api,即FlinkKafkaProducer时,只需要维护一个Producer。不明白为啥在使用老的api的时候还是会报同样的错误。 >> >> >> 或者我说的原因不是这个报错的根本原因,希望大家能帮忙解答下