@Steffen Yes, you currently cannot use arrays as keys. A check is missing that would give you a proper error message for that case.

The double[] key is hashed on the sender side before the record is sent. Java's hashCode() for an array does not take the array's contents into account; it is the object's identity hash, which makes it a non-deterministic hash. When the double[] is re-hashed on the receiver side, you get a different hash value, which is detected as a key-group violation. In fact, your program was probably behaving incorrectly before as well; now you at least get an error message for it.
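To make this concrete, here is a minimal, self-contained sketch (my own illustration, not code from this thread) of why an array key breaks:

    import java.util.Arrays;

    public class ArrayKeyHash {
        public static void main(String[] args) {
            double[] a = {40.7, -74.0};
            double[] b = {40.7, -74.0};   // same contents, different object

            // Arrays inherit Object.hashCode(), which is identity-based:
            // two equal-valued arrays (e.g. the key before serialization and
            // its deserialized copy) produce different hashes.
            System.out.println(a.hashCode() == b.hashCode());   // almost always false

            // A content-based hash does exist, but keyBy() does not use it for arrays:
            System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b));   // true
        }
    }

As a workaround, derive a key with value semantics, for example a String. The field name "location" matches the snippet below; the selector itself is my suggestion, not code from the thread:

    .keyBy(duration -> duration.location[0] + "," + duration.location[1])

instead of .keyBy("location").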
On Tue, Feb 21, 2017 at 3:14 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:

> Hi,
>
> if your key is a double[], then even if the field is a final double[], it
> is mutable, because the array entries can be mutated, and maybe that is
> what happened. You can check whether the following two points are in sync,
> hash-wise:
> KeyGroupStreamPartitioner::selectChannels and
> AbstractKeyedStateBackend::setCurrentKey.
> The first method determines to which parallel operator a tuple is routed
> in a keyed stream. The second determines the tuple's key group for the
> backend. Both must be in sync w.r.t. the key group they determine for the
> tuple, and this assignment is done based on the hash of the key. Therefore,
> the hash of the tuple's key must never change, i.e. the key must be
> immutable. If you can observe a change in the hash code, that change is
> what breaks your code. I am pretty sure that Flink 1.1.x might just
> silently accept a mutation of the key, but arguably that is incorrect.
>
> Best,
> Stefan
>
> > Am 21.02.2017 um 14:51 schrieb Steffen Hausmann <stef...@hausmann-family.de>:
> >
> > Thanks for these pointers, Stefan.
> >
> > I've started a fresh job and didn't migrate any state from a previous
> > execution. Moreover, all the fields of all the events I'm using are
> > declared final.
> >
> > I've set a breakpoint to figure out which event is causing the problem,
> > and it turns out that Flink processes the incoming events for some time;
> > only when a certain window triggers is an exception thrown. The specific
> > code that causes the exception is as follows:
> >
> >> DataStream<IdleDuration> idleDuration = cleanedTrips
> >>         .keyBy("license")
> >>         .flatMap(new DetermineIdleDuration())
> >>         .filter(duration -> duration.avg_idle_duration >= 0
> >>                 && duration.avg_idle_duration <= 240)
> >>         .keyBy("location")
> >>         .timeWindow(Time.minutes(10))
> >>         .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input,
> >>                 Collector<IdleDuration> out) -> {
> >>             double[] location = Iterables.get(input, 0).location;
> >>             double avgDuration = StreamSupport
> >>                     .stream(input.spliterator(), false)
> >>                     .mapToDouble(idle -> idle.avg_idle_duration)
> >>                     .average()
> >>                     .getAsDouble();
> >>
> >>             out.collect(new IdleDuration(location, avgDuration,
> >>                     window.maxTimestamp()));
> >>         });
> >
> > If the apply statement is removed, there is no exception at runtime.
> >
> > The location field that is referenced by the keyBy statement is actually
> > a double[]. Might this cause the problems I'm experiencing?
> >
> > You can find some more code for additional context in the attached
> > document.
> >
> > Thanks for looking into this!
> >
> > Steffen
> >
> > On 20/02/2017 15:22, Stefan Richter wrote:
> >> Hi,
> >>
> >> Flink 1.2 partitions all keys into key groups, the atomic units for
> >> rescaling. This partitioning is done by hash partitioning and is also
> >> in sync with the routing of tuples to operator instances (each parallel
> >> instance of a keyed operator is responsible for some range of key
> >> groups).
> >> This exception means that Flink detected a tuple in the state backend
> >> of a parallel operator instance that should not be there because, by
> >> its key hash, it belongs to a different key group. Phrased differently,
> >> the tuple belongs to a different parallel operator instance. Whether
> >> this is a Flink bug or a user-code bug is very hard to tell; the log
> >> does not provide additional insights either. I could see this happen if
> >> your keys are mutable and your code makes changes to the key object
> >> that alter its hash code. Another question: did you migrate your job
> >> from Flink 1.1.3 through an old savepoint, or did you do a fresh start?
> >> Other than that, I can recommend checking your code for mutation of
> >> keys. If this fails deterministically, you could also set a breakpoint
> >> at the line of the exception and look at whether the key that is about
> >> to be inserted is somehow special.
> >>
> >> Best,
> >> Stefan
> >>
> >>> Am 20.02.2017 um 14:32 schrieb Steffen Hausmann <stef...@hausmann-family.de>:
> >>>
> >>> Hi there,
> >>>
> >>> I'm having problems running a job on Flink 1.2.0 that executes
> >>> successfully on Flink 1.1.3. The job is supposed to read events from a
> >>> Kinesis stream and send outputs to Elasticsearch. It actually
> >>> initializes successfully on a Flink 1.2.0 cluster running on YARN, but
> >>> as soon as I start to ingest events into the Kinesis stream, the job
> >>> fails (see the attachment for more information):
> >>>
> >>> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
> >>>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
> >>>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
> >>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
> >>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
> >>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> >>>     at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> Any ideas what's going wrong here? The job executes successfully when
> >>> it's compiled against the Flink 1.1.3 artifacts and run on a Flink
> >>> 1.1.3 cluster. Does this indicate a bug in my code, or is it rather a
> >>> bug in Flink? How can I further debug this?
> >>>
> >>> Any guidance is highly appreciated.
> >>>
> >>> Thanks,
> >>>
> >>> Steffen
> >>>
> >>> <log>
> >>
> > <snipplet.java>
> >
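To see the key-group mismatch Stefan describes directly, here is a small sketch. It assumes Flink 1.2's org.apache.flink.runtime.state.KeyGroupRangeAssignment utility, which, as I read the code, backs both KeyGroupStreamPartitioner::selectChannels and AbstractKeyedStateBackend::setCurrentKey; the values below are illustrative only:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyGroupCheck {
        public static void main(String[] args) {
            int maxParallelism = 128;   // illustrative max parallelism
            double[] key = {40.7, -74.0};

            // The sender computes the key group from the original array...
            int senderGroup =
                    KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

            // ...but after network transfer the receiver holds a deserialized
            // copy, i.e. a different object with a different identity hash.
            int receiverGroup =
                    KeyGroupRangeAssignment.assignToKeyGroup(key.clone(), maxParallelism);

            // Almost certainly prints two different groups: exactly the
            // mismatch behind "Unexpected key group index".
            System.out.println(senderGroup + " vs " + receiverGroup);
        }
    }

With a key type that has a content-based hashCode() (String, a Tuple, or a proper POJO key field), both calls agree and the key-group check passes.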