> but soon after, no results are produced, and Flink seems busy doing something forever.
Jinhua, I don't know if you have checked these things; if not, they may be worth a look. Have you tried taking a thread dump? How are the GC pauses? Do you see Flink restarting? Check the Exceptions tab in the Flink web UI for your job.

On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo <luajit...@gmail.com> wrote:
> I took some time to read the source code around keyed stream windowing,
> and my understanding is as follows:
>
> a) The keyed stream is split and dispatched to downstream tasks by hash,
> where the hash base is the maximum parallelism of the downstream
> operator:
>
> See org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int, int):
>
>     MathUtils.murmurHash(keyHash) % maxParallelism;
>
> That is what the doc calls "hash partitioning".
>
> So the compiled execution graph already determines which operator
> instance receives which key groups.
>
> b) With windowing, the key is used to index the window state, so the
> window function receives the deserialized value from the window state of
> the corresponding key.
>
> b.1) The element is first added to the state. See
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord<IN>):
>
>     windowState.add(element.getValue());
>
> b.2) When the trigger fires the window, the value is deserialized from
> the keyed state:
>
>     ACC contents = windowState.get();
>     emitWindowContents(actualWindow, contents);
>
> With the RocksDB backend, each input element is taken back and forth
> from disk during processing.
>
> Flink's keyed stream has the same functionality as Storm's fields
> grouping, and is more sophisticated.
>
> Am I correct?
>
> But I still cannot understand why keyBy() stops Flink from returning the
> expected results.
>
> Let me explain my case in more detail:
> I use a Kafka data source, which collects log lines of log files from
> tens of machines.
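[Editor's note] The key-group assignment described in (a) above can be sketched in plain Java. This is a minimal sketch, not Flink's actual code: murmurMix is a stand-in for MathUtils.murmurHash (it uses the murmur3 32-bit finalizer), and operatorIndexFor assumes the keyGroup * parallelism / maxParallelism mapping by which each subtask owns a contiguous range of key groups.

```java
public class KeyGroupSketch {

    // Stand-in for org.apache.flink.util.MathUtils.murmurHash: the murmur3
    // 32-bit finalizer. Flink's real implementation differs in detail; this
    // only illustrates the scrambling step.
    static int murmurMix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Mirrors KeyGroupRangeAssignment.computeKeyGroupForKeyHash:
    // murmurHash(keyHash) % maxParallelism (floorMod keeps it non-negative,
    // a simplification of Flink's own non-negative hash).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(murmurMix(key.hashCode()), maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups; this reflects the
    // keyGroup * parallelism / maxParallelism mapping used at runtime.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        int kg = keyGroupFor("10.0.0.1", maxParallelism);
        System.out.println("key group " + kg + " -> subtask "
                + operatorIndexFor(kg, maxParallelism, parallelism));
    }
}
```

The point of the two-step mapping is that maxParallelism fixes the key groups at compile time, so the job can later be rescaled by reassigning whole key-group ranges to subtasks.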
> The log line is in JSON format and contains an "ip" field, the IP
> address of the user, so it can take on millions of distinct IP addresses
> of the Internet.
>
> The stream processing is expected to produce an aggregation per IP in a
> {1 hour, 1 min} sliding window.
>
> If I use keyBy("ip"), then for the first few minutes Flink gives me
> correct aggregation results, but soon after, no results are produced and
> Flink seems busy doing something forever.
>
> I doubt whether keyBy() can handle a huge key space like this. When I
> replace keyBy().window().fold() with windowAll().fold() (where the fold
> operator aggregates IPs itself using a HashMap), Flink works. But as is
> well known, windowAll() is not scalable.
>
> Could the Flink developers help me on this topic? I prefer Flink and I
> believe it is one of the best stream processing frameworks, but I am
> really frustrated that Flink does not seem to deliver this feature as
> the doc describes.
>
> Thank you all.
>
> 2017-12-29 17:42 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
> > I misused the key selector. I checked the doc and found it must return
> > a deterministic key, so using a random key is wrong, but I still cannot
> > understand why it would cause an OOM.
> >
> > 2017-12-28 21:57 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
> >> It's very strange: when I change the key selector to use a random key,
> >> the JVM reports an OOM.
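[Editor's note] A back-of-envelope sketch of why the {1 hour, 1 min} sliding window described above gets expensive under keyBy("ip"): each key participates in window-size / slide overlapping panes, and the state backend keeps one accumulator per (key, pane) even with an incremental fold. The one-million IP cardinality below is an assumed figure, not taken from the thread.

```java
public class SlidingWindowFootprint {

    // Number of overlapping panes each key participates in at any moment.
    static long panesPerKey(long windowMs, long slideMs) {
        return windowMs / slideMs;
    }

    public static void main(String[] args) {
        long windowMs = 60L * 60 * 1000;  // 1 hour window (from the thread)
        long slideMs  = 60L * 1000;       // 1 minute slide (from the thread)
        long panes = panesPerKey(windowMs, slideMs);  // 60 panes

        long distinctIps = 1_000_000L;    // assumed cardinality of "ip"
        long liveAccumulators = panes * distinctIps;

        // One accumulator per (key, pane): tens of millions of live state
        // entries, and with RocksDB each incoming element touches all of its
        // panes on disk.
        System.out.println(panes + " panes, ~" + liveAccumulators
                + " live accumulators");
    }
}
```

This does not by itself prove keyBy is the culprit, but it shows why RocksDB I/O and checkpointing can dominate once the key space is large.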
> >>
> >>     .keyBy(new KeySelector<MyEvent, Integer>() {
> >>         public Integer getKey(MyEvent ev) {
> >>             return ThreadLocalRandom.current().nextInt(1, 100);
> >>         }
> >>     })
> >>
> >> Caused by: java.lang.OutOfMemoryError: Java heap space
> >>     at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
> >>     at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
> >>     at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
> >>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
> >>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
> >>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
> >>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> >>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> >>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
> >>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
> >>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >>
> >> Could anybody explain the internals of keyBy()?
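[Editor's note] To make the deterministic-key requirement discussed above concrete, here is a minimal sketch. The KeySelector interface below is a local stand-in for Flink's (org.apache.flink.api.java.functions.KeySelector), and MyEvent is a hypothetical record type; neither is the thread author's actual code.

```java
import java.util.concurrent.ThreadLocalRandom;

public class DeterministicKeySketch {

    // Local stand-in for Flink's KeySelector contract.
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    static final class MyEvent {
        final String ip;
        MyEvent(String ip) { this.ip = ip; }
    }

    // WRONG: the runtime re-extracts the key where the record is processed;
    // a random key can map to a key group the receiving subtask does not
    // own, which surfaces as the "key out-of-range" failure.
    static final KeySelector<MyEvent, Integer> randomKey =
            ev -> ThreadLocalRandom.current().nextInt(1, 100);

    // OK: derived only from the record itself, so every extraction agrees;
    // floorMod buckets the IPs into at most 100 key values.
    static final KeySelector<MyEvent, Integer> bucketedKey =
            ev -> Math.floorMod(ev.ip.hashCode(), 100);

    public static void main(String[] args) throws Exception {
        MyEvent ev = new MyEvent("10.0.0.1");
        System.out.println("bucket: " + bucketedKey.getKey(ev));
    }
}
```

The design rule: a key must be a pure function of the record, because Flink may extract it more than once and at more than one place in the pipeline.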
> >>
> >> 2017-12-28 17:33 GMT+08:00 Ufuk Celebi <u...@apache.org>:
> >>> Hey Jinhua,
> >>>
> >>> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <luajit...@gmail.com> wrote:
> >>>> keyBy() on the field generates a unique key per field value, so if
> >>>> the number of distinct values is huge, Flink would have trouble with
> >>>> both CPU and memory. Is this considered in the design of Flink?
> >>>
> >>> Yes, keyBy hash-partitions the data across the nodes of your Flink
> >>> application, and thus you can easily scale your application up if you
> >>> need more processing power.
> >>>
> >>> I'm not sure that this is the problem in your case, though. Can you
> >>> provide some more details about what exactly you are doing? Are you
> >>> aggregating by time (for the keyBy you mention no windowing, but then
> >>> you mention windowAll)? What kind of aggregation are you doing? If
> >>> possible, feel free to share some code.
> >>>
> >>>> Since windowAll()'s parallelism can be set, I tried a key selector
> >>>> that uses the field's hash rather than its value, hoping it would
> >>>> decrease the number of keys, but Flink throws a key out-of-range
> >>>> exception. How do I use a key selector correctly?
> >>>
> >>> Can you paste the exact exception you see? I think this might indicate
> >>> that you don't correctly extract the key from your record, e.g. you
> >>> extract a different key on the sender and the receiver.
> >>>
> >>> I'm sure we can figure this out after you provide more context. :-)
> >>>
> >>> – Ufuk