[ https://issues.apache.org/jira/browse/FLINK-29430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Huang Xingbo updated FLINK-29430:
---------------------------------
    Fix Version/s: 1.17.0
                       (was: 1.16.0)

> Sanity check in InternalKeyContextImpl#setCurrentKeyGroupIndex
> --------------------------------------------------------------
>
>                 Key: FLINK-29430
>                 URL: https://issues.apache.org/jira/browse/FLINK-29430
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.15.2
>            Reporter: Zakelly Lan
>            Assignee: Zakelly Lan
>            Priority: Major
>             Fix For: 1.17.0
>
>
> Currently, the HeapStateBackend checks whether the current key group index is
> valid, while the RocksDBStateBackend does not. When the HeapStateBackend is
> used and the shuffle key is non-deterministic (for example, its hashCode and
> equals implementation is not stable), an exception like the following is
> thrown:
>
> {code:java}
> java.lang.IllegalArgumentException: Key group 84 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
>     at org.apache.flink.runtime.state.heap.StateTable.getMapForKeyGroup(StateTable.java:305)
>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:261)
>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
>     at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
>     at com.alibaba.ververica.flink.state.benchmark.wordcount.WordCount$MixedFlatMapper.flatMap(WordCount.java:169)
>     at com.alibaba.ververica.flink.state.benchmark.wordcount.WordCount$MixedFlatMapper.flatMap(WordCount.java:160)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:135)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:526)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:811)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:760)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
>     at java.lang.Thread.run(Thread.java:750)
> {code}
> The RocksDBStateBackend, however, accepts the same invalid key group index
> and runs without any exception. Because a wrong key group index leads to
> incorrect state, it is better to check the index in
> {_}InternalKeyContextImpl#setCurrentKeyGroupIndex{_} and throw an exception
> immediately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
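The fail-fast check proposed in the issue can be sketched in plain Java. This is a minimal, self-contained illustration under stated assumptions, not the FLINK-29430 patch: KeyGroupSanityCheckSketch, HypotheticalKeyContext and rangeFor are hypothetical names with no Flink dependency, and rangeFor assumes the contiguous key-group split used by Flink's KeyGroupRangeAssignment#computeKeyGroupRangeForOperatorIndex. With maxParallelism = 128 and parallelism = 4 (an assumed configuration, one of several that produce this range), subtask 1 owns key groups 32..63, the range in the stack trace, so key group 84 is rejected the moment it is set rather than whenever a particular backend happens to look it up.

```java
/**
 * Illustrative sketch only. All names here are hypothetical and are not Flink APIs.
 * It shows a key context that validates the key group index when it is set.
 */
public class KeyGroupSanityCheckSketch {

    /**
     * Inclusive {start, end} key-group range owned by one subtask.
     * Assumed to match the contiguous split in Flink's
     * KeyGroupRangeAssignment#computeKeyGroupRangeForOperatorIndex.
     */
    static int[] rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Hypothetical stand-in for a key context that owns one key-group range. */
    static final class HypotheticalKeyContext {
        private final int startKeyGroup;
        private final int endKeyGroup;
        private int currentKeyGroupIndex = -1;

        HypotheticalKeyContext(int startKeyGroup, int endKeyGroup) {
            this.startKeyGroup = startKeyGroup;
            this.endKeyGroup = endKeyGroup;
        }

        void setCurrentKeyGroupIndex(int keyGroupIndex) {
            // Fail fast: reject an out-of-range index before any state is read or
            // written, independently of which state backend is used afterwards.
            if (keyGroupIndex < startKeyGroup || keyGroupIndex > endKeyGroup) {
                throw new IllegalArgumentException(
                        "Key group " + keyGroupIndex + " is not in KeyGroupRange{startKeyGroup="
                                + startKeyGroup + ", endKeyGroup=" + endKeyGroup + "}.");
            }
            this.currentKeyGroupIndex = keyGroupIndex;
        }

        int getCurrentKeyGroupIndex() {
            return currentKeyGroupIndex;
        }
    }

    public static void main(String[] args) {
        // Assumed configuration: maxParallelism = 128, parallelism = 4, subtask 1.
        int[] range = rangeFor(128, 4, 1); // {32, 63}
        HypotheticalKeyContext ctx = new HypotheticalKeyContext(range[0], range[1]);

        ctx.setCurrentKeyGroupIndex(40); // inside 32..63: accepted
        try {
            ctx.setCurrentKeyGroupIndex(84); // outside 32..63: rejected immediately
        } catch (IllegalArgumentException e) {
            // prints: Key group 84 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
            System.out.println(e.getMessage());
        }
    }
}
```

Placing the check in the key context rather than in a backend-specific structure such as the heap StateTable means, assuming both backends go through the same key context as the issue implies, that heap and RocksDB would fail identically; the cost is one range comparison per key switch.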