Hello all, I am running a Flink job that performs data enrichment. The job has 7 Kafka consumers that receive messages for DML statements performed on 7 database tables.
Job setup:
- Flink runs in Kubernetes, in a similar way to the job-cluster resource definitions described here: <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
- 1 job manager and 2 task managers
- parallelism of 4, with 2 task slots per task manager
- RocksDB as the state backend
- Protobuf for serialization

Whenever I try to trigger a savepoint after my state is bootstrapped, I get the following error (the failing operator varies):

Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)

Note: the reported key group varies between failures.

I found this Stack Overflow question <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange> about the same exception (my job graph looks similar to the one described there, except that my job has more joins). I have double-checked my hashCode implementations and I believe they are deterministic.

When I reduce the parallelism to 1 with 1 task slot per task manager, savepoints succeed. This makes me suspect some kind of concurrency issue. I would like to understand what is causing the savepoint failure. Do you have any suggestions on what I might be missing?

Thanks in advance!

Best Regards,
Rado
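
P.S. To illustrate what I mean by "double-checked my hashCode implementations": the snippet below is not my actual job code (OrderEvent and getOrderId are placeholder names), but it shows the pattern I verified on each keyed stream: keying on an immutable String field, whose hashCode is deterministic across JVMs, rather than on the protobuf message object itself.

import org.apache.flink.api.java.functions.KeySelector;

public class KeySelectorSketch {

    // Placeholder event type; in the real job this is a protobuf-generated class.
    public static class OrderEvent {
        private final String orderId;

        public OrderEvent(String orderId) {
            this.orderId = orderId;
        }

        public String getOrderId() {
            return orderId;
        }
    }

    // Extracts an immutable String key. Because String.hashCode() is deterministic,
    // the key group computed when a record is partitioned matches the key group
    // computed later when its keyed state is written into the savepoint.
    public static class OrderIdKeySelector implements KeySelector<OrderEvent, String> {
        @Override
        public String getKey(OrderEvent event) {
            return event.getOrderId();
        }
    }

    // Used as: eventStream.keyBy(new OrderIdKeySelector())
}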