Hey Team,

*Problem*

Recently, we tried to upgrade our Flink infrastructure to version 1.9.1 and noticed that week-old messages were consumed from Kafka, even though the consumer is configured to start from the latest offset.
*Context*

1. Our current Flink version in production is 1.2.1.
2. We use RocksDB + Hadoop as our state backend and checkpoint data store.
3. We consume messages from and produce messages to Kafka.

*Release Plan*

1. Upgrade Flink 1.2.1 to 1.3.3.
2. Upgrade Flink 1.3.3 to 1.9.1.

Note: We use an intermediate version (1.3.3) because of the serialisation change in checkpointing.

After step 1, the service was consuming the latest Kafka events, but after step 2 we noticed that it was consuming week-old messages from the source topic. We did not see any exceptions, but because the number of messages consumed increased sharply, our task managers eventually started crashing.

We did not change the Kafka configuration in the service for the upgrade, but we did upgrade the Flink dependencies, including the Kafka connector.

Old dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>

New dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

Does anyone know why this might be happening?

Regards,
Somya Maithani
Software Developer II
Helpshift Pvt Ltd