Hi,
I am using apache flink 1.18 and running this locally with rocksdb as state
backend.
So far my pipeline was working fine and was making few adjustments and then
it started failing with some weird exception:

2024-11-24 21:54:27,440 WARN org.apache.flink.runtime.taskmanager.Task [] -
Task (1/4)#0
(8723e9429489c210622aa760b8ac89bd_07d155ff1a31623908c300b9bacd8bce_0_0)
switched from RUNNING to FAILED with failure cause:
java.lang.RuntimeException: Exception occurred while setting the current
key context.
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
[flink-dist-1.18.1.jar:1.18.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
[flink-dist-1.18.1.jar:1.18.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
[flink-dist-1.18.1.jar:1.18.1]
at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: java.lang.IllegalArgumentException: Key group 76 is not in
KeyGroupRange{startKeyGroup=0, endKeyGroup=31}. Unless you're directly
using low level state access APIs, this is most likely caused by
non-deterministic shuffle key (hashCode and equals implementation).
at
org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.runtime.state.heap.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:77)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:250)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430)
~[flink-dist-1.18.1.jar:1.18.1]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
~[flink-dist-1.18.1.jar:1.18.1]
... 18 more

The pipeline is running with parallelism 4 and the code where it fails is
something like this

outputData =
    inputData
        .union(inputData1, inputData2)
        .keyBy(
            e -> MurmurHash3.hash128x64((e.field1 + e.field2 +
e.field3).getBytes())[0])
        .process(new UnionKeyedProcessFunction())
        .name("Task");


Any idea where I should look into as this code was working before.

Thanks
Sachin

Reply via email to