Yes, that might be an issue. As far as I remember, the universal connector works with Kafka 0.10.x or higher.
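For reference, a minimal sketch of the producer setup suggested below, using the universal connector with explicit AT_LEAST_ONCE semantics. This assumes a Flink 1.12-era DataStream job; the topic name, broker address, serialization schema and surrounding pipeline are placeholders, not the actual job (the explicit Semantic argument is mainly to make the intent visible):

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AtLeastOnceProducerSketch {

    // Minimal serialization schema: one String becomes one Kafka record (value only).
    private static class StringValueSchema implements KafkaSerializationSchema<String> {
        private final String topic;

        StringValueSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
            return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input; the real job reads from the older Kafka cluster instead.
        DataStream<String> stream = env.fromElements("record-1", "record-2", "record-3");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker address

        String topic = "output-topic"; // placeholder topic name

        // Universal (version-agnostic) producer with explicit AT_LEAST_ONCE semantics,
        // as suggested in the quoted message below.
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                topic,
                new StringValueSchema(topic),
                props,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        stream.addSink(producer);
        env.execute("at-least-once-kafka-producer-sketch");
    }
}
```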
Piotrek

Fri, 5 Mar 2021 at 11:20 Witzany, Tomas <tomas.witz...@blindspot.ai> wrote:

> Hi,
> thanks for your answer. It seems like it will not be possible for me to upgrade to the newer universal Flink producer, because of the older Kafka version I am reading from. So unfortunately for now I will have to go with the hack.
> Thanks
>
> ------------------------------
> *From:* Piotr Nowojski <pnowoj...@apache.org>
> *Sent:* 03 March 2021 21:10
> *To:* Witzany, Tomas <tomas.witz...@blindspot.ai>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink KafkaProducer flushing on savepoints
>
> Hi,
>
> What Flink version and which FlinkKafkaProducer version are you using? `FlinkKafkaProducerBase` is no longer used in the latest version. I would guess you are on some older version, with FlinkKafkaProducer010 or later (no longer supported).
>
> I would suggest either the universal FlinkKafkaProducer, or FlinkKafkaProducer011 (if you are using a really old Flink version that doesn't have the universal Kafka connector). Both of those should work with any Kafka version, and from looking at the code it seems to me that neither of them has the problem you mentioned. If you select `org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic#AT_LEAST_ONCE` and disable checkpointing, it should still flush records on savepoints.
>
> > The only thing I can think about is to have checkpoints enabled with a very high interval so that they are (almost) never triggered. But this is a hack.
>
> Yes, it would be a hack. But it would work.
>
> Best,
> Piotrek
>
> Tue, 2 Mar 2021 at 12:09 Witzany, Tomas <tomas.witz...@blindspot.ai> wrote:
>
> Hi,
> I have a question about the at-least-once guarantees for Kafka producers when checkpointing is disabled. In our data pipeline we have a Flink job on an unbounded stream; originally we had checkpoints turned on. This job is cancelled with a savepoint once a day to do some data pre- and post-processing for the next day, and afterwards the job is restarted from the savepoint.
>
> The issue is that we want to turn off checkpointing, since it does not give us much value and only creates extra IO. When this is done, this message <https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java#L263> shows up:
> "Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing."
> This prompted us to investigate, and it seems that if you have checkpointing disabled, there are no at-least-once guarantees. <https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java#L65>
>
> What about if you have no checkpointing, but you make savepoints that you restore from yourself? Savepoints are the same thing as checkpoints in the code. The Flink producer makes it impossible to turn on flushing while checkpointing is disabled. I can see why this is the case, as there is some extra synchronization overhead related to the flushing flag being on. Is there a way to have checkpointing disabled and still have at-least-once guarantees on savepoints?
>
> The only thing I can think about is to have checkpoints enabled with a very high interval so that they are (almost) never triggered. But this is a hack.
>
> Tomas Witzany
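For completeness, a sketch of the workaround discussed in the quoted messages above, assuming it is acceptable to leave checkpointing nominally enabled: pick an interval so large that periodic checkpoints effectively never fire, so the producer keeps flush-on-checkpoint active and still flushes on savepoints. The interval value and the tiny pipeline are illustrative placeholders only.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RareCheckpointWorkaroundSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep checkpointing "enabled" so flush-on-checkpoint stays active, but use an
        // interval so large that periodic checkpoints are (almost) never triggered.
        // 1_000_000_000 ms is roughly 11.5 days and is an arbitrary placeholder value.
        env.enableCheckpointing(1_000_000_000L);

        // Placeholder pipeline; the real job would use its Kafka source and sink here.
        env.fromElements("a", "b", "c").print();
        env.execute("rare-checkpoint-workaround-sketch");
    }
}
```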