> I then noticed this message showing up twice and thought "this does not
> look right":
That's fine, this is how the sink works (see the comment here:
KafkaWriter.java#L294-L301
<https://github.com/apache/flink/blob/4f233f86431ec14c807343c77cb47406fba718ae/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L294-L301>).
There are timeouts on the Kafka side that purge transactions if they do not
finish on time. The default value is 15 minutes, which also matches the
interval between the log entries you provided. Try to:

1) increase the timeouts
2) check that you checkpoint reasonably frequently in relation to the
existing timeout, and that your checkpoints actually complete fast enough
3) make sure that your job is operating correctly without being
backpressured. If you frequently see backpressure in the Flink UI, try to
locate the bottleneck, and in the meantime check whether enabling unaligned
checkpoints helps.

(Two rough configuration sketches for these points are appended below the
quoted message.)

Here are some useful links:
https://bit.ly/3wVgZKk
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#caveats
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance
https://youtu.be/bhcFfS1-eDY?t=410
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints

Best,
Alexander Fedulov

On Mon, Sep 5, 2022 at 2:21 PM Sebastian Struss <str...@justtrack.io> wrote:

> Hi all,
>
> I am quite new to Flink and Kafka, so I might mix something up here.
> The situation is that we have a Flink application (1.14.5 with Scala 2.12)
> running for a few hours to days, and then it suddenly stops working and
> can't publish to Kafka anymore.
> I then noticed this message showing up twice and thought "this does not
> look right":
>
> Created new transactional producer prefix-2-9447
>
> The timestamp of the second message seems to be the time when the
> application stops publishing to Kafka properly and when checkpoints start
> failing.
> We also see this error message:
>
> Producer attempted an operation with an old epoch. Either there is a newer
> producer with the same transactionalId, or the producer's transaction has
> been expired by the broker.
>
> Am I mistaken in thinking that this should be impossible when Flink
> handles the sinks? I would think that, with checkpointing in place and
> Flink in control of the output, it should never run into this situation.
> We are using the exactly-once delivery guarantee for Kafka and have set
> the Flink sink parallelism to 4.
> We are also using the Kubernetes operator for Flink, version 1.1.0.
> Any hints on what to check/change are highly appreciated.
>
> best,
> Sebastian S.
> [image: image.png]
>
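
Regarding 1): a minimal sketch of how the producer-side transaction timeout
could be raised on the KafkaSink (Flink 1.14+ API, as in the docs linked
above). The broker address, topic name, and transactional id prefix are
placeholders; also keep in mind that transaction.timeout.ms must not exceed
the broker's transaction.max.timeout.ms (15 minutes by default), so the
broker setting may need to be raised as well.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaSinkTimeoutExample {

    public static KafkaSink<String> buildSink() {
        // Producer-side transaction timeout; must stay at or below the
        // broker's transaction.max.timeout.ms.
        Properties producerProps = new Properties();
        producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 min, example value

        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")          // placeholder
                .setKafkaProducerConfig(producerProps)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("prefix")          // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("my-topic")        // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
    }
}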
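
Regarding 2) and 3): the checkpoint interval, checkpoint timeout, and
unaligned checkpoints can be tuned on the execution environment. The
concrete values below are only examples; pick an interval and timeout so
that checkpoints comfortably complete within the Kafka transaction timeout.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint well within the Kafka transaction timeout so that
        // transactions are committed long before the broker purges them.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE); // 1 min, example value

        CheckpointConfig config = env.getCheckpointConfig();
        // Fail a checkpoint that cannot finish in time instead of letting it hang.
        config.setCheckpointTimeout(10 * 60 * 1000L);        // 10 min, example value
        // Leave some breathing room between checkpoint attempts.
        config.setMinPauseBetweenCheckpoints(30_000L);       // 30 s, example value
        // Unaligned checkpoints let barriers overtake buffered records,
        // which helps checkpoints complete under backpressure.
        config.enableUnalignedCheckpoints();
    }
}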