I keep receiving this exception while running a simple job that 
receives time series data via Kafka, transforms it into Avro format, and then 
sends it to a Kafka topic consumed by Druid.
Any advice on how to resolve this type of error would be appreciated.

I'm using Apache Kafka 2.6.2 via Amazon MSK and Flink 1.12.1.

private Properties buildKafkaProducerConfigProperties() {
   final Properties properties = new Properties();
   properties.setProperty("bootstrap.servers", String.join(",", druidProducerKafkaBootstrapServers));
   if (druidProducerZookeeperConnect != null) {
      properties.setProperty("zookeeper.connect", druidProducerZookeeperConnect);
   }
   if (druidProducerTransactionMaxTimeoutMs > 0) {
      properties.setProperty("transaction.max.timeout.ms", Integer.toString(druidProducerTransactionMaxTimeoutMs));
   }
   if (druidProducerTransactionTimeoutMs > 0) {
      properties.setProperty("transaction.timeout.ms", Integer.toString(druidProducerTransactionTimeoutMs));
   }

   properties.setProperty("transactional.id", "local.druid");

   if (druidProducerKafkaRequiresSsl) {
      properties.setProperty("ssl.truststore.location", kafkaSslTruststoreLocation);
      properties.setProperty("ssl.truststore.password", kafkaSslTruststorePassword);
      properties.setProperty("security.protocol", "SSL");
   }

   properties.setProperty("offsets.topic.replication.factor", "3");
   properties.setProperty("transaction.state.log.replication.factor", "1");

   return properties;
}
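
For comparison, here is a minimal sketch of the same method reduced to client-side keys. Of the keys set above, zookeeper.connect, transaction.max.timeout.ms, offsets.topic.replication.factor and transaction.state.log.replication.factor are broker settings, and as far as I understand a producer client does not act on them. The class name, method signature, broker addresses and the 15-minute value are hypothetical, chosen only for illustration.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Hypothetical helper: builds only settings that the Kafka producer client reads.
    static Properties build(String bootstrapServers, int transactionTimeoutMs) {
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);

        // Producer-side transaction timeout. The broker rejects values larger
        // than its own transaction.max.timeout.ms, which is configured on the
        // broker (for MSK, in the cluster configuration), not here.
        if (transactionTimeoutMs > 0) {
            properties.setProperty("transaction.timeout.ms",
                  Integer.toString(transactionTimeoutMs));
        }

        // Broker-side keys (zookeeper.connect, transaction.max.timeout.ms,
        // offsets.topic.replication.factor,
        // transaction.state.log.replication.factor) are deliberately omitted.
        return properties;
    }

    public static void main(String[] args) {
        // Assumed example values: two brokers and a 15-minute timeout.
        final Properties properties = build("broker-1:9092,broker-2:9092", 900_000);
        System.out.println(properties);
    }
}
```

The SSL settings from the original method are client-side and would stay as they are; they are left out here only to keep the sketch short.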

private void addToDruidProducerSink(DataStream<SensorMessage> sensorLineStream) {
   final Properties producerProperties = buildKafkaProducerConfigProperties();
   FlinkKafkaProducer<SensorMessage> producerTopic = new FlinkKafkaProducer<>(
         druidProducerKafkaTopic,
         new DruidSensorMessageKafkaSerializationSchema(druidProducerKafkaTopic),
         producerProperties,
         FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

   final DataStreamSink<SensorMessage> producerSink = sensorLineStream.addSink(producerTopic);

   // Workaround for producer transaction ID failures: give the producer a unique name.
   // https://issues.apache.org/jira/browse/FLINK-11654
   producerSink
         .name("druid sink " + druidProducerKafkaGroupId)
         .uid("druid-sink-" + druidProducerKafkaGroupId);
}



2022-02-22 14:41:37
java.io.IOException: Could not perform checkpoint 1 for operator split -> Sink: druid sink producer.local.druid (1/1)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:968)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
    at org.apache.flink.streaming.runtime.io.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:156)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:178)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
        ... 3 more
    Caused by: org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator split -> Sink: druid sink producer.local.druid (1/1)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:951)
    ... 13 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1002)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:320)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1100)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
    ... 23 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
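
The last message names two possible causes, one of which is the broker expiring the transaction. A rough sanity check for that case might look like the sketch below. It assumes that a transaction stays open for roughly one checkpoint interval plus any restart time, and that the producer's transaction.timeout.ms may not exceed the broker's transaction.max.timeout.ms. The class, the method and all numbers are hypothetical and are not taken from the job above.

```java
public class TransactionTimeoutCheck {

    // Hypothetical helper: returns null when the values look consistent,
    // otherwise a short description of the first problem found.
    static String check(long checkpointIntervalMs,
                        long maxRestartMs,
                        long transactionTimeoutMs,
                        long brokerMaxTimeoutMs) {
        // The broker refuses a producer whose requested timeout is above its maximum.
        if (transactionTimeoutMs > brokerMaxTimeoutMs) {
            return "transaction.timeout.ms exceeds broker transaction.max.timeout.ms";
        }
        // Assumption: the transaction must outlive one checkpoint interval plus
        // the longest expected restart, or the broker may expire it first.
        if (transactionTimeoutMs <= checkpointIntervalMs + maxRestartMs) {
            return "transaction.timeout.ms may expire before the checkpoint completes";
        }
        return null;
    }

    public static void main(String[] args) {
        // Assumed example: 10-minute checkpoints, 2-minute restarts,
        // 1-minute producer timeout, 15-minute broker maximum.
        System.out.println(check(600_000, 120_000, 60_000, 900_000));
    }
}
```

With the example values in main, the second condition triggers, because a 1-minute timeout is shorter than the 10-minute checkpoint interval.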
