Hi,

Which version of Flink are you using? This should have been fixed in 1.3.0 at the latest: https://issues.apache.org/jira/browse/FLINK-5874. Since that fix, such keys should be rejected. There is also this issue, which aims to re-introduce proper support for arrays as keys: https://issues.apache.org/jira/browse/FLINK-5299
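Until array keys are supported again, one possible workaround is to key by a value with content-based equality instead of the raw array. Below is a minimal sketch. It assumes a hypothetical wrapper class `ByteArrayKey`, which is not part of Flink. In a real job it would be returned from a `KeySelector`. Here the keyed sum is simulated locally with a `HashMap` so the example runs without Flink:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper (not a Flink class): gives a byte[] key
// content-based equals/hashCode, so equal contents form one key.
public final class ByteArrayKey {
    private final byte[] bytes;

    public ByteArrayKey(byte[] bytes) {
        // Defensive copy so later mutation of the caller's array cannot change the key.
        this.bytes = bytes.clone();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ByteArrayKey && Arrays.equals(bytes, ((ByteArrayKey) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }

    @Override
    public String toString() {
        return Arrays.toString(bytes);
    }

    // Local stand-in for keyBy(...).sum(1): sums the values per wrapped key.
    public static Map<ByteArrayKey, Integer> sumByKey(byte[][] keys, int[] values) {
        Map<ByteArrayKey, Integer> sums = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            sums.merge(new ByteArrayKey(keys[i]), values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        byte[][] keys = { "a".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8) };
        int[] values = { 2, 5 };
        // Both records fall into one group: prints {[97]=7}
        System.out.println(sumByKey(keys, values));
    }
}
```

Converting the bytes to a `String` and keying by that would also work when the bytes are known to be valid text. The wrapper avoids assuming an encoding.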
Best,
Aljoscha

> On 31. Jul 2017, at 15:16, Xu Pingyong <xupingyong...@163.com> wrote:
>
> Hi Aljoscha,
>
> The hashCode of a Java array depends on the reference, not on the
> content. If the keyBy field contains an array, two records are
> hash-partitioned to different streams even though their keys are equal.
>
> int a1[] = new int[]{1, 2}; // hashcode is 5592464
> int a2[] = new int[]{1, 2}; // hashcode is 1830712962
>
> Streaming job example:
>
> Tuple2<byte[], Integer>[] sources = new Tuple2[]{
>     new Tuple2("a".getBytes(), 2), new Tuple2("a".getBytes(), 5)};
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromElements(sources)
>     .keyBy(0)
>     .sum(1)
>     .map(new MapFunction<Tuple2<byte[], Integer>, Tuple2<String, Integer>>() {
>         @Override
>         public Tuple2<String, Integer> map(Tuple2<byte[], Integer> value) throws Exception {
>             return new Tuple2<>(new String(value.f0), value.f1);
>         }
>     }).print();
>
> env.execute();
>
> The expected result is (a, 7), but that is not what the job produces.
> What do you think about this case?
>
> Best Regards!
> Xu Pingyong
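The behaviour described in the quoted message can be reproduced without Flink. The plain-Java sketch below uses illustrative names `ArrayKeyPitfall` and `sumByRawArray`, which are not from the thread. It shows that arrays inherit identity-based `equals`/`hashCode` from `Object`. As a result, a `HashMap` grouping, used here as a rough stand-in for `keyBy(0).sum(1)`, splits two equal `byte[]` keys into separate groups. It does not model Flink's actual partitioner:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ArrayKeyPitfall {

    // Groups by the raw byte[] reference. Arrays use Object's equals/hashCode,
    // so two arrays with identical contents are treated as different keys.
    public static Map<byte[], Integer> sumByRawArray(byte[][] keys, int[] values) {
        Map<byte[], Integer> sums = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            sums.merge(keys[i], values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        int[] a1 = {1, 2};
        int[] a2 = {1, 2};
        // Identity comparison: false, because these are distinct objects.
        System.out.println(a1.equals(a2));
        // Content comparison: true, and the content-based hash codes agree.
        System.out.println(Arrays.equals(a1, a2));
        System.out.println(Arrays.hashCode(a1) == Arrays.hashCode(a2));

        byte[][] keys = { "a".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8) };
        int[] values = {2, 5};
        // Two groups (sums 2 and 5, in unspecified order) instead of a single (a, 7).
        for (Map.Entry<byte[], Integer> e : sumByRawArray(keys, values).entrySet()) {
            System.out.println(new String(e.getKey(), StandardCharsets.UTF_8) + " -> " + e.getValue());
        }
    }
}
```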