Hi, I am using apache flink 1.18 and running this locally with rocksdb as state backend. So far my pipeline was working fine and was making few adjustments and then it started failing with some weird exception:
2024-11-24 21:54:27,440 WARN org.apache.flink.runtime.taskmanager.Task [] - Task (1/4)#0 (8723e9429489c210622aa760b8ac89bd_07d155ff1a31623908c300b9bacd8bce_0_0) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Exception occurred while setting the current key context. at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.18.1.jar:1.18.1] at java.lang.Thread.run(Thread.java:832) [?:?] Caused by: java.lang.IllegalArgumentException: Key group 76 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=31}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation). at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.state.heap.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:77) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:250) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371) ~[flink-dist-1.18.1.jar:1.18.1] ... 18 more The pipeline is running with parallelism 4 and the code where it fails is something like this outputData = inputData .union(inputData1, inputData2) .keyBy( e -> MurmurHash3.hash128x64((e.field1 + e.field2 + e.field3).getBytes())[0]) .process(new UnionKeyedProcessFunction()) .name("Task"); Any idea where I should look into as this code was working before. Thanks Sachin