I keep receiving the exception below while running a simple job that reads time series data from Kafka, transforms it into Avro format, and writes it to a Kafka topic consumed by Druid. Any advice on how to resolve this type of error would be appreciated.
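For context, the overall topology looks roughly like the sketch below. This is a simplified sketch only: the source topic name, consumer properties, checkpoint interval, and the parsing step are placeholders, imports and the class wrapper are omitted as elsewhere in this post, and the actual sink setup is shown further down.

    private void runPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The EXACTLY_ONCE Kafka sink only commits its transactions on checkpoints.
        env.enableCheckpointing(60_000); // illustrative interval

        // Placeholder source topic and consumer properties; the real values come from configuration.
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "localhost:9092");
        consumerProperties.setProperty("group.id", "sensor-to-druid");
        FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                "sensor-input-topic",
                new SimpleStringSchema(),
                consumerProperties);

        DataStream<SensorMessage> sensorLineStream = env
                .addSource(source)
                .map(new MapFunction<String, SensorMessage>() {
                    @Override
                    public SensorMessage map(String raw) {
                        // Placeholder parse step: raw record -> Avro-backed SensorMessage.
                        return SensorMessage.parse(raw);
                    }
                });

        addToDruidProducerSink(sensorLineStream); // shown below
        env.execute("sensor-to-druid");
    }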
I'm using Apache Kafka 2.6.2 via Amazon MSK and Flink 1.12.1.

    private Properties buildKafkaProducerConfigProperties() {
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", String.join(",", druidProducerKafkaBootstrapServers));
        if (druidProducerZookeeperConnect != null) {
            properties.setProperty("zookeeper.connect", druidProducerZookeeperConnect);
        }
        if (druidProducerTransactionMaxTimeoutMs > 0) {
            properties.setProperty("transaction.max.timeout.ms", Integer.toString(druidProducerTransactionMaxTimeoutMs));
        }
        if (druidProducerTransactionTimeoutMs > 0) {
            properties.setProperty("transaction.timeout.ms", Integer.toString(druidProducerTransactionTimeoutMs));
        }
        properties.setProperty("transactional.id", "local.druid");

        if (druidProducerKafkaRequiresSsl) {
            properties.setProperty("ssl.truststore.location", kafkaSslTruststoreLocation);
            properties.setProperty("ssl.truststore.password", kafkaSslTruststorePassword);
            properties.setProperty("security.protocol", "SSL");
        }
        properties.setProperty("offsets.topic.replication.factor", "3");
        properties.setProperty("transaction.state.log.replication.factor", "1");
        return properties;
    }

    private void addToDruidProducerSink(DataStream<SensorMessage> sensorLineStream) {
        final Properties producerProperties = buildKafkaProducerConfigProperties();
        FlinkKafkaProducer<SensorMessage> producerTopic = new FlinkKafkaProducer<>(
                druidProducerKafkaTopic,
                new DruidSensorMessageKafkaSerializationSchema(druidProducerKafkaTopic),
                producerProperties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        final DataStreamSink<SensorMessage> producerSink = sensorLineStream.addSink(producerTopic);
        // Workaround for producer transaction id failures is to give the producer a unique name.
        // https://issues.apache.org/jira/browse/FLINK-11654
        producerSink
                .name("druid sink " + druidProducerKafkaGroupId)
                .uid("druid-sink-" + druidProducerKafkaGroupId);
    }

(The transaction timeout values resolved from configuration are sketched after the stack trace at the end of this post.)

The exception:

    2022-02-22 14:41:37
    java.io.IOException: Could not perform checkpoint 1 for operator split -> Sink: druid sink producer.local.druid (1/1)#0.
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:968)
        at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
        at org.apache.flink.streaming.runtime.io.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:156)
        at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:178)
        at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
        Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
            at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
            at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
            at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
            ... 3 more
        Caused by: org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator split -> Sink: druid sink producer.local.druid (1/1)#0. Failure reason: Checkpoint was declined.
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:951)
        ... 13 more
    Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1002)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:320)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
        ... 23 more
    Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
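For reference, the transaction timeout values resolved from configuration look roughly like this (the numbers are illustrative, not the exact production values). My understanding is that transaction.timeout.ms has to stay at or below the broker's transaction.max.timeout.ms and well above the checkpoint interval, otherwise the broker can expire the transaction and fence the producer:

    // Illustrative values only; the real ones are injected from configuration.
    int druidProducerTransactionMaxTimeoutMs = 15 * 60 * 1000; // 900000 ms, the Kafka broker default for transaction.max.timeout.ms
    int druidProducerTransactionTimeoutMs    = 15 * 60 * 1000; // kept <= transaction.max.timeout.ms and well above the checkpoint interval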