@gordon Do you remember whether we changed any behavior of the Kafka
0.10 consumer after 1.3.3?
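For context while we check: as far as I know from the Flink Kafka connector documentation, offsets restored from a checkpoint or savepoint take precedence over any configured start position, and with the default group-offsets mode auto.offset.reset is only consulted when no committed group offset exists. Below is a minimal sketch of that precedence. It assumes the 1.9.1 job may have been started from older state or with an old committed group offset, which is not confirmed in this thread. The class, enum, and method names are hypothetical and are not Flink APIs.

```java
import java.util.OptionalLong;

/** Hypothetical model of start-offset precedence; not Flink's actual implementation. */
public class StartOffsetSketch {

    /** Mirrors the two common values of Kafka's auto.offset.reset setting. */
    enum AutoOffsetReset { EARLIEST, LATEST }

    /**
     * Returns the offset a partition would start reading from.
     * All offsets here are treated as "next offset to read" for simplicity.
     */
    static long resolveStartOffset(
            OptionalLong restoredFromState,     // offset found in a checkpoint or savepoint
            OptionalLong committedGroupOffset,  // offset committed for the consumer group
            AutoOffsetReset reset,              // fallback policy ("latest" in this thread)
            long earliestOffset,
            long latestOffset) {
        // 1. Restored state wins over every start-position setting.
        if (restoredFromState.isPresent()) {
            return restoredFromState.getAsLong();
        }
        // 2. Without restored state, the committed group offset is used (default mode).
        if (committedGroupOffset.isPresent()) {
            return committedGroupOffset.getAsLong();
        }
        // 3. Only when neither exists does the reset policy decide.
        return reset == AutoOffsetReset.LATEST ? latestOffset : earliestOffset;
    }

    public static void main(String[] args) {
        // Old offset in restored state: "latest" is ignored.
        System.out.println(resolveStartOffset(
                OptionalLong.of(100), OptionalLong.of(900), AutoOffsetReset.LATEST, 0, 1000));
        // No state, but an old committed group offset: "latest" is still ignored.
        System.out.println(resolveStartOffset(
                OptionalLong.empty(), OptionalLong.of(900), AutoOffsetReset.LATEST, 0, 1000));
        // Neither state nor group offset: the reset policy applies.
        System.out.println(resolveStartOffset(
                OptionalLong.empty(), OptionalLong.empty(), AutoOffsetReset.LATEST, 0, 1000));
    }
}
```

If either of the first two cases applies to the upgraded job, a "latest" setting alone would not stop it from resuming at an old position.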
On 23/01/2020 12:02, Somya Maithani wrote:
Hey,
Any ideas about this? We are blocked on the upgrade because we want
async timer checkpointing.
Regards,
Somya Maithani
Software Developer II
Helpshift Pvt Ltd
On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani
<somyamaithan...@gmail.com> wrote:
Hey Team,
*Problem*
While upgrading our Flink infrastructure to version 1.9.1, we
noticed that the job consumed from a week-old Kafka offset even
though the configuration says latest.
*Pretext*
1. Our current Flink version in production is 1.2.1.
2. We use RocksDB + Hadoop as our backend / checkpointing data store.
3. We consume messages from and produce messages to Kafka.
*Release Plan*
1. Upgrade Flink 1.2.1 to 1.3.3.
2. Upgrade Flink 1.3.3 to 1.9.1.
Note: We go through a transitional version (1.3.3) because of the
serialisation change in checkpointing.
After step 1, the service consumed the latest Kafka events, but
after step 2 it started consuming week-old messages from the
source topic. We did not see any exceptions, but the number of
messages consumed increased so much that our task managers
eventually started crashing.
We did not change Kafka configuration in the service for the
upgrade but we did upgrade the Flink dependencies for Kafka.
Old dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
New dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
Do we know why this would be happening?
Regards,
Somya Maithani
Software Developer II
Helpshift Pvt Ltd