Re: keyby() issue

2018-01-04 Thread Jinhua Luo
I mean the timeout most likely happens in the sending queue of the Redis library if the concurrency number is low. See org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(StreamRecord): public void processElement(StreamRecord element) throws Exception { final S
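The capacity discussed in this thread bounds how many requests the async operator keeps in flight. In Flink it is the last argument of AsyncDataStream.unorderedWait/orderedWait, and to my understanding processElement waits for a free slot when the queue is full. The code below is not Flink code. It is a plain-Java model that assumes a hypothetical BoundedAsyncSender, using a Semaphore for the queue and Thread.sleep for the Redis round trip. It shows that with a capacity of 1 every element waits for the previous request to finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a capacity-bounded async client; NOT Flink's AsyncWaitOperator code.
final class BoundedAsyncSender {
    private final Semaphore slots;                                      // one permit per allowed in-flight request
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();
    private final ExecutorService io = Executors.newFixedThreadPool(8); // stands in for the Redis client's I/O threads

    BoundedAsyncSender(int capacity) {
        this.slots = new Semaphore(capacity);
    }

    // Called once per element; blocks the caller while `capacity` requests are already pending.
    CompletableFuture<Void> send(long latencyMillis) {
        slots.acquireUninterruptibly();
        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(latencyMillis);                            // simulated round trip to Redis
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inFlight.decrementAndGet();
                slots.release();                                        // frees a slot for the next element
            }
        }, io);
    }

    // Sends `requests` elements and reports the highest number that were ever in flight at once.
    static int peakInFlight(int capacity, int requests, long latencyMillis) {
        BoundedAsyncSender sender = new BoundedAsyncSender(capacity);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            pending.add(sender.send(latencyMillis));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        sender.io.shutdown();
        return sender.peak.get();
    }

    public static void main(String[] args) {
        System.out.println("capacity 1 -> peak in flight " + peakInFlight(1, 5, 20));
        System.out.println("capacity 4 -> peak in flight " + peakInFlight(4, 16, 20));
    }
}
```

With capacity 1, throughput is bounded by one request per round trip no matter how many I/O threads the client has. Elements then pile up in front of the operator, which matches the low-concurrency case described above.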

Re: keyby() issue

2018-01-04 Thread Jinhua Luo
2018-01-04 21:04 GMT+08:00 Aljoscha Krettek: > Memory usage should grow linearly with the number of windows you have active > at any given time, the number of keys and the number of different window > operations you have. But the memory usage is still too high, especially when the incremental a

Re: keyby() issue

2018-01-04 Thread Aljoscha Krettek
Memory usage should grow linearly with the number of windows you have active at any given time, the number of keys and the number of different window operations you have. Regarding the async I/O writing to Redis, I see that you set a capacity of 1, which means that there will possibly be 10
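The rule of thumb above can be turned into a rough estimate. This sketch assumes one state entry per (key, window, window operation) and a made-up average size per entry. Real numbers depend on the accumulator type, the serializers and the state backend. The figures in main() are illustrative: 500,000 keys comes from this thread, and 60 windows per key comes from the sliding-window remark.

```java
// Back-of-the-envelope model of "linear in windows x keys x window operations".
// bytesPerEntry is an ASSUMED average, not a Flink constant.
final class WindowStateEstimate {
    // (key, window, window operation) combinations holding state at the same time.
    static long stateEntries(long keys, long activeWindowsPerKey, long windowOperations) {
        return Math.multiplyExact(Math.multiplyExact(keys, activeWindowsPerKey), windowOperations);
    }

    // Rough heap footprint; the real per-entry cost varies with accumulator, serializers and backend.
    static long estimatedBytes(long keys, long activeWindowsPerKey, long windowOperations, long bytesPerEntry) {
        return Math.multiplyExact(stateEntries(keys, activeWindowsPerKey, windowOperations), bytesPerEntry);
    }

    public static void main(String[] args) {
        long keys = 500_000;      // "half a million" distinct IPs, from the thread
        long windowsPerKey = 60;  // sliding window whose size spans 60 slides
        long bytesPerEntry = 200; // hypothetical average, for illustration only
        System.out.println("state entries: " + stateEntries(keys, windowsPerKey, 1));
        System.out.println("approx. MiB: " + estimatedBytes(keys, windowsPerKey, 1, bytesPerEntry) / (1024 * 1024));
    }
}
```

Because every factor multiplies, halving the number of active windows per key (for example, a larger slide) shrinks state as much as halving the number of keys.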

Re: keyby() issue

2018-01-04 Thread Jinhua Luo
The app is very simple; please see the code snippet: https://gist.github.com/kingluo/e06381d930f34600e42b050fef6baedd I reran the app, and oddly it now produces results continuously. But it has two new issues: a) memory usage is too high: it uses about 8 GB of heap memory! Why? Beca

Re: keyby() issue

2018-01-03 Thread Aljoscha Krettek
Side note: Sliding windows can be quite expensive if the slide is small compared to the size. Flink will treat each "slide" as a separate window, so in your case you will get 60 * num_keys windows, which can become quite big. Best, Aljoscha > On 2. Jan 2018, at 17:41, Timo Walther wrote: > >
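To make the "each slide is a separate window" point concrete, here is a sketch of how a sliding assigner maps one timestamp to windows. To my understanding it follows the logic of Flink's SlidingEventTimeWindows.assignWindows together with TimeWindow.getWindowStartWithOffset, simplified to a zero offset and non-negative timestamps. The 1-hour size and 1-minute slide are assumptions, since the thread only implies a size/slide ratio of 60.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding-window assignment; assumes non-negative timestamps and a zero offset.
final class SlidingWindowAssignment {
    // Start of the most recent slide-aligned window that contains `timestamp`.
    static long lastWindowStart(long timestamp, long slide) {
        return timestamp - (timestamp + slide) % slide;
    }

    // Every [start, end) window of length `size`, advancing by `slide`, that contains `timestamp`.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;     // hypothetical slide
        long oneHour = 60 * oneMinute; // hypothetical size
        // Each element lands in size / slide = 60 windows, so N active keys can keep up to 60 * N windows.
        System.out.println(assignWindows(System.currentTimeMillis(), oneHour, oneMinute).size());
    }
}
```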

Re: keyby() issue

2018-01-02 Thread Timo Walther
Hi Jinhua, did you check the key group assignments? What is the distribution of "MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your data? This also depends on the hashCode of the output of your KeySelector. keyBy should handle high traffic well, but it is designed for key spa
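One way to answer this question without a cluster is to bucket a sample of keys into key groups and look at the histogram. Flink computes the group as MathUtils.murmurHash(key.hashCode()) % maxParallelism. The class below uses a stand-in murmur3-style finalizer instead of Flink's exact function, so it shows how to judge skew rather than reproducing Flink's group numbers. For exact assignments, running the sample through Flink's own KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism), if available in your version, would be the safer choice. The synthetic IP generator and maxParallelism = 128 are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Key-group histogram over a sample of keys. mix() is a stand-in murmur3-style finalizer,
// NOT Flink's exact MathUtils.murmurHash, so absolute group numbers will differ from Flink's.
final class KeyGroupHistogram {
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Same shape as "murmurHash(key.hashCode()) % maxParallelism"; floorMod keeps it non-negative.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    static int[] histogram(List<?> sampleKeys, int maxParallelism) {
        int[] counts = new int[maxParallelism];
        for (Object key : sampleKeys) {
            counts[keyGroupFor(key, maxParallelism)]++;
        }
        return counts;
    }

    // Fullest key group divided by the average; values far above 1 mean a few groups get most keys.
    static double skew(int[] counts) {
        long total = 0;
        int max = 0;
        for (int c : counts) {
            total += c;
            max = Math.max(max, c);
        }
        return total == 0 ? 0.0 : max / ((double) total / counts.length);
    }

    // Synthetic IP-like keys for demonstration; replace with a sample of real keys.
    static List<String> syntheticIps(int n) {
        List<String> ips = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ips.add("10." + ((i >> 16) & 255) + "." + ((i >> 8) & 255) + "." + (i & 255));
        }
        return ips;
    }

    public static void main(String[] args) {
        int[] counts = histogram(syntheticIps(100_000), 128);
        System.out.printf("max/avg key-group load: %.2f%n", skew(counts));
    }
}
```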

Re: keyby() issue

2017-12-31 Thread Jinhua Luo
I checked the logs, but nothing in them indicates what is happening. In fact, the same app has another stream whose Kafka source has low traffic; I aggregate a field of that source too, and Flink produces correct results continuously. So I wonder whether keyby() cannot handle high traffic we

Re: keyby() issue

2017-12-31 Thread Steven Wu
> but soon after, no results were produced, and Flink seemed busy doing something forever. Jinhua, I don't know if you have checked these things; if not, they may be worth a look. Have you tried taking a thread dump? How long are the GC pauses? Do you see Flink restarting? Check the exception tab in the Flink web UI for you
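The usual tools for the first two questions are jstack <pid> against the TaskManager process and the JVM's GC logs. As an in-process alternative, assuming you can add code to the job or to a test harness, the standard java.lang.management API exposes both. JvmDiagnostics is a hypothetical helper name.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: GC totals and a thread dump via standard JMX beans.
final class JvmDiagnostics {
    // {total collection count, total collection time in ms} across all collectors.
    static long[] gcTotals() {
        long count = 0;
        long timeMs = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters may return -1 when the value is undefined for a collector.
            count += Math.max(0, gc.getCollectionCount());
            timeMs += Math.max(0, gc.getCollectionTime());
        }
        return new long[] {count, timeMs};
    }

    // In-process counterpart to jstack: one entry per live thread (name, state, truncated stack).
    static String threadDump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            sb.append(info);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        long[] gc = gcTotals();
        System.out.println("GC runs: " + gc[0] + ", total GC time: " + gc[1] + " ms");
        System.out.println(threadDump());
    }
}
```

Sampling gcTotals() twice a few seconds apart gives the share of wall time spent in GC. A value close to 100% would explain a job that looks busy but emits nothing.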

Re: keyby() issue

2017-12-31 Thread Jinhua Luo
I took some time to read the source code for keyed stream windowing, and here is my understanding: a) the keyed stream is split and dispatched to downstream tasks by hash, and the hash base is the parallelism of the downstream operator: See org.apache.flink.runtime.state.KeyGro
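For reference, to my understanding Flink does not hash keys directly modulo the operator parallelism. A key is first hashed into one of maxParallelism key groups, and each parallel subtask owns a contiguous range of key groups. The formulas below mirror KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup and computeKeyGroupRangeForOperatorIndex as I recall them. maxParallelism = 128 and parallelism = 3 are example values.

```java
// Sketch of key group -> subtask mapping; formulas as recalled from Flink's KeyGroupRangeAssignment.
final class KeyGroupRanges {
    // Index of the parallel subtask that owns a key group.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive [start, end] key-group range owned by one subtask.
    static int[] keyGroupRangeForOperator(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // example value
        int parallelism = 3;      // example value
        for (int op = 0; op < parallelism; op++) {
            int[] r = keyGroupRangeForOperator(maxParallelism, parallelism, op);
            System.out.println("subtask " + op + " owns key groups " + r[0] + ".." + r[1]);
        }
    }
}
```

Because whole key groups move between subtasks, changing the parallelism never requires rehashing individual keys, only reassigning ranges.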

Re: keyby() issue

2017-12-29 Thread Jinhua Luo
I misused the key selector. I checked the docs and found that it must return a deterministic key, so using a random one is wrong, but I still don't understand why it would cause an OOM. 2017-12-28 21:57 GMT+08:00 Jinhua Luo: > It's very strange, when I change the key selector to use random key, > the jvm rep

Re: keyby() issue

2017-12-28 Thread Jinhua Luo
It's very strange: when I change the key selector to use a random key, the JVM reports an OOM. .keyBy(new KeySelector<MyEvent, Integer>() { public Integer getKey(MyEvent ev) { return ThreadLocalRandom.current().nextInt(1, 100); } }) Caused by: java.lang.OutOfMemoryError: Java heap space at com.esoter
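To my understanding, Flink evaluates the key selector more than once per record: once to route the record to a subtask, and again in the receiving operator to scope its keyed state. A random key can therefore send a record to one subtask and then use a key that belongs to a different key group there. The sketch below uses java.util.function.Function as a stand-in for Flink's KeySelector, so it runs without Flink. MyEvent, its ip field and the 1..99 bucketing are hypothetical; the bucketing only applies if spreading load over about 100 keys was the intent.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

// Function stands in for Flink's KeySelector; MyEvent is a hypothetical event type.
final class KeySelectorDeterminism {
    static final class MyEvent {
        final String ip;
        MyEvent(String ip) { this.ip = ip; }
    }

    // Mirrors the snippet above: a fresh random key on every call (violates the contract).
    static final Function<MyEvent, Integer> RANDOM_KEY =
            ev -> ThreadLocalRandom.current().nextInt(1, 100);

    // Deterministic: the key depends only on the event's contents.
    static final Function<MyEvent, String> IP_KEY = ev -> ev.ip;

    // Deterministic bucketing into 1..99, matching the range of nextInt(1, 100) (assumed intent).
    static final Function<MyEvent, Integer> IP_BUCKET =
            ev -> Math.floorMod(ev.ip.hashCode(), 99) + 1;

    // A valid key selector returns an equal key every time it sees the same event.
    static <K> boolean isStable(Function<MyEvent, K> selector, MyEvent ev, int trials) {
        K first = selector.apply(ev);
        for (int i = 1; i < trials; i++) {
            if (!first.equals(selector.apply(ev))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        MyEvent ev = new MyEvent("10.0.0.1");
        System.out.println("random key stable? " + isStable(RANDOM_KEY, ev, 100));
        System.out.println("ip bucket stable?  " + isStable(IP_BUCKET, ev, 100));
    }
}
```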

Re: keyby() issue

2017-12-28 Thread Jinhua Luo
Does keyby() on a field generate as many keys as there are distinct values of that field? For example, if the field takes values in {"a", "b", "c"}, then the number of keys is 3, correct? The field in my case has about half a million distinct values (IP addresses), so keyby() on the field followed by
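keyBy creates one key per distinct value returned by the key selector, and keyed state (including window contents) is kept per key. Counting distinct values in a sample of events is a quick way to size this. The helper below is hypothetical and uses a plain HashSet.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper: how many keys a keyBy on `field` would create for a batch of events.
final class DistinctKeyCount {
    static <T, K> int distinctKeys(List<T> events, Function<T, K> field) {
        Set<K> keys = new HashSet<>();
        for (T ev : events) {
            keys.add(field.apply(ev));
        }
        return keys.size();
    }

    public static void main(String[] args) {
        List<String> values = List.of("a", "b", "c", "a");
        System.out.println(distinctKeys(values, Function.identity()));
    }
}
```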

Re: keyby() issue

2017-12-28 Thread Ufuk Celebi
Hey Jinhua, On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo wrote: > The keyby() upon the field would generate unique key as the field > value, so if the number of the uniqueness is huge, flink would have > trouble both on cpu and memory. Is it considered in the design of > flink? Yes, keyBy hash pa

keyby() issue

2017-12-28 Thread Jinhua Luo
Hi all, I need to aggregate a field of the event. At first I used keyby(), but I found that Flink performs very slowly (it even stops producing results) because the number of keys is around half a million per minute. So I used windowAll() instead, and then Flink works as expected. The keyby() upon the fi
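The difference between the two approaches can be illustrated in plain Java, assuming the aggregation is a sum of a numeric field. A keyed aggregation keeps one accumulator per distinct key. A windowAll aggregation keeps a single accumulator, but Flink runs windowAll with a parallelism of 1, so every record goes through one task. The class and its parallel-list input are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical comparison for a sum over one numeric field, keyed by IP vs. global.
final class KeyedVsGlobalAggregation {
    // keyBy(ip) + sum: one accumulator per distinct IP (spread across subtasks in Flink).
    static Map<String, Long> perKeySums(List<String> ips, List<Long> values) {
        if (ips.size() != values.size()) {
            throw new IllegalArgumentException("ips and values must have the same length");
        }
        Map<String, Long> sums = new HashMap<>();
        for (int i = 0; i < ips.size(); i++) {
            sums.merge(ips.get(i), values.get(i), Long::sum);
        }
        return sums;
    }

    // windowAll + sum: a single accumulator for the whole window, computed in one task.
    static long globalSum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> ips = List.of("10.0.0.1", "10.0.0.2", "10.0.0.1");
        List<Long> bytes = List.of(100L, 50L, 25L);
        System.out.println("accumulators with keyBy: " + perKeySums(ips, bytes).size());
        System.out.println("total with windowAll: " + globalSum(bytes));
    }
}
```

If per-IP results are actually needed, a windowAll function would have to build the same per-key map itself, so the state would not shrink; it would only move into a single task.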