Hi,

Flink 1.2 partitions all keys into key groups, the atomic units for rescaling. This partitioning is done by hash partitioning and is kept in sync with the routing of tuples to operator instances: each parallel instance of a keyed operator is responsible for a certain range of key groups. The exception means that Flink detected a tuple in the state backend of a parallel operator instance that does not belong there, because by its key hash it maps to a different key group. Phrased differently, this tuple belongs to a different parallel operator instance.

Whether this is a Flink bug or a bug in user code is very hard to tell, and the log does not provide additional insight. I could see this happening if your keys are mutable and your code modifies a key object in a way that changes its hash code. Another question: did you migrate your job from Flink 1.1.3 through an old savepoint, or did you do a fresh start?

Other than that, I recommend checking your code for mutation of keys. If the failure is deterministic, you could also set a breakpoint at the line of the exception and take a look at whether the key that is about to be inserted is somehow special.
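To make concrete what I mean by a mutable key, here is a minimal sketch (the class and field names are made up, not taken from your job). Flink derives the key group from the key's hashCode(), so if a field that feeds into hashCode() is modified after keyBy(), the state for that tuple can end up in a key group that belongs to a different parallel instance:

    // Hypothetical key class; mutating sensorId after keyBy() changes the
    // hash code and therefore the key group the key maps to.
    public class SensorKey {
        private String sensorId;

        public SensorKey(String sensorId) {
            this.sensorId = sensorId;
        }

        // Dangerous: calling this on a key object that is already in flight
        // (e.g. inside a map or window function downstream of keyBy())
        // changes the hash code that Flink used for routing.
        public void setSensorId(String sensorId) {
            this.sensorId = sensorId;
        }

        @Override
        public int hashCode() {
            return sensorId.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof SensorKey
                && ((SensorKey) o).sensorId.equals(this.sensorId);
        }
    }

If you want to see where a given key should live, you can compute its key group with org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism) in the debugger and compare it against the key group range of the operator instance that throws.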
Best,
Stefan

> On 20.02.2017, at 14:32, Steffen Hausmann <stef...@hausmann-family.de> wrote:
>
> Hi there,
>
> I'm having problems running a job on Flink 1.2.0 that successfully executes on Flink 1.1.3. The job is supposed to read events from a Kinesis stream and to send outputs to Elasticsearch, and it actually initiates successfully on a Flink 1.2.0 cluster running on YARN, but as soon as I start to ingest events into the Kinesis stream, the job fails (see the attachment for more information):
>
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
>
> Any ideas what's going wrong here? The job executes successfully when it's compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 cluster. Does this indicate a bug in my code or is this rather a bug in Flink? How can I further debug this?
>
> Any guidance is highly appreciated.
>
> Thanks,
>
> Steffen
>
> <log>