I took some time to read the source code for keyed stream
windowing, and this is my understanding so far:

a) The keyed stream is split and dispatched to downstream tasks
by hash. The hash is taken modulo the max parallelism of the
downstream operator to pick a key group:

See 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int,
int):
MathUtils.murmurHash(keyHash) % maxParallelism;

That is what the docs call "hash partitioning".

So the compiled execution graph already determines which operator
instance receives which key groups.
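To make point a) concrete, here is a minimal sketch of the two-step mapping as I understand it: key to key group, then key group to operator subtask. It assumes the subtask index is keyGroup * parallelism / maxParallelism, and mixHash is only a stand-in for MathUtils.murmurHash, not Flink's implementation. The class and method names are hypothetical.

```java
public class KeyGroupSketch {
    // Stand-in bit mixer (assumption): NOT Flink's MathUtils.murmurHash.
    // It only needs to be deterministic and non-negative for this sketch.
    static int mixHash(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        h *= 0xc2b2ae35;
        h ^= (h >>> 16);
        return h & Integer.MAX_VALUE; // clear the sign bit
    }

    // Step 1: key -> key group, in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return mixHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> subtask index, in [0, parallelism).
    // Contiguous ranges of key groups land on the same subtask.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String ip : new String[] {"10.0.0.1", "10.0.0.2", "192.168.1.7"}) {
            int keyGroup = keyGroupFor(ip, maxParallelism);
            int subtask = operatorIndexFor(keyGroup, maxParallelism, parallelism);
            System.out.println(ip + " -> key group " + keyGroup + " -> subtask " + subtask);
        }
    }
}
```

The point of the sketch is that the number of distinct keys does not change the number of key groups or subtasks. Millions of IPs still map onto maxParallelism key groups.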

b) With windowing, the key is used to index the window state, so the
window function receives the deserialized value from the window
state of the corresponding key.

b.1) The element is added to the state first:

See 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord<IN>):
windowState.add(element.getValue());

b.2) When the trigger fires the window, the value is read back
(deserialized) from the keyed state:

ACC contents = windowState.get();
emitWindowContents(actualWindow, contents);

With the RocksDB backend, each input element makes a round trip to
and from disk during processing.
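The add/get flow in b.1) and b.2) can be sketched in plain Java as follows. This is only an illustration under my own assumptions: a HashMap of byte arrays stands in for a backend that keeps state in serialized form, the accumulator is a simple count, and all names are hypothetical. It is not Flink's WindowOperator or the RocksDB backend.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class WindowStateSketch {
    // Hypothetical stand-in for a backend that stores serialized bytes
    // per (key, window) pair.
    private final Map<String, byte[]> store = new HashMap<>();

    // Counts read-modify-write cycles, one per added element.
    int roundTrips = 0;

    private static String stateKey(String key, long windowStart) {
        return key + "@" + windowStart;
    }

    // b.1) Fold-style add: read bytes, deserialize, update, serialize, write back.
    void add(String key, long windowStart, long increment) {
        String k = stateKey(key, windowStart);
        byte[] bytes = store.get(k);
        long acc = (bytes == null) ? 0L : ByteBuffer.wrap(bytes).getLong();
        acc += increment;
        store.put(k, ByteBuffer.allocate(Long.BYTES).putLong(acc).array());
        roundTrips++;
    }

    // b.2) When the window fires: read and deserialize the accumulated value.
    long get(String key, long windowStart) {
        byte[] bytes = store.get(stateKey(key, windowStart));
        return (bytes == null) ? 0L : ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        WindowStateSketch state = new WindowStateSketch();
        state.add("10.0.0.1", 0L, 1L);
        state.add("10.0.0.1", 0L, 1L);
        state.add("10.0.0.2", 0L, 1L);
        System.out.println("10.0.0.1 count = " + state.get("10.0.0.1", 0L));
        System.out.println("round trips    = " + state.roundTrips);
    }
}
```

In this sketch every element costs one deserialize/serialize cycle for each (key, window) pair it touches, which is the cost I am asking about for RocksDB.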

Flink's keyed stream serves the same purpose as Storm's fields
grouping, but is more complicated.

Am I correct?


But I still cannot understand why keyBy() stops Flink from
returning the expected results.

Let me explain my case in more detail:
I use a Kafka data source, which collects log lines from log files on
tens of machines.
Each log line is in JSON format and contains an "ip" field, the IP
address of the user, so it can take millions of distinct values across
the Internet.
The stream processing is expected to aggregate by IP in a {1
hour, 1 min} sliding window.
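For reference, here is a small sketch of how a {1 hour, 1 min} sliding window assigns one element to windows. It assumes zero window offset and non-negative timestamps, and the class and method names are hypothetical. The loop follows the usual sliding-window rule: every window whose start is a multiple of the slide and whose range covers the timestamp.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Returns the start times of all sliding windows that contain `timestamp`.
    // Assumes offset 0 and timestamp >= 0 (assumptions for this sketch).
    static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60 * 60 * 1000L; // 1 hour in milliseconds
        long slide = 60 * 1000L;     // 1 minute in milliseconds
        List<Long> starts = windowStartsFor(1514540520123L, size, slide);
        System.out.println("windows per element: " + starts.size());
    }
}
```

With these parameters each element belongs to 60 overlapping windows, so each incoming log line may touch 60 separate (key, window) states. That may be relevant to the slowdown described next, though I have not confirmed it.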

If I use keyBy("ip"), then for the first few minutes Flink gives me
correct aggregation results, but soon afterwards no results are
produced, and Flink seems to be busy doing something forever.

I doubt whether keyBy() can handle a huge number of keys as in this
case. When I remove keyBy().window().fold() and use windowAll().fold()
instead (the latter fold operator uses a HashMap to aggregate by IP
itself), Flink works. But as is known, windowAll() is not scalable.

Could the Flink developers help me with this topic? I prefer Flink and
I believe Flink is one of the best stream processing frameworks, but I
am really frustrated that Flink could not fulfill its features as the
docs describe.

Thank you all.


2017-12-29 17:42 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
> I misuse the key selector. I checked the doc and found it must return
> deterministic key, so using random is wrong, but I still could not
> understand why it would cause oom.
>
>
>
> 2017-12-28 21:57 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
>> It's very strange, when I change the key selector to use random key,
>> the jvm reports oom.
>>
>>    .keyBy(new KeySelector<MyEvent, Integer>() {
>>      public Integer getKey(MyEvent ev) {
>>        return ThreadLocalRandom.current().nextInt(1, 100);
>>      }
>>    })
>>
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>         at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
>>         at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
>>         at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>
>> Could anybody explain the internal of keyby()?
>>
>> 2017-12-28 17:33 GMT+08:00 Ufuk Celebi <u...@apache.org>:
>>> Hey Jinhua,
>>>
>>> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <luajit...@gmail.com> wrote:
>>>> The keyby() upon the field would generate unique key as the field
>>>> value, so if the number of the uniqueness is huge, flink would have
>>>> trouble both on cpu and memory. Is it considered in the design of
>>>> flink?
>>>
>>> Yes, keyBy hash partitions the data across the nodes of your Flink
>>> application and thus you can easily scale your application up if you
>>> need more processing power.
>>>
>>> I'm not sure that this is the problem in your case though. Can you
>>> provide some more details what you are doing exactly? Are you
>>> aggregating by time (for the keyBy you mention no windowing, but then
>>> you mention windowAll)? What kind of aggregation are you doing? If
>>> possible, feel free to share some code.
>>>
>>>> Since windowsAll() could be set parallelism, so I try to use key
>>>> selector to use field hash but not value, that I hope it would
>>>> decrease the number of the keys, but the flink throws key out-of-range
>>>> exception. How to use key selector in correct way?
>>>
>>> Can you paste the exact Exception you use? I think this might indicate
>>> that you don't correctly extract the key from your record, e.g. you
>>> extract a different key on sender and receiver.
>>>
>>> I'm sure we can figure this out after you provide more context. :-)
>>>
>>> – Ufuk
