Hi Aljoscha:

Thanks for your reply. I look forward to the final solution.

Best Regards
Xu Pingyong

On 2017-07-31 22:39:38, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>Hi,
>
>Which version of Flink are you using? This issue should have been resolved at
>least by 1.3.0: https://issues.apache.org/jira/browse/FLINK-5874
>Currently such keys should be rejected. There is also this issue, which aims
>to re-introduce proper support for arrays as keys:
>https://issues.apache.org/jira/browse/FLINK-5299
>
>Best,
>Aljoscha
>
>> On 31. Jul 2017, at 15:16, Xu Pingyong <xupingyong...@163.com> wrote:
>>
>> Hi Aljoscha:
>>
>> The hashCode of a Java array depends on the reference instead of the
>> content. If the keyBy field contains an array, two records are
>> hash-partitioned to different streams although their key contents are
>> equal.
>>
>> int a1[] = new int[]{1, 2}; // hashcode is: 5592464
>> int a2[] = new int[]{1, 2}; // hashcode is: 1830712962
>>
>> Streaming job example:
>>
>> Tuple2<byte[], Integer>[] sources = new Tuple2[]{
>>     new Tuple2("a".getBytes(), 2), new Tuple2("a".getBytes(), 5)};
>>
>> final StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> env.fromElements(sources)
>>     .keyBy(0)
>>     .sum(1)
>>     .map(new MapFunction<Tuple2<byte[], Integer>, Tuple2<String, Integer>>() {
>>         @Override
>>         public Tuple2<String, Integer> map(Tuple2<byte[], Integer> value)
>>                 throws Exception {
>>             return new Tuple2<>(new String(value.f0), value.f1);
>>         }
>>     }).print();
>>
>> env.execute();
>>
>> The expected result is (a, 7), which is not the actual result. What do
>> you think about this case?
>>
>> Best Regards!
>> Xu Pingyong
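
For reference, the array behaviour described in the quoted message can be reproduced with the JDK alone, without Flink. The following is only a minimal sketch: the class name ArrayKeyDemo and the helper contentKey are hypothetical names chosen for illustration, and wrapping the bytes in a ByteBuffer is just one way to get content-based equals() and hashCode(). It is not something the Flink issues above prescribe.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ArrayKeyDemo {

    // Hypothetical helper (not a Flink API): wraps the bytes so that
    // equals() and hashCode() are computed from the content.
    static ByteBuffer contentKey(byte[] bytes) {
        return ByteBuffer.wrap(bytes);
    }

    public static void main(String[] args) {
        byte[] k1 = "a".getBytes();
        byte[] k2 = "a".getBytes();

        // Arrays inherit Object.equals(), which compares references.
        System.out.println(k1.equals(k2)); // false
        // Identity-based hash codes; the values vary between runs.
        System.out.println(k1.hashCode() + " vs " + k2.hashCode());

        // Content-based comparison and hashing from java.util.Arrays.
        System.out.println(Arrays.equals(k1, k2)); // true
        System.out.println(Arrays.hashCode(k1) == Arrays.hashCode(k2)); // true

        // The wrapped keys are equal and hash identically.
        System.out.println(contentKey(k1).equals(contentKey(k2))); // true
        System.out.println(contentKey(k1).hashCode() == contentKey(k2).hashCode()); // true
    }
}
```

In a job like the one quoted above, converting the byte[] field to a content-comparable type such as String before keyBy might be one workaround. I have not verified this against a specific Flink version, so please treat it as an assumption.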