Hi Morgan,

You could check FLINK-11654. Judging from its description, I think it is the problem you have encountered:
> We run multiple jobs on a cluster which write a lot to the same Kafka topic from identically named sinks. When EXACTLY_ONCE semantic is enabled for the KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go into a restart cycle.

FLINK-11654: https://issues.apache.org/jira/browse/FLINK-11654

Best,
Kezhu Wang

On February 28, 2021 at 22:35:02, Morgan Geldenhuys wrote:

Greetings all,

I am having an issue instantiating multiple Flink jobs using the same JAR in the same Flink native cluster (all on 1.12.1). When processing events, the jobs fail with the following trace:

    org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
        at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
        at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:915)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Unknown Source)
        Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
            at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
            at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
            ... 3 more
        Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
            Suppressed: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
                at java.base/java.lang.Thread.run(Unknown Source)
            Suppressed: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
                at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
                at java.base/java.lang.Thread.run(Unknown Source)

Regarding configuration, I have set transaction.max.timeout.ms on the Kafka server to one hour, as advised in the Flink documentation:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance

Additionally, I have set the producer's TRANSACTIONAL_ID_CONFIG to a random value, i.e. UUID.randomUUID().toString().

Any ideas why this would be happening?

Regards,
Morgan.
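The timeout part of this setup can be sanity-checked by comparing the producer's transaction.timeout.ms with the broker's transaction.max.timeout.ms; a producer asking for a longer timeout than the broker allows is rejected. The sketch below assumes the documented defaults: 15 minutes for Kafka's transaction.max.timeout.ms and 1 hour for transaction.timeout.ms in Flink's FlinkKafkaProducer. The class and method names are hypothetical; only the two property names come from Kafka.

```java
import java.util.Properties;

/**
 * Hypothetical helper: checks whether the producer-side transaction timeout
 * fits within the broker-side transaction.max.timeout.ms.
 */
final class TransactionTimeoutCheck {

    // Kafka's documented default for the broker setting transaction.max.timeout.ms (15 minutes).
    static final long DEFAULT_BROKER_MAX_TIMEOUT_MS = 15 * 60 * 1000L;

    // Assumed FlinkKafkaProducer default for transaction.timeout.ms (1 hour), per the Flink docs.
    static final long FLINK_DEFAULT_PRODUCER_TIMEOUT_MS = 60 * 60 * 1000L;

    /** Producer timeout in effect: the configured value, else the assumed Flink default. */
    static long producerTimeoutMs(Properties producerProps) {
        String value = producerProps.getProperty("transaction.timeout.ms");
        return value == null ? FLINK_DEFAULT_PRODUCER_TIMEOUT_MS : Long.parseLong(value.trim());
    }

    /** True if the broker's maximum would accept the producer's requested timeout. */
    static boolean isAcceptedByBroker(Properties producerProps, long brokerMaxTimeoutMs) {
        return producerTimeoutMs(producerProps) <= brokerMaxTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        // Flink's 1 hour default exceeds Kafka's 15 minute broker default...
        System.out.println(isAcceptedByBroker(props, DEFAULT_BROKER_MAX_TIMEOUT_MS)); // false
        // ...but fits once the broker maximum is raised to one hour.
        System.out.println(isAcceptedByBroker(props, 60 * 60 * 1000L)); // true
    }
}
```

With the broker raised to one hour as described above, the check passes, so a timeout mismatch is probably not what triggers the fencing in this case.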

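FLINK-11654 is about transactional ID clashes between jobs. The following is a simplified, hypothetical model, not Flink's code. It assumes (as I understand the 1.12 FlinkKafkaProducer) that each sink subtask's transactional.id is derived from the sink's task name plus its operator ID, and that the connector overwrites any TRANSACTIONAL_ID_CONFIG set by the user, which would explain why a random UUID there does not help. Two jobs submitted from the same JAR with the same topology then get the same task name and the same auto-generated operator ID, so their IDs coincide and the producer that initializes last fences the others.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Simplified, hypothetical model of deriving Kafka transactional IDs from a
 * sink's task name and operator ID. Not Flink's implementation; it only
 * illustrates why identical jobs can end up sharing transactional IDs.
 */
final class TransactionalIdClash {

    /** Builds one transactional ID per parallel subtask from a shared prefix. */
    static List<String> transactionalIds(String taskName, String operatorId, int parallelism) {
        List<String> ids = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            ids.add(taskName + "-" + operatorId + "-" + subtask);
        }
        return ids;
    }

    /** True if the two jobs would hand at least one identical transactional ID to Kafka. */
    static boolean clash(List<String> jobA, List<String> jobB) {
        return !Collections.disjoint(jobA, jobB);
    }

    public static void main(String[] args) {
        // Same JAR, same topology: identical sink name and identical derived operator ID.
        List<String> job1 = transactionalIds("Sink: kafka-sink", "op-1234", 2);
        List<String> job2 = transactionalIds("Sink: kafka-sink", "op-1234", 2);
        System.out.println(clash(job1, job2)); // true: the later producer fences the earlier one

        // A distinct sink name and operator ID per job yields disjoint IDs.
        List<String> job3 = transactionalIds("Sink: kafka-sink-jobA", "op-aaaa", 2);
        List<String> job4 = transactionalIds("Sink: kafka-sink-jobB", "op-bbbb", 2);
        System.out.println(clash(job3, job4)); // false
    }
}
```

If this model holds, giving each job's sink a distinct name and uid, e.g. stream.addSink(producer).name("kafka-sink-" + jobSuffix).uid("kafka-sink-" + jobSuffix) where jobSuffix is a hypothetical per-job value, should keep the generated IDs apart. Note that changing a uid means the sink's existing state can no longer be matched when restoring from an old savepoint.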