Hi Dan,

Can you provide us with more information about your job (maybe even the job
code or a minimal working example), the Flink configuration, the exact
workflow you are running, and the corresponding logs and error messages?

Cheers,
Till

On Tue, Jul 13, 2021 at 9:39 PM Dan Hill <quietgol...@gmail.com> wrote:

> Could this be caused by mixing configuration settings across runs?
> Running a job with one parallelism, stopping with a savepoint, and then
> recovering with a different parallelism?  I'd assume that's fine and
> wouldn't create bad state.
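>
> For concreteness, the workflow I mean is roughly the following (the paths
> and job ID are placeholders, not our actual values):
>
>   flink stop --savepointPath s3://bucket/savepoints <jobId>       # job ran with -p 4
>   flink run -s s3://bucket/savepoints/savepoint-xyz -p 8 job.jar  # resume with -p 8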
>
> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I checked my code.  Our keys for streams and map state only use
>> (1) strings, (2) long IDs that don't change, or (3) Tuples of (1) and (2).
>>
>> I don't know why my current case is breaking.  Our job partitions and
>> parallelism settings have not changed.
>>
>>
>>
>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Hey.  I just hit a similar error in production when trying to
>>> savepoint.  We also use protobufs.
>>>
>>> Has anyone found a better fix for this?
>>>
>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Glad to hear that you solved your problem. Afaik Flink should not read
>>>> the fields of messages and call hashCode on them.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>> radoslav.smilya...@smule.com> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> I found my problem. It was indeed related to a mutable hashcode.
>>>>>
>>>>> I was using a protobuf message in the key selector function, and one of
>>>>> the protobuf fields was an enum. I checked the hashCode implementation
>>>>> of the generated message; it uses the enum's int value field, so I
>>>>> assumed it was fine and immutable.
>>>>>
>>>>> I changed the key selector function to use Tuple[Long, Int] instead
>>>>> (my protobuf message has only these two fields, where the Int stands for
>>>>> the enum's value field). After switching to the Tuple key, it worked.
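>>>>>
>>>>> Roughly what the change looked like (a minimal sketch; MyEvent and its
>>>>> accessors are placeholders for my actual generated message class):
>>>>>
>>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.datastream.KeyedStream;
>>>>>
>>>>> public static KeyedStream<MyEvent, Tuple2<Long, Integer>> keyByStableFields(
>>>>>         DataStream<MyEvent> stream) {
>>>>>     // Key by primitives instead of the protobuf message itself.
>>>>>     return stream.keyBy(new KeySelector<MyEvent, Tuple2<Long, Integer>>() {
>>>>>         @Override
>>>>>         public Tuple2<Long, Integer> getKey(MyEvent event) {
>>>>>             // getNumber() is the enum's wire value, so the Tuple2's
>>>>>             // hashCode is stable across JVM instances.
>>>>>             return Tuple2.of(event.getId(), event.getStatus().getNumber());
>>>>>         }
>>>>>     });
>>>>> }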
>>>>>
>>>>> I am not sure if Flink somehow reads the protobuf message fields and
>>>>> uses their hashCodes directly, since the generated protobuf enum's
>>>>> hashCode (java.lang.Enum.hashCode) is an identity hash and therefore
>>>>> not stable across JVM instances.
>>>>>
>>>>> Nevertheless, it works fine with the Tuple key.
>>>>>
>>>>> Thanks for your response!
>>>>>
>>>>> Best Regards,
>>>>> Rado
>>>>>
>>>>>
>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Rado,
>>>>>>
>>>>>> it is hard to tell the reason without more details. Could you share
>>>>>> with us the complete logs of the problematic run? The job you are
>>>>>> running, and the types you store as state in RocksDB and use as events,
>>>>>> are also very important. In the linked SO question, the problem was a
>>>>>> type whose hashCode was not immutable.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>> radoslav.smilya...@smule.com> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I am running a Flink job that performs data enrichment. My job has 7
>>>>>>> Kafka consumers that receive messages for DML statements performed on
>>>>>>> 7 DB tables.
>>>>>>>
>>>>>>> Job setup:
>>>>>>>
>>>>>>>    - Flink runs in k8s in a way similar to the one described here
>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>.
>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>    - parallelism is set to 4, with 2 task slots per task manager
>>>>>>>    - RocksDB as state backend
>>>>>>>    - protobuf for serialization
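>>>>>>>
>>>>>>> A rough flink-conf.yaml sketch of those settings (the paths are
>>>>>>> placeholders, not our actual values):
>>>>>>>
>>>>>>> state.backend: rocksdb
>>>>>>> state.checkpoints.dir: s3://my-bucket/checkpoints  # placeholder
>>>>>>> state.savepoints.dir: s3://my-bucket/savepoints    # placeholder
>>>>>>> taskmanager.numberOfTaskSlots: 2
>>>>>>> parallelism.default: 4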
>>>>>>>
>>>>>>> Whenever I try to trigger a savepoint after my state is
>>>>>>> bootstrapped I get the following error for different operators:
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
>>>>>>> KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>>>>>> at
>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>>>>>> at
>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>>>>>
>>>>>>> Note: key group might vary.
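>>>>>>>
>>>>>>> As far as I understand, Flink derives the key group from the key's
>>>>>>> hashCode, so a hashCode that is not stable can place a key outside the
>>>>>>> range a subtask owns. A minimal sketch of that computation, using
>>>>>>> Flink's own utilities (myKey and the range bounds are placeholders
>>>>>>> taken from the error above):
>>>>>>>
>>>>>>> import org.apache.flink.runtime.state.KeyGroupRange;
>>>>>>> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
>>>>>>>
>>>>>>> Object myKey = "some-key";  // placeholder for an actual key
>>>>>>> // Flink computes murmurHash(key.hashCode()) % maxParallelism;
>>>>>>> // 128 is the default max parallelism for small jobs.
>>>>>>> int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(myKey, 128);
>>>>>>>
>>>>>>> // Each subtask owns a contiguous range, e.g. 32..63 from the error.
>>>>>>> KeyGroupRange range = new KeyGroupRange(32, 63);
>>>>>>> // If the key's hashCode changed since the record was routed to this
>>>>>>> // subtask, the check below can fail, producing the exception above.
>>>>>>> boolean inRange = range.contains(keyGroup);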
>>>>>>>
>>>>>>> I found this
>>>>>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange>
>>>>>>> question on Stack Overflow, which relates to such an exception (btw,
>>>>>>> my job graph looks similar to the one described there, except that my
>>>>>>> job has more joins). I double-checked my hashCodes and I think they
>>>>>>> are fine.
>>>>>>>
>>>>>>> I tried reducing the parallelism to 1 with 1 task slot per task
>>>>>>> manager, and this configuration seems to work. That leads me to
>>>>>>> suspect some kind of concurrency issue.
>>>>>>>
>>>>>>> I would like to understand what is causing the savepoint failure. Do
>>>>>>> you have any suggestions as to what I might be missing?
>>>>>>>
>>>>>>> Thanks in advance!
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Rado
>>>>>>>
>>>>>>
