It's very strange: when I change the key selector to return a random key,
the JVM reports an OutOfMemoryError.

   .keyBy(new KeySelector<MyEvent, Integer>() {
     public Integer getKey(MyEvent ev) {
       return ThreadLocalRandom.current().nextInt(1, 100);
     }
   })
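For comparison, here is a deterministic alternative as a sketch in plain Java (the Flink KeySelector wrapper is omitted so it runs standalone; the class and field names are illustrative, not from my job):

```java
// Sketch, not the code above: a deterministic way to bound key cardinality.
// Flink requires the selector to return the same key for the same record
// every time; a random key breaks routing and keyed state.
public class BucketKey {
    static final int BUCKETS = 100; // illustrative bucket count

    // Same input always yields the same key, unlike ThreadLocalRandom.
    public static int keyFor(String fieldValue) {
        // Math.floorMod keeps the result in [0, BUCKETS) even when
        // hashCode() is negative.
        return Math.floorMod(fieldValue.hashCode(), BUCKETS);
    }

    public static void main(String[] args) {
        System.out.println(keyFor("device-42"));
    }
}
```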

Caused by: java.lang.OutOfMemoryError: Java heap space
        at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
        at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
        at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
        at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
        at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

Could anybody explain the internals of keyBy()?
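From what I can tell, the routing keyBy() performs looks roughly like the following simplified sketch (plain Java, no Flink dependency; Flink additionally applies a murmur hash on top of hashCode(), which is omitted here):

```java
// Simplified sketch of keyBy() routing: Flink derives a key group from the
// key's hash and maps each key group to one parallel subtask.
public class KeyBySketch {
    // keyGroup = hash(key) mod maxParallelism (Flink murmur-hashes first).
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Subtask that owns a key group, mirroring the proportional mapping
    // in Flink's KeyGroupRangeAssignment.
    public static int operatorIndexFor(int keyGroup, int maxParallelism,
                                       int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int max = 128, par = 4;
        int kg = keyGroupFor("some-key", max);
        // Sender and receiver must compute the same key for a record: with
        // a random selector the receiver recomputes a different key, which
        // can fall outside its assigned key-group range.
        System.out.println(operatorIndexFor(kg, max, par));
    }
}
```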

2017-12-28 17:33 GMT+08:00 Ufuk Celebi <u...@apache.org>:
> Hey Jinhua,
>
> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <luajit...@gmail.com> wrote:
>> keyBy() on a field uses the field value as the key, so if the number
>> of unique values is huge, Flink would have trouble with both CPU and
>> memory. Is this considered in the design of Flink?
>
> Yes, keyBy hash partitions the data across the nodes of your Flink
> application and thus you can easily scale your application up if you
> need more processing power.
>
> I'm not sure that this is the problem in your case though. Can you
> provide some more details what you are doing exactly? Are you
> aggregating by time (for the keyBy you mention no windowing, but then
> you mention windowAll)? What kind of aggregation are you doing? If
> possible, feel free to share some code.
>
>> Since windowAll() does not let me set the parallelism, I tried a key
>> selector that returns the field's hash instead of its value, hoping to
>> decrease the number of keys, but Flink throws a key out-of-range
>> exception. What is the correct way to use a key selector?
>
> Can you paste the exact exception you see? I think this might indicate
> that you don't extract the key from your record consistently, e.g. you
> extract a different key on the sender and the receiver.
>
> I'm sure we can figure this out after you provide more context. :-)
>
> – Ufuk