Hello all,

I am running a Flink job that performs data enrichment. The job has 7 Kafka
consumers that receive messages for DML statements performed on 7 database
tables.

Job setup (a rough configuration sketch follows the list):

   - Flink runs in Kubernetes, set up similarly to what is described here:
     <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>
   - 1 job manager and 2 task managers
   - parallelism is set to 4, with 2 task slots per task manager
   - RocksDB as the state backend
   - Protobuf for serialization
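
For context, the environment is configured roughly like this (a minimal
sketch; the class name, checkpoint URI and placeholder pipeline below are
made up, not my real values):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnrichmentJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // 2 task managers x 2 task slots = 4 slots, job parallelism 4
        env.setParallelism(4);

        // RocksDB as the state backend; the checkpoint URI is a placeholder
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));

        // Trivial placeholder pipeline so the sketch runs; the real job has
        // 7 Kafka sources (one per DB table) plus keyBy and joins for the
        // enrichment.
        env.fromElements("placeholder").print();

        env.execute("enrichment-job-sketch");
    }
}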

Whenever I try to trigger a savepoint after my state is bootstrapped, I get
the following error for different operators:

Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
at
org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at
org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)

Note: the reported key group varies between attempts.

I found this question
<https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange>
on Stack Overflow, which describes the same kind of exception (by the way,
my job graph looks similar to the one described there, except that mine has
more joins). I double-checked my hashCode implementations and I believe they
are fine.
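
To illustrate what I mean by the hash codes being fine: my keys are derived
from immutable fields of the incoming messages, so hashCode() should return
the same value every time for a given key. A simplified sketch of that kind
of key selector (the event class and field names below are made up, not my
actual code):

import org.apache.flink.api.java.functions.KeySelector;

public class KeySelectorSketch {

    // Stand-in for one of the protobuf-decoded change events.
    public static class RowChangeEvent {
        private final String rowId;

        public RowChangeEvent(String rowId) {
            this.rowId = rowId;
        }

        public String getRowId() {
            return rowId;
        }
    }

    // The key is an immutable String, so its hashCode() is identical at
    // keyBy partitioning time and at state access/snapshot time.
    public static final KeySelector<RowChangeEvent, String> BY_ROW_ID =
            RowChangeEvent::getRowId;
}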

I tried reducing the parallelism to 1 with 1 task slot per task manager, and
that configuration seems to work. This leads me to suspect some kind of
concurrency issue.
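
If my understanding is correct, Flink maps every key to a key group as
murmurHash(key.hashCode()) % maxParallelism, and each subtask owns a
contiguous range of key groups (with the default max parallelism of 128 and
parallelism 4 that gives ranges like 32-63, which matches the error above).
With parallelism 1 the single subtask owns all key groups, so even a key
whose hash changes after partitioning could never land outside the subtask's
range. A small sketch of that assignment using Flink's
KeyGroupRangeAssignment (the key and the max parallelism of 128 are assumed
values):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {
    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default when none is set explicitly
        int parallelism = 4;
        String key = "some-key"; // placeholder key

        // key group = murmurHash(key.hashCode()) % maxParallelism
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // which subtask owns that key group at the given parallelism
        int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        System.out.println("key group " + keyGroup + " -> subtask " + subtask);

        // If the key's hashCode() changed between partitioning and snapshot
        // time, the key group computed during the snapshot could fall outside
        // the subtask's KeyGroupRange, which is exactly the
        // IllegalArgumentException I am seeing.
    }
}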

I would like to understand what is causing the savepoint failure. Do you
have any suggestions on what I might be missing?

Thanks in advance!

Best Regards,
Rado
