Fanbin,

It looks like your config is wrong. Can you share the code where you set it?
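
For comparison, here is a minimal sketch of how I would expect the option from earlier in this thread to be set on 1.9. It assumes a blink planner batch TableEnvironment created through EnvironmentSettings; the class name is made up for illustration, and the 1024 mb value is just the one suggested earlier, not a tuned number:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical class name, used only for this sketch.
public class HashAggMemoryConfig {
    public static void main(String[] args) {
        // Blink planner in batch mode, as used in this thread.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Set the option on the TableConfig of this TableEnvironment.
        // The value includes its unit, as in the option list quoted below.
        tEnv.getConfig()
                .getConfiguration()
                .setString("table.exec.resource.hash-agg.memory", "1024 mb");

        // ... register tables and run the query with tEnv afterwards.
    }
}
```

If your code sets the option somewhere other than on the TableConfig of the same TableEnvironment that runs the query, that would be the first thing I would check.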

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Jingsong,
>
> Great, now I got a different error:
>
> java.lang.NullPointerException: Initial Segment may not be null
>       at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
>       at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>       at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
>       at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
>       at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
>       at LocalHashWinAggWithKeys$292.open(Unknown Source)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>       at java.lang.Thread.run(Thread.java:748)
>
>
> Is there any other config I should add?
>
> thanks,
>
> Fanbin
>
>
> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> You beat me to it.
>> Let me try that.
>>
>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Fanbin,
>>>
>>> The documentation is here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>>> NOTE: you need to configure this in TableConfig.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <fanbin...@coinbase.com>
>>> wrote:
>>>
>>>> Jingsong,
>>>>
>>>> Thank you for the response.
>>>> Since I'm using Flink on EMR, where the latest version is currently 1.9,
>>>> the second option is ruled out, but I will keep it in mind for a future
>>>> upgrade.
>>>>
>>>> I'm going to try the first option. It's probably a good idea to add
>>>> that to the docs, for example:
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>>>
>>>> Thanks,
>>>> Fanbin
>>>>
>>>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Fanbin,
>>>>>
>>>>> Thanks for using blink batch mode.
>>>>>
>>>>> The OOM is caused by insufficient managed memory for the hash aggregation.
>>>>>
>>>>> There are three options you can choose from:
>>>>>
>>>>> 1. Is your version Flink 1.9? 1.9 still uses a fixed memory configuration,
>>>>> so you need to increase the hash aggregation memory:
>>>>> - table.exec.resource.hash-agg.memory: 1024 mb
>>>>>
>>>>> 2. In 1.10, we use the slot's managed memory to dynamically configure the
>>>>> actual operator memory, so operators can use more managed memory and you
>>>>> no longer need to configure the hash-agg memory. You can try 1.10 RC0 [1].
>>>>>
>>>>> 3. We can also use sort aggregation to avoid the OOM, but there is no
>>>>> config option for that yet. I created a JIRA issue to track it. [2]
>>>>>
>>>>> [1]
>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <fanbin...@coinbase.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> tried to increase memory:
>>>>>> flink run  -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
>>>>>>
>>>>>> and still got the same OOM exception.
>>>>>>
>>>>>> my sql is like:
>>>>>>
>>>>>> select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
>>>>>>
>>>>>> from table group by id, hop(created_at, interval '30' second, interval '1' minute)
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <fanbin...@coinbase.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a batch job using the blink planner and got the following error.
>>>>>>> I was able to successfully run the same job with flink 1.8 on yarn.
>>>>>>>
>>>>>>> I set conf as:
>>>>>>> taskmanager.heap.size: 50000m
>>>>>>>
>>>>>>> and flink UI gives me
>>>>>>> Last Heartbeat: 20-01-22 14:56:25
>>>>>>> ID: container_1579720108062_0018_01_000020
>>>>>>> Data Port: 41029
>>>>>>> Free Slots / All Slots: 1 / 0
>>>>>>> CPU Cores: 16
>>>>>>> Physical Memory: 62.9 GB
>>>>>>> JVM Heap Size: 10.3 GB
>>>>>>> Flink Managed Memory: 24.9 GB
>>>>>>>
>>>>>>> any suggestions on how to move forward?
>>>>>>> Thanks,
>>>>>>> Fanbin
>>>>>>>
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>>>>>> ... 25 more
>>>>>>>
>>>>>>> *Caused by: java.io.IOException: Hash window aggregate map OOM.*
>>>>>>> at HashWinAggWithKeys$534.processElement(Unknown Source)
>>>>>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>>>>>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>

-- 
Best, Jingsong Lee
