The app is very simple; please see the code snippet:

https://gist.github.com/kingluo/e06381d930f34600e42b050fef6baedd


I reran the app, and oddly it now produces results continuously.

But it has two new issues:
a) Memory usage is too high: it uses about 8 GB of heap memory. Why? Is it
because the traffic is too high?
b) The Redis async I/O tends to time out, which fails the whole pipeline.



2018-01-03 0:41 GMT+08:00 Timo Walther <twal...@apache.org>:
> Hi Jinhua,
>
> did you check the key group assignments? What is the distribution of
> "MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your data?
> This also depends on the hashCode on the output of your KeySelector.
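
A rough way to check the distribution asked about above, as a plain-Java
sketch that runs without Flink on the classpath. All names here
(KeyGroupHistogram, mix, keyGroupFor, operatorIndexFor) are hypothetical,
and mix() is only a stand-in for MathUtils.murmurHash, so the exact key
groups may differ from what Flink computes. The key-group-to-subtask
formula is my understanding of what Flink does, not copied from its source.
For exact values, Flink's own KeyGroupRangeAssignment class would be the
thing to call.

```java
import java.util.List;

public class KeyGroupHistogram {
    // Stand-in 32-bit mixer (the MurmurHash3 finalizer). Assumption: this is
    // NOT guaranteed to match Flink's MathUtils.murmurHash bit for bit.
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Key group in [0, maxParallelism), derived from the key's hashCode().
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // Maps a key group to a subtask index in [0, parallelism).
    public static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many sample keys land on each subtask.
    public static int[] histogram(List<?> sampleKeys, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : sampleKeys) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            counts[operatorIndexFor(keyGroup, maxParallelism, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample keys; in practice, use a sample of real "ip" values.
        List<String> sample = List.of("10.0.0.1", "10.0.0.2", "192.168.1.7", "8.8.8.8");
        int[] counts = histogram(sample, 128, 4);
        for (int i = 0; i < counts.length; i++) {
            System.out.println("subtask " + i + ": " + counts[i]);
        }
    }
}
```

If one subtask receives far more keys than the others on a realistic
sample, that would point to skew rather than to keyBy itself.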
>
> keyBy should handle high traffic well, but it is designed for key spaces
> with thousands or millions of values. If this is not the case, you need to
> introduce an additional artificial key to spread the load more evenly.
>
> Regarding your OutOfMemoryError: I think you are producing elements much
> faster than the operators after keyBy can process or discard them. Can
> you explain your job to us in more detail? Are you using event-time? How do
> you aggregate elements of the windows?
>
> Regards,
> Timo
>
>
>
> Am 1/1/18 um 6:00 AM schrieb Jinhua Luo:
>
>> I checked the logs, but no information indicates what happens.
>>
>> In fact, in the same app, there is another stream whose Kafka
>> source is low traffic. I aggregate a field of that source too,
>> and Flink gives correct results continuously.
>> So I suspect that keyBy() cannot handle high traffic well (which
>> affects the number of keys in the key partitions).
>>
>> 2018-01-01 2:04 GMT+08:00 Steven Wu <stevenz...@gmail.com>:
>>>>
>>>>   but soon later, no results produced, and flink seems busy doing
>>>> something forever.
>>>
>>> Jinhua, I don't know if you have checked these things. If not, they may be
>>> worth a look.
>>>
>>> Have you tried to do a thread dump?
>>> How are the GC pauses?
>>> Do you see Flink restart? Check the exception tab in the Flink web UI for
>>> your job.
>>>
>>>
>>>
>>> On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>
>>>> I took some time to read the source code for keyed stream
>>>> windowing, and this is my understanding:
>>>>
>>>> a) the keyed stream is split and dispatched to downstream tasks
>>>> by hash, and the hash base is the parallelism of the downstream
>>>> operator:
>>>>
>>>> See
>>>> org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int, int):
>>>> MathUtils.murmurHash(keyHash) % maxParallelism;
>>>>
>>>> That's what the doc calls "hash partitioning".
>>>>
>>>> So the compiled execution graph already determines which operator
>>>> instance receives which key groups.
>>>>
>>>> b) with windowing, the key is used to index window states, so the
>>>> window function would receive the deserialized value from its
>>>> corresponding window state of some key.
>>>>
>>>> b.1) The element would be added into the state first:
>>>>
>>>> See
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord<IN>):
>>>> windowState.add(element.getValue());
>>>>
>>>> b.2) when the trigger fires the window, the value would be
>>>> deserialized from the keyed state:
>>>>
>>>> ACC contents = windowState.get();
>>>> emitWindowContents(actualWindow, contents);
>>>>
>>>> For the RocksDB backend, each input element goes to disk and back
>>>> during processing.
>>>>
>>>> Flink's keyed stream has the same functionality as Storm's fields
>>>> grouping, but is more complicated.
>>>>
>>>> Am I correct?
>>>>
>>>>
>>>> But I still could not understand why keyby() stops flink from
>>>> returning expected results.
>>>>
>>>> Let me explain my case more:
>>>> I use a Kafka data source, which collects log lines from log files on
>>>> tens of machines.
>>>> Each log line is in JSON format and contains an "ip" field, the IP
>>>> address of the user, so it can take any of millions of Internet IP
>>>> addresses.
>>>> The stream processing is expected to aggregate by IP over a {1
>>>> hour, 1 min} sliding window.
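
A note on the {1 hour, 1 min} sliding window described above: with sliding
windows, every element belongs to size / slide windows, which is 60 here.
Combined with millions of distinct IPs, the number of (key, window) entries
held in state can grow quickly, which may contribute to the memory pressure.
The plain-Java sketch below only illustrates that fan-out. The class and
method names are hypothetical, a window offset of 0 is assumed, and it is
modeled on how sliding windows are usually assigned rather than copied from
Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowFanOut {
    // Returns the start timestamps (ms) of every sliding window that
    // contains timestampMs, assuming a window offset of 0.
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        long minute = 60 * 1000L;
        // Arbitrary example timestamp in epoch milliseconds.
        List<Long> starts = windowStarts(1_514_736_000_000L, hour, minute);
        System.out.println(starts.size() + " windows per element");
    }
}
```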
>>>>
>>>> If I use keyBy("ip"), then for the first few minutes Flink gives me
>>>> correct aggregation results, but soon afterwards no results are produced,
>>>> and Flink seems busy doing something forever.
>>>>
>>>> I doubt whether keyBy() can handle a huge number of keys like this. When I
>>>> remove keyBy().window().fold() and use windowAll().fold() instead (the
>>>> latter fold operator uses a HashMap to aggregate by IP itself), Flink
>>>> works. But as is well known, windowAll() is not scalable.
>>>>
>>>> Could the Flink developers help me with this? I prefer Flink and I
>>>> believe it is one of the best stream processing frameworks, but I am
>>>> really frustrated that Flink cannot deliver what the docs
>>>> describe.
>>>>
>>>> Thank you all.
>>>>
>>>>
>>>> 2017-12-29 17:42 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
>>>>>
>>>>> I misused the key selector. I checked the docs and found that it must
>>>>> return a deterministic key, so using a random value is wrong, but I still
>>>>> do not understand why it would cause an OOM.
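
For comparison with the random key selector, here is a minimal plain-Java
sketch of a deterministic way to reduce the number of keys: derive a bucket
from the record's own content, so the same record always yields the same
key no matter how often the selector is called. The class name, bucketFor,
and the bucket count of 100 are hypothetical choices for illustration, not
anything taken from the job in this thread.

```java
public class StableBucketKey {
    // Maps an IP string to a bucket in [0, buckets). The result depends only
    // on the input, so repeated calls for the same record agree.
    public static int bucketFor(String ip, int buckets) {
        return Math.floorMod(ip.hashCode(), buckets);
    }

    public static void main(String[] args) {
        // Made-up address; the bucket count of 100 is an arbitrary assumption.
        String ip = "192.168.1.7";
        System.out.println(ip + " -> bucket " + bucketFor(ip, 100));
    }
}
```

Inside a KeySelector, getKey would return such a bucket instead of a random
number. Note that many IPs then share one key, so the window function would
still have to aggregate per IP within each bucket, for example with a map.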
>>>>>
>>>>>
>>>>>
>>>>> 2017-12-28 21:57 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
>>>>>>
>>>>>> It's very strange: when I change the key selector to use a random key,
>>>>>> the JVM reports an OOM.
>>>>>>
>>>>>>     .keyBy(new KeySelector<MyEvent, Integer>() {
>>>>>>       public Integer getKey(MyEvent ev) {
>>>>>>         return ThreadLocalRandom.current().nextInt(1, 100);
>>>>>>       }
>>>>>>     })
>>>>>>
>>>>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>>>          at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
>>>>>>          at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
>>>>>>          at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
>>>>>>          at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
>>>>>>          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>>>>>          at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
>>>>>>          at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>>>>>>          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>>>>>          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>>>>>>          at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
>>>>>>          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>>>>>>          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>>>          at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>>>          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>>>>>          at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>>>>>          at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>
>>>>>> Could anybody explain the internals of keyBy()?
>>>>>>
>>>>>> 2017-12-28 17:33 GMT+08:00 Ufuk Celebi <u...@apache.org>:
>>>>>>>
>>>>>>> Hey Jinhua,
>>>>>>>
>>>>>>> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> keyBy() on a field uses each distinct field value as a key,
>>>>>>>> so if the number of distinct values is huge, Flink would have
>>>>>>>> trouble with both CPU and memory. Is this considered in the design of
>>>>>>>> Flink?
>>>>>>>
>>>>>>> Yes, keyBy hash partitions the data across the nodes of your Flink
>>>>>>> application and thus you can easily scale your application up if you
>>>>>>> need more processing power.
>>>>>>>
>>>>>>> I'm not sure that this is the problem in your case though. Can you
>>>>>>> provide some more details what you are doing exactly? Are you
>>>>>>> aggregating by time (for the keyBy you mention no windowing, but then
>>>>>>> you mention windowAll)? What kind of aggregation are you doing? If
>>>>>>> possible, feel free to share some code.
>>>>>>>
>>>>>>>> Since windowAll() cannot run in parallel, I tried using a key
>>>>>>>> selector that returns a hash of the field rather than its value,
>>>>>>>> hoping to reduce the number of keys, but Flink throws a key
>>>>>>>> out-of-range exception. How do I use a key selector correctly?
>>>>>>>
>>>>>>> Can you paste the exact exception you get? I think this might indicate
>>>>>>> that you don't correctly extract the key from your record, e.g. you
>>>>>>> extract a different key on sender and receiver.
>>>>>>>
>>>>>>> I'm sure we can figure this out after you provide more context. :-)
>>>>>>>
>>>>>>> – Ufuk
>>>
>>>
>
