Hi,

Sorry for the late reply, but most of us were involved in the Flink Forward conference. The upgrade strategies for the Kafka sink and source are pretty similar: neither relies on state migration; both leverage Kafka as the source of truth.
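
To make the "Kafka as the source of truth" idea concrete, here is a toy model in plain Java. It is not real Flink or Kafka code, and all class names (`Broker`, `OldConsumer`, `NewSource`) are made up for illustration. The assumption is that the old consumer pushes its read positions to Kafka on a checkpoint or savepoint, and that the new source is configured to start from those committed group offsets (with KafkaSource that would be, as far as I know, `OffsetsInitializer.committedOffsets()`).

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetHandover {
    /** Hypothetical stand-in for Kafka's committed consumer-group offsets (partition -> next offset). */
    public static final class Broker {
        final Map<Integer, Long> committed = new HashMap<>();
    }

    /** Hypothetical stand-in for the old consumer with offset committing on checkpoints enabled. */
    public static final class OldConsumer {
        final Map<Integer, Long> position = new HashMap<>();

        // Advance the read position of one partition by the given number of records.
        public void read(int partition, long records) {
            position.merge(partition, records, Long::sum);
        }

        // A checkpoint or savepoint pushes the current positions to the broker.
        public void checkpoint(Broker broker) {
            broker.committed.putAll(position);
        }
    }

    /** Hypothetical stand-in for the new source: no restored Flink state, only committed offsets. */
    public static final class NewSource {
        final Map<Integer, Long> position;

        public NewSource(Broker broker) {
            position = new HashMap<>(broker.committed);
        }

        // Partitions without a committed offset start at 0 in this toy model.
        public long startOffset(int partition) {
            return position.getOrDefault(partition, 0L);
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        OldConsumer old = new OldConsumer();
        old.read(0, 42);
        old.read(1, 7);
        old.checkpoint(broker); // corresponds to "stop with savepoint"

        // The new source is built from the broker alone; nothing is migrated from the old consumer.
        NewSource fresh = new NewSource(broker);
        System.out.println(fresh.startOffset(0)); // prints 42
        System.out.println(fresh.startOffset(1)); // prints 7
    }
}
```

The point of the sketch is that `NewSource` never sees the `OldConsumer` object, only what was committed to the broker, which is why no state has to be carried over between the connector versions.
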
When running with FlinkKafkaConsumer, Mason correctly pointed out that you have to set `setCommitOffsetsOnCheckpoints(true)` and stop the job with a savepoint [1]. The FlinkKafkaProducer is similar: on a final savepoint the producer finalizes all pending transactions and commits them to Kafka. The KafkaSink can then start without any state migration because there should no longer be any pending transactions.

I do not think you must use `allowNonRestoredState`, because the source or sink shouldn’t have any state left after stopping with a savepoint.

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version