I got the following error when running another job. Any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu wrote:
> Jingsong,
>
> I had set the config value too large. After I changed it to a smaller number, it works now!
> Thank you for the help. Really appreciate it!
>
> Fanbin
>
> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li wrote:
>
>> Fanbin,
>>
>> Looks like your config is wrong. Can you show your config code?
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu wrote:
>>
>>> Jingsong,
>>>
>>> Great, now I got a different error:
>>>
>>> java.lang.NullPointerException: Initial Segment may not be null
>>> at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
>>> at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>>> at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
>>> at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
>>> at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
>>> at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> Is there any other config I should add?
>>>
>>> Thanks,
>>>
>>> Fanbin
>>>
>>> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu wrote:
>>>
>>>> You beat me to it.
>>>> Let me try that.
>>>>
>>>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li wrote:
>>>>
>>>>> Fanbin,
>>>>>
>>>>> The documentation is here:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>>>>> NOTE: you need to configure this in TableConfig.
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu wrote:
>>>>>
>>>>>> Jingsong,
>>>>>>
>>>>>> Thank you for the response.
>>>>>> Since I'm using Flink on EMR and the latest version is currently 1.9, the second option is ruled out, but I will keep it in mind for a future upgrade.
>>>>>>
>>>>>> I'm going to try the first option. It's probably a good idea to add that to the docs, for example:
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>>>>>
>>>>>> Thanks,
>>>>>> Fanbin
>>>>>>
>>>>>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li wrote:
>>>>>>
>>>>>>> Hi Fanbin,
>>>>>>>
>>>>>>> Thanks for using Blink batch mode.
>>>>>>>
>>>>>>> The OOM is caused by insufficient managed memory for the hash aggregation.
>>>>>>>
>>>>>>> There are three options you can choose from:
>>>>>>>
>>>>>>> 1. Is your version Flink 1.9? 1.9 still uses a fixed memory configuration, so you need to increase the hash aggregation memory:
>>>>>>> - table.exec.resource.hash-agg.memory: 1024 mb
>>>>>>>
>>>>>>> 2. In 1.10, we use the slot's managed memory to dynamically configure the actual operator memory, so operators can use more managed memory and you don't need to configure the hash agg memory anymore. You can try 1.10 RC0 [1].
>>>>>>>
>>>>>>> 3. We can also use sort aggregation to avoid the OOM, but there is no config option for it yet; I created a JIRA issue to track it [2].
>>>>>>>
>>>>>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu wrote:
>>>>>>>
>>>>>>>> Tried to increase memory:
>>>>>>>> flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
>>>>>>>>
>>>>>>>> and still got the same OOM exception.
>>>>>>>>
>>>>>>>> My SQL is like:
>>>>>>>>
>>>>>>>> select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
>>>>>>>> from table group by id, hop(created_at, interval '30' second, interval '1' minute)
>>>>>>>>
>>>>>>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have a batch job using the Blink planner and got the following error. I was able to successfully run the same job with Flink 1.8 on YARN.
>>>>>>>>>
>>>>>>>>> I set conf as:
>>>>>>>>> taskmanager.heap.size: 50000m
>>>>>>>>>
>>>>>>>>> and the Flink UI gives me:
>>>>>>>>> Last Heartbeat: 20-01-22 14:56:25
>>>>>>>>> ID: container_1579720108062_0018_01_000020
>>>>>>>>> Data Port: 41029
>>>>>>>>> Free Slots / All Slots: 1 / 0
>>>>>>>>> CPU Cores: 16
>>>>>>>>> Physical Memory: 62.9 GB
>>>>>>>>> JVM Heap Size: 10.3 GB
>>>>>>>>> Flink Managed Memory: 24.9 GB
>>>>>>>>>
>>>>>>>>> Any suggestions on how to move forward?
>>>>>>>>> Thanks,
>>>>>>>>> Fanbin
>>>>>>>>>
>>>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>>>> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>>>>>>>> ... 25 more
>>>>>>>>>
>>>>>>>>> *Caused by: java.io.IOException: Hash window aggregate map OOM.*
>>>>>>>>> at HashWinAggWithKeys$534.processElement(Unknown Source)
>>>>>>>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>>>>>>>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
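The query in the original report groups by `hop(created_at, interval '30' second, interval '1' minute)`. That means a 30-second slide and a 1-minute window size, so conceptually every row belongs to size / slide = 2 overlapping windows. The Java sketch below shows that assignment for a single timestamp.

`HopWindowSketch` and `assignWindows` are hypothetical names, not Flink APIs. The sketch assumes millisecond timestamps and windows aligned to the epoch with no offset. It illustrates the window semantics only, not how the Blink planner lays out its hash window aggregate in memory.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of hop (sliding) window assignment, for illustration only.
 * Assumes millisecond timestamps and windows aligned to the epoch with no offset.
 */
public class HopWindowSketch {

    /** Returns the [start, end) bounds, in ms, of every hop window that contains ts. */
    static List<long[]> assignWindows(long ts, long slideMs, long sizeMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window containing ts: round ts down to a multiple of the slide.
        long lastStart = ts - Math.floorMod(ts, slideMs);
        // Step back one slide at a time while the window [start, start + size) still covers ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long slide = 30_000L;     // interval '30' second
        long size = 60_000L;      // interval '1' minute
        long createdAt = 95_000L; // example row with created_at = 95 s after the epoch
        for (long[] w : assignWindows(createdAt, slide, size)) {
            // w[1] is the exclusive end bound, i.e. what hop_end reports for that window.
            System.out.println("window [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For a row at t = 95 s, `main` prints the two windows [90000, 150000) and [60000, 120000).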
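Jingsong's first option raises `table.exec.resource.hash-agg.memory`. The thread also shows that setting the value too large breaks the job in a different way. The Java sketch below is a rough sanity check of a candidate value against the managed memory reported in the UI.

`HashAggMemoryCheck`, `parseBytes`, `pages` and `fitsInSlot` are hypothetical helpers, not Flink APIs. The sketch makes two assumptions:
- Flink's default 32 KiB memory segment (page) size.
- A per-slot budget of 24 GB, close to the 24.9 GB of Flink managed memory shown in the UI.

```java
import java.util.Locale;

/**
 * Hypothetical back-of-the-envelope check for a hash-agg memory setting such as "1024 mb".
 * Not a Flink API; it only illustrates the arithmetic under the stated assumptions.
 */
public class HashAggMemoryCheck {

    // Assumption: Flink's default memory segment (page) size of 32 KiB.
    static final long PAGE_SIZE_BYTES = 32L * 1024;

    /** Parses "<number> <unit>" (unit b, kb, mb or gb, case-insensitive; default b) into bytes. */
    static long parseBytes(String value) {
        String[] parts = value.trim().toLowerCase(Locale.ROOT).split("\\s+");
        long amount = Long.parseLong(parts[0]);
        String unit = parts.length > 1 ? parts[1] : "b";
        switch (unit) {
            case "b":
                return amount;
            case "kb":
                return amount * 1024L;
            case "mb":
                return amount * 1024L * 1024L;
            case "gb":
                return amount * 1024L * 1024L * 1024L;
            default:
                throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    /** Number of whole pages the requested memory would be split into. */
    static long pages(String requested) {
        return parseBytes(requested) / PAGE_SIZE_BYTES;
    }

    /** True if the request is no larger than the assumed managed-memory budget of one slot. */
    static boolean fitsInSlot(String requested, long slotManagedBytes) {
        return parseBytes(requested) <= slotManagedBytes;
    }

    public static void main(String[] args) {
        // Assumed budget, roughly the 24.9 GB of managed memory shown in the UI.
        long slotBudget = parseBytes("24 gb");
        for (String requested : new String[] {"128 mb", "1024 mb", "32 gb"}) {
            System.out.println(requested + ": " + pages(requested) + " pages, fits="
                    + fitsInSlot(requested, slotBudget));
        }
    }
}
```

For the three sample values, `main` reports 4096, 32768 and 1048576 pages, and only `32 gb` fails the check. Treat the result as an upper bound at best: other memory-consuming operators that may share the slot, such as the local and global hash window aggregates in the stack traces, also need managed memory.

Per the NOTE in the thread, the option itself goes into the TableConfig, not flink-conf.yaml. In 1.9 this should be `tEnv.getConfig().getConfiguration().setString("table.exec.resource.hash-agg.memory", "1024 mb")`, where `tEnv` is your TableEnvironment.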
