The app is very simple; please see the code snippet: https://gist.github.com/kingluo/e06381d930f34600e42b050fef6baedd
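In short, the shape of the pipeline is: Kafka source of JSON log lines -> keyBy on the "ip" field -> sliding-window fold -> Redis via async I/O. Below is a minimal sketch of that shape (not the exact gist code; MyEvent, parseLine(), getIp(), and RedisAsyncSink are placeholder names for this sketch):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<MyEvent> events = env
            .addSource(new FlinkKafkaConsumer010<>("logs", new SimpleStringSchema(), kafkaProps))
            .map(line -> parseLine(line)); // JSON log line -> MyEvent with an "ip" field

    DataStream<Tuple2<String, Long>> counts = events
            .keyBy("ip")
            .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(1)))
            .fold(new Tuple2<>("", 0L), new FoldFunction<MyEvent, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> fold(Tuple2<String, Long> acc, MyEvent ev) {
                    return new Tuple2<>(ev.getIp(), acc.f1 + 1); // count events per IP
                }
            });

    // Write the aggregated window results to Redis via async I/O.
    AsyncDataStream.unorderedWait(counts, new RedisAsyncSink(), 1000, TimeUnit.MILLISECONDS);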
I reran the app, and strangely it can now produce results continuously. But it has two new issues: a) the memory usage is too high: it uses about 8 GB of heap memory! Why? Is it because the traffic is too high? b) the Redis async I/O tends to time out and fail the whole pipeline.

2018-01-03 0:41 GMT+08:00 Timo Walther <twal...@apache.org>:
> Hi Jinhua,
>
> did you check the key group assignments? What is the distribution of
> "MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your data?
> This also depends on the hashCode of the output of your KeySelector.
>
> keyBy should handle high traffic well, but it is designed for key spaces
> with thousands or millions of values. If this is not the case, you need to
> introduce some artificial key to spread the load more evenly.
>
> Regarding your OutOfMemoryError: I think you are producing elements much
> faster than the operators following keyBy can process/discard them. Can
> you explain your job to us in more detail? Are you using event time? How
> do you aggregate the elements of the windows?
>
> Regards,
> Timo
>
>
> On 1/1/18 6:00 AM, Jinhua Luo wrote:
>
>> I checked the logs, but no information indicates what happened.
>>
>> In fact, in the same app there is another stream whose Kafka source is
>> low traffic; I aggregate some field of that source too, and Flink gives
>> correct results continuously.
>> So I suspect that keyBy() cannot handle high traffic well (which
>> affects the number of keys in the key partitions).
>>
>> 2018-01-01 2:04 GMT+08:00 Steven Wu <stevenz...@gmail.com>:
>>>>
>>>> but soon later, no results produced, and flink seems busy doing
>>>> something forever.
>>>
>>> Jinhua, I don't know if you have already checked these things; if not,
>>> they may be worth a look.
>>>
>>> Have you tried taking a thread dump?
>>> How long are the GC pauses?
>>> Do you see Flink restarting? Check the exceptions tab in the Flink web
>>> UI for your job.
>>>
>>>
>>>
>>> On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>
>>>> I took some time to read the source code for keyed stream windowing,
>>>> and here is my understanding:
>>>>
>>>> a) the keyed stream is split and dispatched to downstream tasks by
>>>> hash, and the hash base is the max parallelism of the downstream
>>>> operator:
>>>>
>>>> See
>>>> org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int, int):
>>>> MathUtils.murmurHash(keyHash) % maxParallelism;
>>>>
>>>> That's what the doc calls "hash partitioning".
>>>>
>>>> So the compiled execution graph already determines which operator
>>>> instance receives which key groups.
>>>>
>>>> b) with windowing, the key is used to index the window states, so the
>>>> window function receives the deserialized value from the window state
>>>> corresponding to its key.
>>>>
>>>> b.1) The element is first added to the state:
>>>>
>>>> See
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord<IN>):
>>>> windowState.add(element.getValue());
>>>>
>>>> b.2) when the trigger fires the window, the value is deserialized
>>>> from the keyed state:
>>>>
>>>> ACC contents = windowState.get();
>>>> emitWindowContents(actualWindow, contents);
>>>>
>>>> For the RocksDB backend, each input element is moved back and forth
>>>> between disk and memory during processing.
>>>>
>>>> Flink's keyed stream has the same functionality as Storm's fields
>>>> grouping, and is more complicated.
>>>>
>>>> Am I correct?
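(As a side note on Timo's question about the key group assignments above: the distribution can be checked offline with a small driver like the sketch below, which applies the same formula quoted from KeyGroupRangeAssignment to a sample of keys. The sample IPs and the max parallelism of 128 are assumptions for illustration.)

    import org.apache.flink.util.MathUtils;

    import java.util.HashMap;
    import java.util.Map;

    public class KeyGroupHistogram {
        public static void main(String[] args) {
            int maxParallelism = 128; // assumed; use your job's actual max parallelism
            String[] sampleKeys = {"1.2.3.4", "5.6.7.8", "9.10.11.12"}; // made-up sample IPs
            Map<Integer, Integer> histogram = new HashMap<>();
            for (String key : sampleKeys) {
                // Same computation as KeyGroupRangeAssignment.computeKeyGroupForKeyHash
                int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
                histogram.merge(keyGroup, 1, Integer::sum);
            }
            // A heavily skewed histogram means a few key groups (and thus a few
            // subtasks) receive most of the traffic.
            histogram.forEach((group, count) ->
                    System.out.println("key group " + group + ": " + count + " keys"));
        }
    }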
>>>>
>>>> But I still cannot understand why keyBy() stops Flink from returning
>>>> the expected results.
>>>>
>>>> Let me explain my case in more detail:
>>>> I use a Kafka data source, which collects log lines from log files on
>>>> tens of machines.
>>>> Each log line is in JSON format and contains an "ip" field, the IP
>>>> address of the user, so it can take millions of distinct values
>>>> across the Internet.
>>>> The stream processing is expected to aggregate by IP over a sliding
>>>> window of {1 hour size, 1 min slide}.
>>>>
>>>> If I use keyBy("ip"), then in the first minutes Flink gives me
>>>> correct aggregation results, but soon after, no results are produced
>>>> and Flink seems busy doing something forever.
>>>>
>>>> I doubt that keyBy() can handle a huge key space like this, and when
>>>> I remove keyBy().window().fold() and use windowAll().fold() instead
>>>> (the latter fold operator uses a HashMap to aggregate IPs by itself),
>>>> Flink works. But, as is well known, windowAll() is not scalable.
>>>>
>>>> Could the Flink developers help me on this topic? I prefer Flink and
>>>> I believe Flink is one of the best stream processing frameworks, but
>>>> I am really frustrated that Flink cannot fulfill its features just as
>>>> the docs say.
>>>>
>>>> Thank you all.
>>>>
>>>>
>>>> 2017-12-29 17:42 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
>>>>>
>>>>> I misused the key selector. I checked the docs and found that it must
>>>>> return a deterministic key, so using a random key is wrong, but I
>>>>> still cannot understand why it would cause the OOM.
>>>>>
>>>>>
>>>>>
>>>>> 2017-12-28 21:57 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
>>>>>>
>>>>>> It's very strange: when I change the key selector to use a random
>>>>>> key, the JVM reports an OOM.
>>>>>>
>>>>>> .keyBy(new KeySelector<MyEvent, Integer>() {
>>>>>>     public Integer getKey(MyEvent ev) {
>>>>>>         return ThreadLocalRandom.current().nextInt(1, 100);
>>>>>>     }
>>>>>> })
>>>>>>
>>>>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>>>     at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
>>>>>>     at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
>>>>>>     at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
>>>>>>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
>>>>>>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
>>>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>>>>>>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>
>>>>>> Could anybody explain the internals of keyBy()?
>>>>>>
>>>>>> 2017-12-28 17:33 GMT+08:00 Ufuk Celebi <u...@apache.org>:
>>>>>>>
>>>>>>> Hey Jinhua,
>>>>>>>
>>>>>>> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <luajit...@gmail.com>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> keyBy() on a field generates a unique key per field value, so if
>>>>>>>> the number of unique values is huge, Flink has trouble with both
>>>>>>>> CPU and memory. Is this considered in the design of Flink?
>>>>>>>
>>>>>>> Yes, keyBy hash-partitions the data across the nodes of your Flink
>>>>>>> application, and thus you can easily scale your application up if
>>>>>>> you need more processing power.
>>>>>>>
>>>>>>> I'm not sure that this is the problem in your case though. Can you
>>>>>>> provide some more details about what exactly you are doing? Are you
>>>>>>> aggregating by time (for the keyBy you mention no windowing, but
>>>>>>> then you mention windowAll)? What kind of aggregation are you
>>>>>>> doing? If possible, feel free to share some code.
>>>>>>>
>>>>>>>> Since windowAll() could be set a parallelism, I tried to use a key
>>>>>>>> selector that returns the field's hash rather than its value,
>>>>>>>> hoping it would decrease the number of keys, but Flink throws a
>>>>>>>> key out-of-range exception. How do I use a key selector correctly?
>>>>>>>
>>>>>>> Can you paste the exact exception you see? I think this might
>>>>>>> indicate that you don't correctly extract the key from your record,
>>>>>>> e.g. you extract a different key on sender and receiver.
>>>>>>>
>>>>>>> I'm sure we can figure this out after you provide more context. :-)
>>>>>>>
>>>>>>> – Ufuk
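(For the record, the fix for the random-key mistake quoted above is to make the selector deterministic. Below is a sketch of that idea with a stable bucket key; it assumes MyEvent has a getIp() getter, and the bucket count of 100 mirrors the range used in the random version.)

    import org.apache.flink.api.java.functions.KeySelector;

    // Deterministic key selector: the same event always maps to the same key,
    // as the docs require, while still folding millions of IPs into 100 buckets.
    public class IpBucketKeySelector implements KeySelector<MyEvent, Integer> {
        @Override
        public Integer getKey(MyEvent ev) {
            // Math.floorMod keeps the bucket in [0, 100) even for negative hash codes.
            return Math.floorMod(ev.getIp().hashCode(), 100);
        }
    }

It would be used as .keyBy(new IpBucketKeySelector()). Note that with a bucket key the per-IP aggregation then has to happen inside the window function (e.g. a map from IP to count), similar to what the windowAll().fold() variant described above already does.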