It's very strange: when I change the key selector to return a random key, the JVM reports an OOM.
.keyBy(new KeySelector<MyEvent, Integer>() {
    public Integer getKey(MyEvent ev) {
        return ThreadLocalRandom.current().nextInt(1, 100);
    }
})

Caused by: java.lang.OutOfMemoryError: Java heap space
    at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
    at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
    at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

Could anybody explain the internals of keyBy()?

2017-12-28 17:33 GMT+08:00 Ufuk Celebi <u...@apache.org>:
> Hey Jinhua,
>
> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <luajit...@gmail.com> wrote:
>> keyBy() on a field generates one key per distinct field value, so if
>> the number of distinct values is huge, Flink would have trouble with
>> both CPU and memory. Is this considered in the design of Flink?
>
> Yes, keyBy hash-partitions the data across the nodes of your Flink
> application, and thus you can easily scale your application up if you
> need more processing power.
>
> I'm not sure that this is the problem in your case, though. Can you
> provide some more details on what you are doing exactly? Are you
> aggregating by time (for the keyBy you mention no windowing, but then
> you mention windowAll)? What kind of aggregation are you doing? If
> possible, feel free to share some code.
>
>> Since windowAll() could be given a parallelism, I tried to use a key
>> selector that returns the field's hash instead of its value, hoping
>> it would decrease the number of keys, but Flink throws a key
>> out-of-range exception. How do I use a key selector correctly?
>
> Can you paste the exact exception you get? I think this might indicate
> that you don't correctly extract the key from your record, e.g. you
> extract a different key on sender and receiver.
>
> I'm sure we can figure this out after you provide more context. :-)
>
> – Ufuk
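For reference: a KeySelector must be deterministic, i.e. the same record must always yield the same key. Flink derives the target key group (and thus the target subtask) from a hash of the key's hashCode(), and it may extract the key from the same record again downstream, for example when accessing keyed state; a selector based on ThreadLocalRandom returns a different key on every call, which matches the "different key on sender and receiver" case Ufuk describes and would explain the key out-of-range exception. A deterministic way to cap the number of keys is to bucket a stable field's hash into a fixed range. This is only a sketch, and it assumes MyEvent exposes a stable field through a hypothetical getId() accessor:

    // import org.apache.flink.api.java.functions.KeySelector;
    .keyBy(new KeySelector<MyEvent, Integer>() {
        @Override
        public Integer getKey(MyEvent ev) {
            // getId() stands in for any stable, non-null field of MyEvent.
            // Math.floorMod keeps the bucket in [0, 100) even when hashCode() is
            // negative, and the same event always maps to the same bucket.
            return Math.floorMod(ev.getId().hashCode(), 100);
        }
    })

Note that 100 buckets gives at most 100 distinct keys; how evenly they spread over subtasks still depends on how those keys hash into Flink's key groups, so some subtasks may receive more buckets than others.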