Jingsong,

Thank you for the response.
Since I'm using Flink on EMR, where the latest available version is 1.9, the
second option is ruled out for now, but I will keep it in mind for a future
upgrade.

I'm going to try the first option. It would probably be a good idea to
document that setting, for example here:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
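
For anyone following along, here is a minimal sketch of how the first option
might be applied programmatically on Flink 1.9. It assumes the job builds its
own TableEnvironment with the blink planner in batch mode and that
flink-table-api-java and flink-table-planner-blink are on the classpath. The
class and method names (HashAggMemoryExample, createBatchEnv) are
hypothetical; the option key and the "1024 mb" value come from the reply
quoted below.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HashAggMemoryExample {

    // Option key quoted in the reply below (Flink 1.9, blink planner).
    static final String HASH_AGG_MEMORY_KEY = "table.exec.resource.hash-agg.memory";

    // Hypothetical helper: creates a blink batch TableEnvironment and raises
    // the hash aggregation memory to the value suggested in the thread.
    static TableEnvironment createBatchEnv() {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.getConfig().getConfiguration().setString(HASH_AGG_MEMORY_KEY, "1024 mb");
        return tEnv;
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = createBatchEnv();
        // Read the value back to confirm it was stored in the table config.
        System.out.println(
                tEnv.getConfig().getConfiguration().getString(HASH_AGG_MEMORY_KEY, "unset"));
    }
}
```

The value would likely need tuning for the actual data volume; 1024 mb is
only the starting point suggested in the reply.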

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Fanbin,
>
> Thanks for using blink batch mode.
>
> The OOM occurs because the hash aggregation does not have enough managed
> memory.
>
> There are three options you can choose from:
>
> 1. Is your version Flink 1.9? 1.9 still uses a fixed memory configuration,
> so you need to increase the hash aggregation memory:
> - table.exec.resource.hash-agg.memory: 1024 mb
>
> 2. In 1.10, we use the slot's managed memory to dynamically configure the
> actual operator memory, so operators can use more managed memory and you no
> longer need to configure the hash-agg memory. You can try 1.10 RC0 [1]
>
> 3. We can also use sort aggregation to avoid the OOM, but there is no config
> option for it yet. I created a JIRA issue to track it. [2]
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
> [2] https://issues.apache.org/jira/browse/FLINK-15732
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>>
>> I tried to increase the memory:
>> flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
>>
>> and still got the same OOM exception.
>>
>> My SQL looks like this:
>>
>> select id, hop_end(created_at, interval '30' second, interval '1' minute),
>> sum(field)... #20 of these sums
>> from table group by id, hop(created_at, interval '30' second, interval '1' minute)
>>
>>
>>
>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> Hi,
>>>
>>> I have a batch job using the blink planner and got the following error. I
>>> was able to run the same job successfully with Flink 1.8 on YARN.
>>>
>>> I set the configuration as:
>>> taskmanager.heap.size: 50000m
>>>
>>> and the Flink UI gives me:
>>> Last Heartbeat: 20-01-22 14:56:25
>>> ID: container_1579720108062_0018_01_000020
>>> Data Port: 41029
>>> Free Slots / All Slots: 1 / 0
>>> CPU Cores: 16
>>> Physical Memory: 62.9 GB
>>> JVM Heap Size: 10.3 GB
>>> Flink Managed Memory: 24.9 GB
>>>
>>> Any suggestions on how to move forward?
>>> Thanks,
>>> Fanbin
>>>
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>> ... 25 more
>>>
>>> *Caused by: java.io.IOException: Hash window aggregate map OOM.*
>>> at HashWinAggWithKeys$534.processElement(Unknown Source)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>
>
> --
> Best, Jingsong Lee
>
