You can check the log for the stack trace at the time of the OOM; that may help
us confirm the cause. Alternatively, you can dump the heap and analyze the
memory usage after the OOM.
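
To capture a heap dump at the moment of the OOM, the usual approach on a HotSpot
JVM is to start the TaskManager with -XX:+HeapDumpOnOutOfMemoryError and
-XX:HeapDumpPath=<dir> (in Flink, JVM flags like these are normally passed
through env.java.opts in flink-conf.yaml), or to run
jmap -dump:live,format=b,file=heap.hprof <pid> against a running process. The
sketch below is a hypothetical helper (HeapDumpHelper is not part of Flink),
assuming a HotSpot-based JDK where com.sun.management.HotSpotDiagnosticMXBean is
available. It checks whether the flag is on and can write a dump on demand,
which is handy as a baseline to compare against the OOM dump.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper (not part of Flink) around the HotSpot diagnostic MXBean. */
final class HeapDumpHelper {
    private HeapDumpHelper() {}

    private static HotSpotDiagnosticMXBean bean() {
        return ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
    }

    /** True if this JVM was started with -XX:+HeapDumpOnOutOfMemoryError. */
    static boolean heapDumpOnOomEnabled() {
        return Boolean.parseBoolean(
                bean().getVMOption("HeapDumpOnOutOfMemoryError").getValue());
    }

    /**
     * Writes a dump of reachable objects to {@code target}. The file must not
     * exist yet; use the .hprof suffix, which newer JDKs require.
     */
    static Path dumpHeap(Path target) {
        try {
            bean().dumpHeap(target.toString(), true); // live=true: only reachable objects
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("HeapDumpOnOutOfMemoryError enabled: " + heapDumpOnOomEnabled());
        Path target = Paths.get(System.getProperty("java.io.tmpdir"),
                "manual-" + System.nanoTime() + ".hprof");
        dumpHeap(target);
        System.out.println("Wrote " + target + " (" + target.toFile().length() + " bytes)");
    }
}
```

The resulting .hprof file can be opened in a heap analyzer such as Eclipse MAT
to see which objects dominate the heap.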

From: Darshan Singh <>
Sent: Wednesday, August 29, 2018 19:22
To: wangzhijiang999 <>
Cc: chesnay <>; user <>
Subject: Re: Backpressure? for Batches


I thought the group by might be causing the OOM, but it is mentioned that the
sort spills to disk, so there is no way for it to cause the OOM. So I was
wondering whether, due to backpressure, some of the data read from HDFS is kept
in memory because it is not consumed, and that is what causes the OOM. It seems
this is not possible either, so I need to look again at what could be causing
the OOM.

On Wed, Aug 29, 2018 at 12:41 PM Zhijiang(wangzhijiang999) 
<> wrote:
Chesnay is right. For batch jobs there are two ways to notify the downstream
that a result is consumable: one is when the upstream emits its first record
(pipelined mode), and the other is when the upstream has finished emitting all
records (blocking mode).
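
The two notification modes can be illustrated with a small simulation. This is
a toy model in plain Java, not Flink's runtime: ExchangeModeDemo, its Mode enum,
and the bounded ArrayBlockingQueue used as the channel are illustrative
assumptions. In PIPELINED mode the consumer runs alongside the producer and
starts as soon as the first record arrives; in BLOCKING mode it is started only
after the producer has emitted everything, so the two never overlap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy model of PIPELINED vs BLOCKING data exchange; not Flink's runtime. */
final class ExchangeModeDemo {
    enum Mode { PIPELINED, BLOCKING }

    private static final int END = -1; // end-of-data marker

    /** Runs one producer and one consumer and returns the ordered event log. */
    static List<String> run(Mode mode, int records, int capacity) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // The blocking model holds the whole result (plus END) before anyone reads it.
        int cap = mode == Mode.BLOCKING ? records + 1 : capacity;
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(cap);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    channel.put(i); // blocks while the channel is full
                    log.add("produce " + i);
                }
                channel.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int r = channel.take(); r != END; r = channel.take()) {
                    log.add("consume " + r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            producer.start();
            if (mode == Mode.BLOCKING) {
                producer.join(); // result becomes consumable only once all records exist
            }
            consumer.start();    // immediately (PIPELINED) or after the producer finished (BLOCKING)
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        synchronized (log) {
            return new ArrayList<>(log);
        }
    }

    /** True if some record was consumed before the producer logged its last record. */
    static boolean overlapped(List<String> log) {
        int firstConsume = -1;
        int lastProduce = -1;
        for (int i = 0; i < log.size(); i++) {
            if (log.get(i).startsWith("consume") && firstConsume < 0) firstConsume = i;
            if (log.get(i).startsWith("produce")) lastProduce = i;
        }
        return firstConsume >= 0 && firstConsume < lastProduce;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            List<String> log = run(mode, 10, 2);
            System.out.println(mode + " overlapped=" + overlapped(log) + " " + log);
        }
    }
}
```

With a channel capacity smaller than the record count, the pipelined run has to
interleave, because the producer stalls until the consumer makes room; that
stall is the backpressure discussed in this thread.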

In your case, the slow group-by node will trigger backpressure, which propagates
upstream all the way to the source node. But normally this will not cause an
OOM from caching in-flight buffers, because those buffers are managed by the
framework and are bounded. The only case where I have seen this cause an OOM
before was with the Netty buffer pool.
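
Why a slow group-by stalls the source without exhausting memory can be sketched
with bounded channels. Again this is a hypothetical model, not Flink code:
BoundedBackpressureDemo connects a source, a map, and a deliberately slow
group-by stage through fixed-capacity queues that stand in for the
framework-managed network buffers. The source counts how often it found its
output channel full, and the peak number of buffered records stays at or below
the total channel capacity no matter how many records flow through.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Toy model: source -> map -> slow group-by over bounded channels; not Flink code. */
final class BoundedBackpressureDemo {
    static final class Result {
        final long delivered;    // records that reached the slow stage
        final int maxInFlight;   // peak records sitting in the two channels
        final long sourceStalls; // times the source found its channel full

        Result(long delivered, int maxInFlight, long sourceStalls) {
            this.delivered = delivered;
            this.maxInFlight = maxInFlight;
            this.sourceStalls = sourceStalls;
        }
    }

    private static final int END = -1; // end-of-data marker

    static Result run(int records, int capacity, long slowStageMillis) {
        BlockingQueue<Integer> toMap = new ArrayBlockingQueue<>(capacity);
        BlockingQueue<Integer> toGroupBy = new ArrayBlockingQueue<>(capacity);
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicLong stalls = new AtomicLong();
        AtomicLong delivered = new AtomicLong();
        Runnable sample = () ->
                maxInFlight.accumulateAndGet(toMap.size() + toGroupBy.size(), Math::max);

        Thread source = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    if (!toMap.offer(i)) {   // channel full: downstream is pushing back
                        stalls.incrementAndGet();
                        toMap.put(i);        // wait for a free slot instead of buffering more
                    }
                    sample.run();
                }
                toMap.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread map = new Thread(() -> {
            try {
                for (int r = toMap.take(); r != END; r = toMap.take()) {
                    toGroupBy.put(r * 2);    // stand-in transformation; blocks when full
                    sample.run();
                }
                toGroupBy.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread groupBy = new Thread(() -> {
            try {
                for (int r = toGroupBy.take(); r != END; r = toGroupBy.take()) {
                    Thread.sleep(slowStageMillis); // the slow stage
                    delivered.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread[] all = {source, map, groupBy};
        for (Thread t : all) t.start();
        try {
            for (Thread t : all) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return new Result(delivered.get(), maxInFlight.get(), stalls.get());
    }

    public static void main(String[] args) {
        Result r = run(200, 4, 1);
        System.out.println("delivered=" + r.delivered + " maxInFlight=" + r.maxInFlight
                + " (bound " + 2 * 4 + ") sourceStalls=" + r.sourceStalls);
    }
}
```

The key design point is that every channel has a fixed capacity, so a slow
consumer turns into a waiting producer rather than a growing buffer.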

One possible way to avoid backpressure is to increase the parallelism of the
slow node (the group-by in your case) or decrease the parallelism of the fast
node (the source in your case).
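
The parallelism advice can be checked with back-of-the-envelope arithmetic.
Under the simplifying assumption that a stage's throughput is its parallelism
times a per-task rate (linear scaling, no data skew), a pipelined job runs at
the rate of its slowest stage. The stage names and rates below are made-up
numbers and BottleneckEstimate is a hypothetical helper; in a Flink DataSet job
the corresponding knob is an operator's setParallelism(int), or the job-wide
default set via ExecutionEnvironment.setParallelism(int).

```java
/** Back-of-the-envelope bottleneck model; hypothetical helper, not a Flink API. */
final class BottleneckEstimate {
    static final class Stage {
        final String name;
        final int parallelism;
        final double perTaskRate; // records/s one parallel instance handles (assumed)

        Stage(String name, int parallelism, double perTaskRate) {
            this.name = name;
            this.parallelism = parallelism;
            this.perTaskRate = perTaskRate;
        }

        /** Assumes throughput scales linearly with parallelism. */
        double capacity() {
            return parallelism * perTaskRate;
        }
    }

    /** The stage with the lowest capacity sets the pace and backpressures the rest. */
    static Stage bottleneck(Stage... stages) {
        Stage slowest = stages[0];
        for (Stage s : stages) {
            if (s.capacity() < slowest.capacity()) slowest = s;
        }
        return slowest;
    }

    private static void report(Stage... stages) {
        Stage b = bottleneck(stages);
        System.out.printf("bottleneck=%s, job rate ~ %.0f records/s%n", b.name, b.capacity());
    }

    public static void main(String[] args) {
        // Made-up rates for illustration.
        Stage source = new Stage("source", 4, 1000);  // 4 x 1000 = 4000 records/s
        Stage groupBy = new Stage("groupBy", 2, 500); // 2 x 500  = 1000 records/s
        report(source, groupBy);                       // group-by limits the job

        // Option 1: raise group-by parallelism to 8, giving 8 x 500 = 4000 records/s.
        report(source, new Stage("groupBy", 8, 500));
        // Option 2: lower source parallelism to 1, giving 1 x 1000 = 1000 records/s.
        report(new Stage("source", 1, 1000), groupBy);
    }
}
```

Both options remove the mismatch, but they are not equivalent: lowering the
source parallelism only stops the source from running ahead (the job still runs
at 1000 records/s), while raising the group-by parallelism is the option that
actually increases throughput.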

From: Darshan Singh <>
Sent: Wednesday, August 29, 2018 18:16
To: chesnay <>
Cc: wangzhijiang999 <>; user <>
Subject: Re: Backpressure? for Batches

Thanks. Now back to my question: how can I make the source read from HDFS more
slowly than, say, my map or group by can consume? Is there some configuration
that says read only 10000 rows, then stop, then read again, and so on?
Otherwise the source will keep sending data, or keep it in some buffers, and
run out of memory. Or is setting a different parallelism for the read the way
to handle this?

On Wed, Aug 29, 2018 at 12:11 PM Chesnay Schepler <> wrote:
The semantics of LAZY_FROM_SOURCE are that tasks are scheduled when there is
data to be consumed, i.e. once the first record has been emitted by the previous
operator. As such, backpressure exists in batch just like in streaming.

 On 29.08.2018 11:39, Darshan Singh wrote:

My job is simple. I am using the Table API:
1. Read from HDFS.
2. Deserialize JSON to a POJO and convert to a table.
3. Group by some columns.
4. Convert back to a DataSet and write back to HDFS.

In the Web UI I can see at least the first 3 steps running concurrently, which
sort of makes sense. From your answer I understood that Flink will do step 1
first, and once that is completed it will do the map (or the grouping as well),
then the grouping, and finally the write. Thus there should be only 1 task
running at a time. This doesn't seem right to me, or I misunderstood what you
said.

So if my group by is slow, I would expect some sort of backpressure on the
deserialization step, or maybe on the read from HDFS itself?

On Wed, Aug 29, 2018 at 11:03 AM Zhijiang(wangzhijiang999) 
<> wrote:
Backpressure occurs when the downstream and upstream are running concurrently
and the downstream is slower than the upstream.
In a streaming job, the schedule mode deploys both sides concurrently, so
backpressure may occur.
For batch jobs, as I remember, the default schedule mode is LAZY_FROM_SOURCE,
which means the downstream is scheduled after the upstream finishes. A slower
downstream will then not block the upstream, so backpressure may not exist in
this case.

From: Darshan Singh <>
Sent: Wednesday, August 29, 2018 16:20
To: user <>
Subject: Backpressure? for Batches

I faced an issue with backpressure in streaming jobs. I was wondering whether
we could face the same with batch jobs as well.

In theory it should be possible. But in the Web UI, the backpressure tab for
batch jobs was just showing the task status, with no status like "OK".

So I was wondering whether backpressure is a thing for batch jobs. If yes, how
do we reduce it, especially when reading from HDFS?

