I misused the key selector. I checked the docs and found that it must return a deterministic key, so using a random key is wrong, but I still could not understand why it would cause an OOM.
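To illustrate the difference, here is a minimal self-contained sketch (not Flink code; `MyEvent` and its `deviceId` field are illustrative assumptions). A key selector must return the same key every time it sees the same event; the random variant from the thread violates that:

```java
import java.util.concurrent.ThreadLocalRandom;

public class KeySelectorSketch {
    // Hypothetical event type standing in for MyEvent from the thread.
    static class MyEvent {
        final String deviceId;
        MyEvent(String deviceId) { this.deviceId = deviceId; }
    }

    // Broken: two evaluations of the same event yield different keys,
    // so the key is not a function of the record.
    static int randomKey(MyEvent ev) {
        return ThreadLocalRandom.current().nextInt(1, 100);
    }

    // Deterministic: derived only from the event's own fields, and
    // bounded to at most 100 distinct keys to cap cardinality.
    static int deterministicKey(MyEvent ev) {
        return Math.abs(ev.deviceId.hashCode() % 100);
    }

    public static void main(String[] args) {
        MyEvent ev = new MyEvent("sensor-42");
        // Same event, same key, on every evaluation and on every node.
        System.out.println(deterministicKey(ev) == deterministicKey(ev)); // true
    }
}
```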
2017-12-28 21:57 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
> It's very strange: when I change the key selector to use a random key,
> the JVM reports an OOM.
>
> .keyBy(new KeySelector<MyEvent, Integer>() {
>     public Integer getKey(MyEvent ev) {
>         return ThreadLocalRandom.current().nextInt(1, 100);
>     }
> })
>
> Caused by: java.lang.OutOfMemoryError: Java heap space
>     at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
>     at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
>     at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>
> Could anybody explain the internals of keyBy()?
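On the question of keyBy() internals: Flink hashes the key to a key group and maps that group to an operator instance, so the same key always lands on the same instance. The sketch below is a much-simplified illustration; the real logic lives in org.apache.flink.runtime.state.KeyGroupRangeAssignment and additionally applies a murmur hash to key.hashCode(), which is omitted here:

```java
public class KeyGroupSketch {
    // Simplified: map a key to one of maxParallelism key groups.
    // (Real Flink murmur-hashes key.hashCode() first.)
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Map a key group to one of `parallelism` operator instances.
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        // Because the assignment is a pure function of the key, a given key
        // is routed consistently. A random key selector breaks this contract:
        // the key recomputed downstream no longer matches the partition the
        // record was sent to, and keyed state accumulates under ever-new keys.
        int g = assignToKeyGroup(42, 128);
        System.out.println(g);                        // 42
        System.out.println(operatorIndex(g, 128, 4)); // 1
    }
}
```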
> 2017-12-28 17:33 GMT+08:00 Ufuk Celebi <u...@apache.org>:
>> Hey Jinhua,
>>
>> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <luajit...@gmail.com> wrote:
>>> keyBy() on the field would generate a unique key per field value, so
>>> if the number of unique values is huge, Flink would have trouble with
>>> both CPU and memory. Is this considered in the design of Flink?
>>
>> Yes, keyBy hash-partitions the data across the nodes of your Flink
>> application, so you can easily scale your application up if you
>> need more processing power.
>>
>> I'm not sure that this is the problem in your case, though. Can you
>> provide some more details on what you are doing exactly? Are you
>> aggregating by time (for the keyBy you mention no windowing, but then
>> you mention windowAll)? What kind of aggregation are you doing? If
>> possible, feel free to share some code.
>>
>>> Since the parallelism of windowAll() could be set, I tried a key
>>> selector that uses the field's hash rather than its value, hoping it
>>> would decrease the number of keys, but Flink throws a key
>>> out-of-range exception. How do I use a key selector correctly?
>>
>> Can you paste the exact exception you see? I think this might indicate
>> that you don't correctly extract the key from your record, e.g. you
>> extract a different key on sender and receiver.
>>
>> I'm sure we can figure this out after you provide more context. :-)
>>
>> – Ufuk
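For the "field hash but not value" idea from the thread, the key point Ufuk raises is that the selector must produce the same key for the same record everywhere it runs. A deterministic bucketing selector achieves the cardinality reduction safely; this is a sketch of the idea only, and the bucket count and field name are illustrative assumptions:

```java
public class BucketKeySketch {
    // Illustrative bucket count; caps the number of distinct keys.
    static final int BUCKETS = 64;

    // Deterministic: the same field value maps to the same bucket on
    // every evaluation and on every node, so sender and receiver agree.
    static int bucketKey(String fieldValue) {
        // floorMod keeps the result in [0, BUCKETS) even for
        // negative hashCode values.
        return Math.floorMod(fieldValue.hashCode(), BUCKETS);
    }

    public static void main(String[] args) {
        System.out.println(bucketKey("user-123") == bucketKey("user-123")); // true
    }
}
```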