Hi Chang Liu,

The unbounded nature of the stream, keyed or not, should not by itself lead to 
an out-of-memory error. 

The number of parallel keyed operator instances in Flink is fixed (it equals 
the operator's parallelism). Each instance just processes some range of the 
keyed elements, in your example a subrange of the session ids. No operator 
instance is created per key. 
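
As a rough illustration of why the number of instances stays fixed, here is a 
minimal sketch in plain Scala (no Flink dependency). It mimics the idea of 
hashing each key into one of a fixed number of key groups and then mapping key 
groups onto the parallel instances. The object name, the values 128 and 4, and 
the use of Scala's MurmurHash3 are assumptions for illustration only; Flink's 
internal hashing differs in detail.

```scala
import scala.util.hashing.MurmurHash3

object KeyAssignmentSketch {
  // Assumed example values, not taken from this thread.
  val maxParallelism = 128 // number of key groups
  val parallelism = 4      // number of parallel operator instances

  // Map a key to a key group; a stand-in for Flink's internal hashing.
  def keyGroupFor(key: String): Int =
    Math.floorMod(MurmurHash3.stringHash(key), maxParallelism)

  // Map the key's group to one of the fixed parallel instances.
  def instanceFor(key: String): Int =
    keyGroupFor(key) * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    // Many distinct session ids can never use more than `parallelism` instances.
    val used = (1 to 100000).map(i => instanceFor(s"session-$i")).toSet
    println(s"distinct instances used: ${used.size} (at most $parallelism)")
  }
}
```

However many distinct session ids show up, the result of instanceFor is always 
an index between 0 and parallelism - 1, so new keys never create new instances.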

The keyed elements being processed (the HTTP requests) are objects that are 
created when they enter the pipeline and garbage collected once they have been 
processed, in streaming fashion. 

If they arrive very rapidly, this can cause high back pressure, which 
propagates from downstream operators back to upstream ones. Buffers become 
full and the pipeline slows down or stops reading from external inputs. This 
usually means that your pipeline is under-provisioned. 
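
To make the point about full buffers concrete, here is a small sketch in plain 
Scala (again without Flink). It only shows that a bounded buffer caps how many 
in-flight elements can pile up when the consumer is not keeping up. The object 
name and the capacity of 8 are hypothetical, and the standard 
ArrayBlockingQueue is only a stand-in for Flink's network buffers.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedBufferSketch {
  // Hypothetical capacity, chosen only for this illustration.
  val capacity = 8

  // Try to push `n` elements into a buffer that nobody is draining.
  // Returns the (accepted, rejected) counts.
  def offerAll(n: Int): (Int, Int) = {
    val buffer = new ArrayBlockingQueue[Int](capacity)
    // offer() returns false instead of growing the buffer once it is full.
    val accepted = (1 to n).count(i => buffer.offer(i))
    (accepted, n - accepted)
  }

  def main(args: Array[String]): Unit = {
    val (accepted, rejected) = offerAll(100)
    println(s"accepted=$accepted rejected=$rejected")
  }
}
```

The non-blocking offer is used here only so that the sketch terminates. A 
blocking put would be closer to what back pressure does: the producer waits 
rather than dropping elements, and memory use stays bounded either way.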

The only accumulated data comes from state (windows, user state, etc.), so if 
you control its memory consumption, as Till described, there should be no other 
source of out-of-memory errors.

Cheers,
Andrey

> On 25 Jul 2018, at 19:06, Chang Liu <fluency...@gmail.com> wrote:
> 
> Hi Till,
> 
> Thanks for your reply. But I think maybe I did not make my question clear. My 
> question is not about whether the State within each keyed operator instance 
> will run out of memory. My question is whether an unlimited number of keyed 
> operator instances will themselves run out of memory.
> 
> So to reply to your answers: whether I use a different State backend or 
> regularly clean up the State (which is exactly what I am doing), this does 
> not affect the number of keyed operator instances.
> 
> I would like to know:
> Will the number of keyed operator instances (Java objects?) grow unbounded? 
> If so, will they run out of memory? This is not actually related to the 
> memory used by the keyed State inside.
> If not, then how does Flink manage these multiple keyed operator instances?
> 
> I think this needs more knowledge about how Flink works internally to 
> understand how keyed operator instances are created, maintained and 
> destroyed. That’s why I would like your help understanding this.
> 
> Many Thanks.
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 24 Jul 2018, at 14:31, Till Rohrmann <trohrm...@apache.org> wrote:
>> 
>> Hi Chang Liu,
>> 
>> if you are dealing with an unlimited number of keys and keep state around 
>> for every key, then your state size will keep growing with the number of 
>> keys. If you are using the FsStateBackend, which keeps working state in 
>> memory, you will eventually run into an OutOfMemoryError. One way to 
>> solve/mitigate this problem is to use the RocksDBStateBackend, which can go 
>> out of core.
>> 
>> Alternatively, you would need to clean up your state before you run out of 
>> memory. One way to do this is to register a timer for every key which clears 
>> the state. But this only works if you don't amass too much state data before 
>> the timer is triggered. If you wish, this solution is a kind of poor 
>> man's state TTL. The Flink community is currently developing a proper 
>> implementation of it which does not rely on additional timers (which 
>> increase the state footprint) [1].
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-9510
>> 
>> Cheers,
>> Till
>> 
>> On Tue, Jul 24, 2018 at 10:11 AM Chang Liu <fluency...@gmail.com> wrote:
>> Dear All,
>> 
>> I have questions regarding the keys. In general, the questions are:
>> what happens if I am doing keyBy based on an unlimited number of keys? How 
>> is Flink managing each KeyedStream under the hood? Will I get a memory 
>> overflow, for example, if every KeyedStream associated with a specific key 
>> takes a certain amount of memory?
>> BTW, I think it is fair to say that I have to clear my KeyedState so that 
>> the memory used by this State is cleaned up regularly. But still, I am 
>> wondering, even though I am regularly cleaning up State memory, what 
>> happens to the memory used by the KeyedStream itself, if there is any? And 
>> will it explode?
>> 
>> Let me give an example for understanding it clearly.  Let’s say we have a
>> 
>>      val requestStream: DataStream[HttpRequest]
>> 
>> which is a stream of HTTP requests. And by using the session ID as the key, 
>> we can obtain a KeyedStream per single session, as follows:
>> 
>>      val streamPerSession: KeyedStream[HttpRequest] =
>>        requestStream.keyBy(_.sessionId)
>> 
>> However, the session IDs are actually hashcodes generated randomly by the 
>> Web service/application, which means the number of sessions is unlimited 
>> (which is reasonable, because every time a user opens the application or 
>> logs in, he/she will get a new unique session). 
>> 
>> Then, the question is: will Flink eventually run out of memory because the 
>> number of sessions is unlimited (and because we are keying by the session 
>> ID)?
>> If so, how can we properly manage this situation?
>> If not, could you help me understand WHY?
>> Let’s also assume that, we are regularly clearing the KeyedState, so the 
>> memory used by the State will not explode. 
>> 
>> 
>> Many Thanks and Looking forward to your reply :)
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
> 
