I checked the logs, but nothing in them indicates what is happening.

In fact, the same app has another stream whose Kafka source is low
traffic. I aggregate a field of that source too, and Flink
continuously gives correct results for it.
So I suspect that keyBy() cannot handle high traffic well (since the
traffic affects the number of keys in the key partitions).
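
A minimal sketch of the key routing quoted below may help frame this
suspicion. It assumes the two-step mapping from key to key group to
subtask; the class name, the mix() function (a stand-in, not Flink's
MathUtils.murmurHash) and the parallelism values are illustrative
assumptions only.

```java
import java.util.HashSet;
import java.util.Set;

public class KeyRoutingSketch {

    // Stand-in integer mixer (murmur3-style finalizer). This is NOT Flink's
    // MathUtils.murmurHash; it only plays the same role of spreading hash codes.
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & 0x7fffffff; // force a non-negative value
    }

    // Key -> key group: depends only on the key's hashCode and maxParallelism.
    public static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Key group -> subtask index: each subtask owns a contiguous range of key groups.
    public static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value, for illustration only
        int parallelism = 4;      // assumed value, for illustration only

        // Route one million distinct synthetic "ip" keys and count the key groups hit.
        Set<Integer> groups = new HashSet<>();
        for (int i = 0; i < 1_000_000; i++) {
            String ip = "10." + ((i >> 16) & 255) + "." + ((i >> 8) & 255) + "." + (i & 255);
            groups.add(keyGroupFor(ip, maxParallelism));
        }
        System.out.println("distinct key groups used: " + groups.size());

        // The same key always lands in the same key group and subtask.
        int g = keyGroupFor("10.0.0.1", maxParallelism);
        System.out.println("10.0.0.1 -> key group " + g
                + ", subtask " + subtaskFor(g, parallelism, maxParallelism));
    }
}
```

Under these assumptions the number of key groups is bounded by
maxParallelism however many distinct keys arrive, so a large key count
should not change the routing itself. The sketch says nothing about how
much window state each key accumulates, which is a separate question.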

2018-01-01 2:04 GMT+08:00 Steven Wu <stevenz...@gmail.com>:
>>  but soon afterwards no results are produced, and Flink seems to be
>> busy doing something forever.
>
> Jinhua, I don't know if you have checked these things. If not, they may be
> worth a look.
>
> Have you tried taking a thread dump?
> How are the GC pauses?
> Do you see Flink restarting? Check the Exceptions tab in the Flink web UI
> for your job.
>
>
>
> On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo <luajit...@gmail.com> wrote:
>>
>> I took some time to read the source code for keyed stream
>> windowing, and my understanding is as follows:
>>
>> a) The keyed stream is split and dispatched to downstream tasks
>> by hash. The hash is taken modulo the maximum parallelism of the
>> downstream operator, which yields a key group:
>>
>> See org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int, int):
>> MathUtils.murmurHash(keyHash) % maxParallelism;
>>
>> That is what the docs call "hash partitioning".
>>
>> So the compiled execution graph already determines which operator
>> instance receives which key groups.
>>
>> b) With windowing, the key is used to index the window state, so the
>> window function receives the deserialized value from the window
>> state of the corresponding key.
>>
>> b.1) The element is first added to the state:
>>
>> See org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord<IN>):
>> windowState.add(element.getValue());
>>
>> b.2) When the trigger fires the window, the value is
>> deserialized from the keyed state:
>>
>> ACC contents = windowState.get();
>> emitWindowContents(actualWindow, contents);
>>
>> With the RocksDB backend, each input element goes to disk and back
>> during processing.
>>
>> Flink's keyed stream has the same functionality as Storm's fields
>> grouping, but is more complicated.
>>
>> Am I correct?
>>
>>
>> But I still cannot understand why keyBy() stops Flink from
>> returning the expected results.
>>
>> Let me explain my case in more detail:
>> I use a Kafka data source, which collects log lines from log files on
>> tens of machines.
>> Each log line is in JSON format and contains an "ip" field, the IP
>> address of the user, so it can take millions of distinct values
>> across the Internet.
>> The stream processing is expected to aggregate by IP in a {1 hour,
>> 1 min} sliding window.
>>
>> If I use keyBy("ip"), then for the first few minutes Flink gives me
>> correct aggregation results, but soon afterwards no results are
>> produced, and Flink seems to be busy doing something forever.
>>
>> I doubt whether keyBy() can handle a huge number of keys as in this
>> case. When I remove keyBy().window().fold() and use windowAll().fold()
>> instead (the latter fold operator uses a HashMap to aggregate by IP
>> itself), Flink works. But as is known, windowAll() is not scalable.
>>
>> Could the Flink developers help me with this topic? I prefer Flink and
>> I believe it is one of the best stream processing frameworks, but I am
>> really frustrated that Flink could not fulfill its features as the
>> docs describe.
>>
>> Thank you all.
>>
>>
>> 2017-12-29 17:42 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
>> > I misused the key selector. I checked the docs and found that it must
>> > return a deterministic key, so using a random one is wrong, but I
>> > still cannot understand why it would cause an OOM.
>> >
>> >
>> >
>> > 2017-12-28 21:57 GMT+08:00 Jinhua Luo <luajit...@gmail.com>:
>> >> It's very strange: when I change the key selector to use a random
>> >> key, the JVM reports an OOM.
>> >>
>> >>    .keyBy(new KeySelector<MyEvent, Integer>() {
>> >>      public Integer getKey(MyEvent ev) {
>> >>        return ThreadLocalRandom.current().nextInt(1, 100);
>> >>      }
>> >>    })
>> >>
>> >> Caused by: java.lang.OutOfMemoryError: Java heap space
>> >>         at com.esotericsoftware.kryo.util.IdentityMap.resize(IdentityMap.java:469)
>> >>         at com.esotericsoftware.kryo.util.IdentityMap.push(IdentityMap.java:230)
>> >>         at com.esotericsoftware.kryo.util.IdentityMap.put(IdentityMap.java:144)
>> >>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:818)
>> >>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
>> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>> >>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>> >>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
>> >>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
>> >>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>> >>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>> >>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>> >>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> >>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> >>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> >>
>> >> Could anybody explain the internals of keyBy()?
>> >>
>> >> 2017-12-28 17:33 GMT+08:00 Ufuk Celebi <u...@apache.org>:
>> >>> Hey Jinhua,
>> >>>
>> >>> On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo <luajit...@gmail.com>
>> >>> wrote:
>> >>>> keyBy() on a field generates one key per distinct field value,
>> >>>> so if the number of distinct values is huge, Flink would have
>> >>>> trouble with both CPU and memory. Is this considered in the
>> >>>> design of Flink?
>> >>>
>> >>> Yes, keyBy hash partitions the data across the nodes of your Flink
>> >>> application and thus you can easily scale your application up if you
>> >>> need more processing power.
>> >>>
>> >>> I'm not sure that this is the problem in your case though. Can you
>> >>> provide some more details about what you are doing exactly? Are you
>> >>> aggregating by time (for the keyBy you mention no windowing, but then
>> >>> you mention windowAll)? What kind of aggregation are you doing? If
>> >>> possible, feel free to share some code.
>> >>>
>> >>>> Since windowsAll() could be set parallelism, so I try to use key
>> >>>> selector that returns a hash of the field rather than its value,
>> >>>> hoping to decrease the number of keys, but Flink throws a key
>> >>>> out-of-range exception. How do I use a key selector correctly?
>> >>>
>> >>> Can you paste the exact exception you get? I think this might indicate
>> >>> that you don't correctly extract the key from your record, e.g. you
>> >>> extract a different key on sender and receiver.
>> >>>
>> >>> I'm sure we can figure this out after you provide more context. :-)
>> >>>
>> >>> – Ufuk
>
>
