I saw the doc at
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html

Do I have to set that in the code, or can I do it through flink-conf.yaml?
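
For reference, table options such as this one can be set in code through the
TableConfig of the TableEnvironment. A minimal sketch of that route, assuming
Flink 1.9 with the Blink planner in batch mode on the classpath. The class name
HashAggMemoryConfig is hypothetical; the key and the value "1024 mb" are the
ones suggested in the quoted reply below. Whether 1.9 also picks this key up
from flink-conf.yaml is not confirmed in this thread.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical class name, used only for illustration.
public class HashAggMemoryConfig {
    public static void main(String[] args) {
        // Blink planner in batch mode, as used by the job in this thread.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Table-level options live in the Configuration held by TableConfig.
        Configuration conf = tEnv.getConfig().getConfiguration();

        // Key and value taken from the reply quoted below (Flink 1.9 only).
        conf.setString("table.exec.resource.hash-agg.memory", "1024 mb");

        // Register tables and run the SQL query after this point.
    }
}
```

The option has to be set before the query is translated and executed, so it
belongs right after the TableEnvironment is created.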

On Wed, Jan 22, 2020 at 7:54 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Jingsong,
>
> Thank you for the response.
> Since I'm using Flink on EMR, where the latest available version is 1.9, the
> second option is ruled out, but I will keep it in mind for a future upgrade.
>
> I'm going to try the first option. It's probably a good idea to add that
> to the docs, for example:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> Thanks,
> Fanbin
>
> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
>> Hi Fanbin,
>>
>> Thanks for using Blink batch mode.
>>
>> The OOM is caused by insufficient managed memory in the hash aggregation.
>>
>> There are three options you can choose from:
>>
>> 1. Is your version Flink 1.9? 1.9 still uses a fixed memory configuration, so
>> you need to increase the hash aggregation memory:
>> - table.exec.resource.hash-agg.memory: 1024 mb
>>
>> 2. In 1.10, we use the slot's managed memory to dynamically configure the
>> actual operator memory, so operators can use more managed memory and you
>> don't need to configure hash agg memory anymore. You can try 1.10 RC0 [1].
>>
>> 3. We could also use sort aggregation to avoid the OOM, but there is no
>> config option for it yet. I created a JIRA issue to track it [2].
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>>
>>> I tried to increase the memory:
>>> flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
>>>
>>> and still got the same OOM exception.
>>>
>>> My SQL looks like:
>>>
>>> select id, hop_end(created_at, interval '30' second, interval '1' minute),
>>> sum(field)... #20 of these sums
>>> from table group by id, hop(created_at, interval '30' second, interval '1' minute)
>>>
>>>
>>>
>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <fanbin...@coinbase.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a batch job using the Blink planner and got the following error. I
>>>> was able to run the same job successfully with Flink 1.8 on YARN.
>>>>
>>>> I set conf as:
>>>> taskmanager.heap.size: 50000m
>>>>
>>>> and flink UI gives me
>>>> Last Heartbeat: 20-01-22 14:56:25
>>>> ID: container_1579720108062_0018_01_000020
>>>> Data Port: 41029
>>>> Free Slots / All Slots: 1 / 0
>>>> CPU Cores: 16
>>>> Physical Memory: 62.9 GB
>>>> JVM Heap Size: 10.3 GB
>>>> Flink Managed Memory: 24.9 GB
>>>>
>>>> Any suggestions on how to move forward?
>>>> Thanks,
>>>> Fanbin
>>>>
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>>> ... 25 more
>>>>
>>>> *Caused by: java.io.IOException: Hash window aggregate map OOM.*
>>>> at HashWinAggWithKeys$534.processElement(Unknown Source)
>>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
