Hi Rado,

It is hard to tell the reason without a bit more detail. Could you share with
us the complete logs of the problematic run? The job you are running, and the
types you store in RocksDB and use as events in your job, also matter a lot.
In the linked SO question, the problem was a key type whose hashCode was not
immutable.
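To illustrate what I mean (just a minimal sketch with a hypothetical key
class, not your actual code): if the fields that feed hashCode can change
after keyBy(), the key can map to a different key group at snapshot time
than at partitioning time, which produces exactly this kind of error.

import java.util.Objects;

// Hypothetical key class, for illustration only.
public class EventKey {
    private long userId;        // mutable -> dangerous
    private final String table; // immutable -> fine

    public EventKey(long userId, String table) {
        this.userId = userId;
        this.table = table;
    }

    // BAD: if userId is mutated after the record has been keyed,
    // this hashCode no longer matches the key group the record was
    // assigned to, and snapshots can fail with
    // "Key group X is not in KeyGroupRange...".
    @Override
    public int hashCode() {
        return Objects.hash(userId, table);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof EventKey)) return false;
        EventKey that = (EventKey) o;
        return userId == that.userId && Objects.equals(table, that.table);
    }
}

The fix is to make every field used by hashCode final and never mutate the
key object, so that the hash stays stable for the lifetime of the record.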

Cheers,
Till

On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
radoslav.smilya...@smule.com> wrote:

> Hello all,
>
> I am running a Flink job that performs data enrichment. My job has 7 Kafka
> consumers that receive messages for DML statements performed on 7 database
> tables.
>
> Job setup:
>
>    - Flink runs in Kubernetes, set up similarly to what is described here
>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>.
>    - 1 job manager and 2 task managers
>    - parallelism is set to 4, with 2 task slots per task manager
>    - RocksDB as the state backend
>    - Protobuf for serialization
>
> Whenever I try to trigger a savepoint after my state is bootstrapped I get
> the following error for different operators:
>
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>
> Note: the key group in the message varies between failures.
>
> I found this article
> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange>
> on Stack Overflow which relates to such an exception (btw, my job graph
> looks similar to the one described there, except that my job has more
> joins). I double-checked my hashCodes and I think they are fine.
>
> I tried reducing the parallelism to 1 with 1 task slot per task manager,
> and this configuration seems to work. This leads me to suspect some kind
> of concurrency issue.
>
> I would like to understand what is causing the savepoint failure. Do you
> have any suggestions about what I might be missing?
>
> Thanks in advance!
>
> Best Regards,
> Rado
>
