I checked the logs, but nothing there indicates what is happening. In
fact, the same app contains another stream whose Kafka source is low
traffic; I aggregate a field of that source too, and Flink gives correct
results continuously. So I suspect keyBy() does not handle high traffic
well (which affects the number of keys in the key partitions).
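
For reference, the failing job looks roughly like this (a minimal
sketch, not my exact code; MyEvent, the events stream, and the counting
fold are placeholders, and processing time is assumed):

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Count events per ip over a 1-hour window sliding every minute.
    DataStream<Long> counts = events
            .keyBy("ip")  // one window state per distinct ip
            .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(1)))
            .fold(0L, new FoldFunction<MyEvent, Long>() {
                @Override
                public Long fold(Long count, MyEvent ev) {
                    return count + 1;
                }
            });
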
2018-01-01 2:04 GMT+08:00 Steven Wu <stevenz...@gmail.com>:
>> but soon later, no results produced, and flink seems busy doing
>> something forever.
>
> Jinhua, I don't know if you have checked these things; if not, they may
> be worth a look.
>
> Have you tried to take a thread dump?
> How are the GC pauses?
> Do you see Flink restart? Check the exceptions tab in the Flink web UI
> for your job.
>
> On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo <luajit...@gmail.com> wrote:
>>
>> I took some time to read the source code of keyed stream windowing,
>> and my understanding is as follows:
>>
>> a) The keyed stream is split and dispatched to downstream tasks by
>> hash, where the modulus is the max parallelism of the downstream
>> operator.
>>
>> See
>> org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int, int):
>> MathUtils.murmurHash(keyHash) % maxParallelism;
>>
>> That is what the doc calls "hash partitioning".
>>
>> So the compiled execution graph already determines which operator
>> instance receives which key groups.
>>
>> b) With windowing, the key is used to index the window states, so the
>> window function receives the deserialized value from the window state
>> of the corresponding key.
>>
>> b.1) The element is first added to the state.
>>
>> See
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord<IN>):
>> windowState.add(element.getValue());
>>
>> b.2) When the trigger fires the window, the value is deserialized from
>> the keyed state:
>>
>> ACC contents = windowState.get();
>> emitWindowContents(actualWindow, contents);
>>
>> With the RocksDB backend, each input element is therefore taken back
>> and forth from disk during processing.
>>
>> Flink's keyed stream provides the same functionality as Storm's fields
>> grouping, but is more sophisticated internally.
>>
>> Am I correct?
>>
>> But I still cannot understand why keyBy() stops Flink from returning
>> the expected results.
>>
>> Let me explain my case in more detail:
>> I use a Kafka data source, which collects log lines from log files on
>> tens of machines.
>> Each log line is in JSON format and contains an "ip" field, the IP
>> address of the user, so it can take millions of distinct values from
>> across the Internet.
>> The stream processing is expected to aggregate per ip over a sliding
>> window of 1 hour size and 1 minute slide.
>>
>> If I use keyBy("ip"), then in the first minutes Flink gives me correct
>> aggregation results, but soon after, no results are produced and Flink
>> seems busy doing something forever.
>>
>> I doubt whether keyBy() can handle a huge number of keys like this.
>> When I replace keyBy().window().fold() with windowAll().fold() (where
>> the fold operator aggregates the ips itself in a hashmap), Flink
>> works. But as is well known, windowAll() is not scalable, since it
>> runs with parallelism 1.
>>
>> Could the Flink developers help me on this topic? I prefer Flink and I
>> believe it is one of the best stream processing frameworks, but I am
>> really frustrated that Flink does not seem to deliver this feature as
>> the doc describes.
>>
>> Thank you all.
>>
>> 2017-12-29 17:42 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
>> > I misused the key selector. I checked the doc and found that it must
>> > return a deterministic key, so using a random key is wrong, but I
>> > still cannot understand why it would cause an OOM.
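>> >
>> > For the record, a deterministic bucketing selector, which is what I
>> > understand the doc requires, would look roughly like this (just a
>> > sketch; MyEvent, its getIp() accessor, and the bucket count of 100
>> > are placeholders, not my real code):
>> >
>> > // import org.apache.flink.api.java.functions.KeySelector;
>> > .keyBy(new KeySelector<MyEvent, Integer>() {
>> >     @Override
>> >     public Integer getKey(MyEvent ev) {
>> >         // The same ip always maps to the same bucket, on every
>> >         // call and on every node, so the key is stable.
>> >         return Math.floorMod(ev.getIp().hashCode(), 100);
>> >     }
>> > })
>> >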
>> > 2017-12-28 21:57 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
>> >> It is very strange: when I change the key selector to use a random
>> >> key, the JVM reports an OOM.
>> >>
>> >> .keyBy(new KeySelector<MyEvent, Integer>() {
>> >>     @Override
>> >>     public Integer getKey(MyEvent ev) {
>> >>         // non-deterministic: returns a different key on each call
>> >>         return ThreadLocalRandom.current().nextInt(1, 100);
>> >>     }
>> >> })
>> >>
>> >> Caused by: java.lang.OutOfMemoryError: Java heap space
>> >>     at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
>> >>     at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
>> >>     at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
>> >>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
>> >>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>> >>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
>> >>     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>> >>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>> >>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>> >>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
>> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>> >>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> >>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> >>
>> >> Could anybody explain the internals of keyBy()?
>> >>
>> >> 2017-12-28 17:33 GMT+08:00 Ufuk Celebi <u...@apache.org>:
>> >>> Hey Jinhua,
>> >>>
>> >>> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <luajit...@gmail.com>
>> >>> wrote:
>> >>>> keyBy() on a field uses the field value itself as the key, so if
>> >>>> the number of unique values is huge, Flink would have trouble with
>> >>>> both CPU and memory. Is this considered in the design of Flink?
>> >>>
>> >>> Yes, keyBy hash-partitions the data across the nodes of your Flink
>> >>> application, and thus you can easily scale your application up if
>> >>> you need more processing power.
>> >>>
>> >>> I'm not sure that this is the problem in your case, though. Can you
>> >>> provide some more details on what you are doing exactly? Are you
>> >>> aggregating by time (for the keyBy you mention no windowing, but
>> >>> then you mention windowAll)? What kind of aggregation are you
>> >>> doing? If possible, feel free to share some code.
>> >>>
>> >>>> Since windowAll() cannot be run with a higher parallelism, I tried
>> >>>> a key selector that returns the hash of the field rather than its
>> >>>> value, hoping to decrease the number of keys, but Flink throws a
>> >>>> key out-of-range exception. How do I use a key selector correctly?
>> >>>
>> >>> Can you paste the exact exception you see? I think this might
>> >>> indicate that you don't correctly extract the key from your record,
>> >>> e.g. you extract a different key on sender and receiver.
>> >>>
>> >>> I'm sure we can figure this out after you provide more context. :-)
>> >>>
>> >>> – Ufuk
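
P.S. To make the key-out-of-range discussion above concrete, here is
roughly how Flink maps a key to an operator instance, simplified from
org.apache.flink.runtime.state.KeyGroupRangeAssignment (key, parallelism,
and maxParallelism stand in for the actual values; this is an
illustration, not a full listing):

    import org.apache.flink.util.MathUtils;

    // assignToKeyGroup: the key's hash picks a key group ...
    int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
    // computeOperatorIndexForKeyGroup: ... and the key group picks the
    // operator instance that owns it.
    int operatorIndex = keyGroup * parallelism / maxParallelism;

If getKey() is not deterministic, the receiving task re-extracts a
different key than the one the record was routed by, and that key's
group may fall outside the key-group range the task owns, hence the
exception.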