Hi Yordan,

Indeed, it looks like a missing feature. Probably whoever implemented the new KafkaSink didn't realize how important this is. I've created a ticket to work on this issue [1], but I don't know when it will be fixed or by whom.

I think a workaround might be to create a new `KafkaSink` instance with a new, different operator uid, and simply drop/ignore the old instance and its state (by using the `allowNonRestoredState` option [2]).

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-30068
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#allowing-non-restored-state

On Wed, 16 Nov 2022 at 11:36, Yordan Pavlov <y.d.pav...@gmail.com> wrote:
> Hi Piotr,
>
> the option you mention is applicable only to the deprecated
> FlinkKafkaProducer. Is there an equivalent for the modern KafkaSink? I found
> this article comparing the behavior of the two:
>
> https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs
>
> It suggests that the default behavior of KafkaSink would be: "The
> recovery continues with an ERROR message like the following is
> logged:". However, this is not what I observe; instead, the job fails. I
> am attaching the relevant part of the log. This error happens upon
> trying to recover from a one-month-old savepoint.
>
> Regards,
> Yordan
>
> On Tue, 15 Nov 2022 at 18:53, Piotr Nowojski <pnowoj...@apache.org> wrote:
> >
> > Hi Yordan,
> >
> > I don't understand where the problem is. Why do you think savepoints are
> > unusable? If you recover with `ignoreFailuresAfterTransactionTimeout`
> > enabled, the current Flink behaviour shouldn't cause any problems (except
> > maybe for some logged errors).
> >
> > Best,
> > Piotrek
> >
> > On Tue, 15 Nov 2022 at 15:36, Yordan Pavlov <y.d.pav...@gmail.com> wrote:
> >>
> >> Hi,
> >> we are using Flink savepoints as a recovery tool and want to store
> >> multiple ones covering the past months. However, as we use Kafka
> >> transactions for our KafkaSink, this puts an expiration time on our
> >> savepoints: we can only use a savepoint that is younger than our Kafka
> >> transaction timeout.
> >> The problem is explained in this issue:
> >> https://issues.apache.org/jira/browse/FLINK-16419
> >> the relevant comment being this one:
> >> "FlinkKafkaProducer or KafkaSink do not know during recovery if they
> >> have to recover and commit or if it has already happened. Due to that,
> >> they are always attempting to recover and commit transactions during
> >> startup."
> >> I'm surprised that more people are not hitting this problem, as it
> >> makes savepoints pretty much unusable as a recovery mechanism.
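
To illustrate why Piotrek's workaround (new sink uid plus `allowNonRestoredState`) sidesteps the expired transactions, here is a toy Python model of how a restore maps savepoint state to operators by uid. This is not Flink code: `restore`, `RestoreError`, and the uid strings are hypothetical names for illustration only. In real Flink the flag corresponds to `--allowNonRestoredState` (short: `-n`) on `flink run -s <savepointPath>`.

```python
class RestoreError(Exception):
    """Hypothetical: raised when savepoint state cannot be mapped to any operator."""


def restore(savepoint_state: dict, job_operator_uids: list,
            allow_non_restored_state: bool = False) -> dict:
    """Toy model: map each operator uid in the new job to its saved state.

    Operators whose uid is absent from the savepoint start with empty state (None).
    """
    # State stored under uids that no longer exist in the job cannot be restored.
    unmatched = set(savepoint_state) - set(job_operator_uids)
    if unmatched and not allow_non_restored_state:
        raise RestoreError(
            f"Cannot map state for operator uid(s) {sorted(unmatched)} to the new job")
    # With the flag set, unmatched state (e.g. the old sink's pending
    # transactions) is silently dropped instead of failing the restore.
    return {uid: savepoint_state.get(uid) for uid in job_operator_uids}


if __name__ == "__main__":
    savepoint = {
        "kafka-source": {"offsets": {0: 1234}},
        "kafka-sink": {"pending_transactions": ["txn-0-17"]},  # hypothetical, long expired
    }
    new_job = ["kafka-source", "kafka-sink-v2"]  # sink re-created under a new uid
    try:
        restore(savepoint, new_job)
    except RestoreError as err:
        print("restore failed:", err)
    print(restore(savepoint, new_job, allow_non_restored_state=True))
```

In this model the renamed sink starts with no pending transactions, so nothing tries to commit the expired ones, while the source keeps its offsets. The trade-off is that any transactions the old sink had pending at savepoint time are never committed by Flink.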