Hi,

Flink 1.2 partitions all keys into key-groups, the atomic units for
rescaling. Keys are assigned to key-groups by hash partitioning, and the same
assignment drives the routing of tuples to operator instances: each parallel
instance of a keyed operator is responsible for a range of key-groups.

This exception means that Flink detected a tuple in the state backend of a
parallel operator instance that should not be there because, by its key hash,
it belongs to a different key-group. Put differently, the tuple belongs to a
different parallel operator instance.

Whether this is a Flink bug or a bug in user code is very hard to tell, and
the log does not provide additional insight. I could see this happening if
your keys are mutable and your code changes the key object in a way that
changes its hash code. Another question: did you migrate your job from Flink
1.1.3 through an old savepoint, or did you do a fresh start?

Other than that, I recommend checking your code for mutation of keys. If the
job fails deterministically, you could also set a breakpoint on the line that
throws the exception and check whether the key that is about to be inserted
is special in some way.
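
To illustrate the mutable-key scenario, here is a minimal, self-contained
sketch. It does not use Flink classes: MutableKey, keyGroupFor and
operatorIndexFor are hypothetical names, and the key-group computation is
simplified (as far as I know, Flink additionally runs the hashCode through a
murmur hash before taking the modulo). The values 128 and 4 for max
parallelism and parallelism are assumptions chosen for the example.

```java
import java.util.Objects;

public class KeyGroupSketch {

    /** Hypothetical key type whose hashCode depends on a mutable field. */
    static final class MutableKey {
        String id;

        MutableKey(String id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableKey && Objects.equals(id, ((MutableKey) o).id);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(id);
        }
    }

    /** Simplified key-group assignment: hash of the key modulo max parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Index of the parallel operator instance that owns the given key-group. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for this example
        int parallelism = 4;      // assumed value for this example

        MutableKey key = new MutableKey("a");
        int groupAtRouting = keyGroupFor(key, maxParallelism);
        int instanceAtRouting = operatorIndexFor(groupAtRouting, maxParallelism, parallelism);

        // User code mutates the key after the record was routed by its old hash.
        key.id = "A";
        int groupAtStateAccess = keyGroupFor(key, maxParallelism);
        int instanceAtStateAccess = operatorIndexFor(groupAtStateAccess, maxParallelism, parallelism);

        // Prints: routed to instance 3 (key-group 97)
        System.out.println("routed to instance " + instanceAtRouting
                + " (key-group " + groupAtRouting + ")");
        // Prints: state access expects instance 2 (key-group 65)
        System.out.println("state access expects instance " + instanceAtStateAccess
                + " (key-group " + groupAtStateAccess + ")");
    }
}
```

In this sketch the record arrives at instance 3, but by the time state is
accessed the key hashes into a key-group owned by instance 2. That is the
kind of mismatch the check behind this exception guards against.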

Best,
Stefan 


> On 20.02.2017 at 14:32, Steffen Hausmann <stef...@hausmann-family.de> wrote:
> 
> Hi there,
> 
> I’m having problems running a job on Flink 1.2.0 that executes successfully
> on Flink 1.1.3. The job is supposed to read events from a Kinesis stream and
> send outputs to Elasticsearch. It starts successfully on a Flink 1.2.0
> cluster running on YARN, but as soon as I start to ingest events into the
> Kinesis stream, the job fails (see the attachment for more information):
> 
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
> 
> Any ideas what’s going wrong here? The job executes successfully when it’s 
> compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 cluster. 
> Does this indicate a bug in my code or is this rather a bug in Flink? How can 
> I further debug this?
> 
> Any guidance is highly appreciated.
> 
> Thanks,
> 
> Steffen
> 
> <log>
