Hi Dan,

Can you provide us with more information about your job (maybe even the job code or a minimally working example), the Flink configuration, the exact workflow you are running, and the corresponding logs and error messages?
Cheers,
Till

On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <quietgol...@gmail.com> wrote:

> Could this be caused by mixing of configuration settings when running?
> Running a job with one parallelism, stop/savepointing, and then recovering
> with a different parallelism? I'd assume that's fine and wouldn't create
> bad state.
>
> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I checked my code. Our keys for streams and map state only use either
>> (1) strings, (2) long IDs that don't change, or (3) a Tuple of (1) and (2).
>>
>> I don't know why my current case is breaking. Our job partitioning and
>> parallelism settings have not changed.
>>
>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Hey. I just hit a similar error in production when trying to
>>> savepoint. We also use protobufs.
>>>
>>> Has anyone found a better fix to this?
>>>
>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Glad to hear that you solved your problem. Afaik Flink should not read
>>>> the fields of messages and call hashCode on them.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov
>>>> <radoslav.smilya...@smule.com> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> I found my problem. It was indeed related to a mutable hashCode.
>>>>>
>>>>> I was using a protobuf message in the key selector function, and one
>>>>> of the protobuf fields was an enum. I checked the implementation of
>>>>> hashCode in the generated message: it uses the int value field of the
>>>>> protobuf message, so I assumed it was ok and immutable.
>>>>>
>>>>> I replaced the key selector function to use Tuple[Long, Int] (since my
>>>>> protobuf message has only these two fields, where the int stands for
>>>>> the enum's value field). After changing my code to use the Tuple, it
>>>>> worked.
>>>>>
>>>>> I am not sure whether Flink somehow reads the protobuf message fields
>>>>> and uses the hashCode of the fields directly, since the generated
>>>>> protobuf enum does have a mutable hashCode (Enum.hashCode).
>>>>>
>>>>> Nevertheless, it works with the Tuple key.
>>>>>
>>>>> Thanks for your response!
>>>>>
>>>>> Best Regards,
>>>>> Rado
>>>>>
>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Rado,
>>>>>>
>>>>>> It is hard to tell the reason without a bit more detail. Could you
>>>>>> share with us the complete logs of the problematic run? The job you
>>>>>> are running, and the types you store as state in RocksDB and use as
>>>>>> events in your job, are also very important. In the linked SO
>>>>>> question, the problem was a type whose hashCode was not immutable.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov
>>>>>> <radoslav.smilya...@smule.com> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I am running a Flink job that performs data enrichment. My job has 7
>>>>>>> Kafka consumers that receive messages for DML statements performed
>>>>>>> on 7 db tables.
>>>>>>>
>>>>>>> Job setup:
>>>>>>>
>>>>>>> - Flink is run in k8s in a similar way as described here
>>>>>>>   <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>.
>>>>>>> - 1 job manager and 2 task managers
>>>>>>> - parallelism is set to 4, with 2 task slots per task manager
>>>>>>> - RocksDB as state backend
>>>>>>> - protobuf for serialization
>>>>>>>
>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>> bootstrapped, I get the following error for different operators:
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
>>>>>>> KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>
>>>>>>> Note: the key group number might vary.
>>>>>>>
>>>>>>> I found this article on Stack Overflow, which relates to such an
>>>>>>> exception:
>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange>
>>>>>>> (btw, my job graph looks similar to the one described there, except
>>>>>>> that my job has more joins). I double-checked my hashCodes and I
>>>>>>> think they are fine.
>>>>>>>
>>>>>>> I tried reducing the parallelism to 1 with 1 task slot per task
>>>>>>> manager, and this configuration seems to work. This points me in the
>>>>>>> direction that it might be some concurrency issue.
>>>>>>>
>>>>>>> I would like to understand what is causing the savepoint failure. Do
>>>>>>> you have any suggestions on what I might be missing?
>>>>>>>
>>>>>>> Thanks in advance!
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Rado
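
For anyone hitting the same "Key group X is not in KeyGroupRange" failure, here is a
minimal sketch of the kind of key-selector change Rado describes above: keying by a
Tuple2 of plain, immutable values instead of by the protobuf message itself. The
UserEvent type and its accessors below are hypothetical placeholders, not the real
generated classes.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical stand-in for the generated protobuf message (the real one
// carries a long id plus an enum field).
class UserEvent {
    private final long userId;
    private final int actionValue; // numeric value of the enum field

    UserEvent(long userId, int actionValue) {
        this.userId = userId;
        this.actionValue = actionValue;
    }

    long getUserId() { return userId; }
    int getActionValue() { return actionValue; }
}

// Key by a Tuple2 of primitives so the key's hashCode is deterministic across
// JVMs and restarts. Flink derives the key group from the key's hashCode, so
// an unstable hashCode can map a key outside the operator's KeyGroupRange.
public class StableKeySelector implements KeySelector<UserEvent, Tuple2<Long, Integer>> {
    @Override
    public Tuple2<Long, Integer> getKey(UserEvent event) {
        return Tuple2.of(event.getUserId(), event.getActionValue());
    }
}

// Usage on a hypothetical stream:
// events.keyBy(new StableKeySelector()).process(...);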