>
> I then noticed this message showing up twice and thought "this does not
> look right":

That's fine; this is how the sink works (see the comment here:
KafkaWriter.java#L294-L301
<https://github.com/apache/flink/blob/4f233f86431ec14c807343c77cb47406fba718ae/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L294-L301>).

There are timeouts on the Kafka side that purge the transactions if they do
not finish on time. The default value is 15 minutes, which also matches the
interval between the log entries you provided. Try to:
1) increase the timeouts (see the configuration sketch right after this
list)
2) check that you checkpoint reasonably frequently in relation to the
existing timeout and that your checkpoints actually complete fast enough
3) make sure that your job is operating correctly without being
backpressured. If you frequently see backpressure in the Flink UI, try to
locate the bottleneck, and in the meantime check whether enabling unaligned
checkpoints helps (a checkpointing sketch follows the links below).
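
For 1), here is a minimal sketch in Java against the Flink 1.14/1.15
KafkaSink API (the Scala setup is analogous, since KafkaSink is a Java
API). The broker address, topic, and transactional id prefix are
placeholders, and the 15-minute value is only an example; to go beyond
that, the broker-side transaction.max.timeout.ms (15 minutes by default)
has to be raised as well, since the producer value must not exceed it:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ExactlyOnceKafkaSinkSketch {

    public static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")             // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")               // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("prefix")             // placeholder
                // Must cover the worst-case end-to-end checkpoint duration
                // and stay <= transaction.max.timeout.ms on the brokers.
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000")
                .build();
    }
}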

Here are some useful links:
https://bit.ly/3wVgZKk
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#caveats
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance
https://youtu.be/bhcFfS1-eDY?t=410
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints
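
And for 2) and 3), a sketch of the checkpointing settings involved; the
interval, timeout, and pause values below are illustrative, not
recommendations:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {

    public static void configure(StreamExecutionEnvironment env) {
        // Checkpoint well within the Kafka transaction timeout.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig conf = env.getCheckpointConfig();
        // Fail checkpoints that take too long instead of letting them drag
        // on until the transaction expires on the broker.
        conf.setCheckpointTimeout(600_000);
        // Leave the job room to make progress between checkpoints.
        conf.setMinPauseBetweenCheckpoints(30_000);
        // Under backpressure, unaligned checkpoints let barriers overtake
        // in-flight data so checkpoints can still complete in time.
        conf.enableUnalignedCheckpoints();
    }
}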

Best,
Alexander Fedulov


On Mon, Sep 5, 2022 at 2:21 PM Sebastian Struss <str...@justtrack.io> wrote:

> Hi all,
>
> I am quite new to Flink and Kafka, so I might mix something up here.
> The situation is that we have a Flink application (1.14.5 with Scala
> 2.12) running for a few hours to days, and suddenly it stops working and
> can't publish to Kafka anymore.
> I then noticed this message showing up twice and thought "this does not
> look right":
> > Created new transactional producer prefix-2-9447
> The timestamp of the second message seems to be the point when the
> application stops publishing to Kafka properly and when checkpoints
> start failing.
> We also see this error message:
> > Producer attempted an operation with an old epoch. Either there is a
> newer producer with the same transactionalId, or the producer's transaction
> has been expired by the broker.
> Am I mistaken in thinking that this should be impossible when Flink
> handles the sinks?
> I would think that, due to the checkpointing and due to us giving Flink
> control over the output, it should never run into this situation.
> We are using an exactly-once delivery guarantee for Kafka and have set
> the Flink sink parallelism to 4.
> We are also using the Flink Kubernetes operator in version 1.1.0.
> Any hints on what to check/change are highly appreciated.
>
> best,
> Sebastian S.
>
