Hi Fanbin,

You can install your own Flink build on AWS EMR; that frees you from EMR's release cycles.
On Thu, Jan 23, 2020 at 03:36 Jingsong Li <jingsongl...@gmail.com> wrote:

Fanbin,

I have no idea yet. Can you create a JIRA to track it? Please describe the complete SQL and give some information about the data.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

Jingsong,

Do you have any suggestions for debugging the IndexOutOfBoundsException mentioned earlier?

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

I got the following error when running another job. Any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
    at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
    at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
    at HashWinAggWithKeys$538.endInput(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

Jingsong,

I had set the config value too large. After I changed it to a smaller number, it works now!
Thank you for the help, I really appreciate it!
Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Fanbin,

It looks like your config is wrong. Can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

Jingsong,

Great, now I get a different error:

java.lang.NullPointerException: Initial Segment may not be null
    at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
    at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
    at LocalHashWinAggWithKeys$292.open(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

Is there any other config I should add?

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

You beat me to it.
Let me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Fanbin,

The documentation is here:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
NOTE: you need to set this option in TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

Jingsong,

Thank you for the response.
I'm using Flink on EMR, where the latest version is currently 1.9, so the second option is ruled out, but I will keep it in mind for a future upgrade.

I'm going to try the first option. It's probably a good idea to add it to the docs, for example:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Fanbin,

Thanks for using Blink batch mode.

The OOM is caused by insufficient managed memory for the hash aggregation.

There are three options you can choose from:

1. Is your version Flink 1.9? 1.9 still uses a fixed memory configuration, so you need to increase the hash aggregation memory:
   - table.exec.resource.hash-agg.memory: 1024 mb

2. In 1.10, we use the slot's managed memory to configure the actual operator memory dynamically, so operators can use more managed memory and you no longer need to configure the hash aggregation memory. You can try 1.10 RC0 [1].

3. Sort aggregation can also avoid the OOM, but there is no config option for it yet. I created a JIRA to track it.
[2] >>>>>>>>>> >>>>>>>>>> [1] >>>>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html >>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-15732 >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Jingsong Lee >>>>>>>>>> >>>>>>>>>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <fanbin...@coinbase.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> tried to increase memory: >>>>>>>>>>> flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar >>>>>>>>>>> >>>>>>>>>>> and still got the same OOM exception. >>>>>>>>>>> >>>>>>>>>>> my sql is like: >>>>>>>>>>> >>>>>>>>>>> select id, hop_end(created_at, interval '30' second, interval '1' >>>>>>>>>>> minute), sum(field)... #20 of these sums >>>>>>>>>>> >>>>>>>>>>> from table group by id, hop(created_at, interval '30' second, >>>>>>>>>>> interval '1' minute) >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu < >>>>>>>>>>> fanbin...@coinbase.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi, >>>>>>>>>>>> >>>>>>>>>>>> I have a batch job using blink planner. and got the following >>>>>>>>>>>> error. I was able to successfully run the same job with flink 1.8 >>>>>>>>>>>> on yarn. >>>>>>>>>>>> >>>>>>>>>>>> I set conf as: >>>>>>>>>>>> taskmanager.heap.size: 50000m >>>>>>>>>>>> >>>>>>>>>>>> and flink UI gives me >>>>>>>>>>>> Last Heartbeat:20-01-22 >>>>>>>>>>>> 14:56:25ID:container_1579720108062_0018_01_000020Data >>>>>>>>>>>> Port:41029Free Slots >>>>>>>>>>>> / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap >>>>>>>>>>>> Size:10.3 >>>>>>>>>>>> GBFlink Managed Memory:24.9 GB >>>>>>>>>>>> >>>>>>>>>>>> any suggestions on how to move forward? >>>>>>>>>>>> Thanks, >>>>>>>>>>>> Fanbin >>>>>>>>>>>> >>>>>>>>>>>> Caused by: >>>>>>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job >>>>>>>>>>>> execution failed. 
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
    ... 25 more

Caused by: java.io.IOException: Hash window aggregate map OOM.
    at HashWinAggWithKeys$534.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
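For reference, Jingsong's option 1 on Flink 1.9, combined with his note that the option has to go into TableConfig rather than the cluster config, could look roughly like the sketch below. It assumes the Flink 1.9 Table API with the Blink planner in batch mode (flink-table-api-java and flink-table-planner-blink on the classpath). The class name HashAggMemoryConfig is hypothetical, and "1024 mb" is only the example value from the thread. Fanbin's follow-up reports that an overly large value failed with the "Initial Segment may not be null" NullPointerException and that a smaller value worked, so the number may need tuning.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical example class; a sketch of setting a table option on Flink 1.9.
public class HashAggMemoryConfig {
    public static void main(String[] args) {
        // Blink planner in batch mode, as used in this thread.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Table options are set on the TableConfig of this TableEnvironment.
        // "1024 mb" is the example value from the thread; a value that was too large
        // reportedly caused an NPE when the hash aggregate operator was opened.
        tEnv.getConfig()
            .getConfiguration()
            .setString("table.exec.resource.hash-agg.memory", "1024 mb");

        // Register tables and run the query with tEnv as usual.
    }
}
```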
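The query in this thread groups by hop(created_at, interval '30' second, interval '1' minute): a 1-minute window that slides every 30 seconds. As a result, every row belongs to size / slide = 2 overlapping windows, and each (id, window) pair is its own group carrying the 20 SUM accumulators. The hypothetical, self-contained sketch below only illustrates that window assignment. It is not Flink's implementation, and it assumes non-negative epoch-millisecond timestamps with windows aligned to the epoch (no offset). HopWindows and windowStarts are made-up names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of HOP (sliding) window assignment; not Flink's implementation.
// Assumes non-negative epoch-millisecond timestamps and windows aligned to the epoch (no offset).
public class HopWindows {

    // Returns the start of every window [start, start + size) that contains ts, latest first.
    static long[] windowStarts(long ts, long slide, long size) {
        if (ts < 0 || slide <= 0 || size <= 0) {
            throw new IllegalArgumentException("expected ts >= 0 and positive slide/size");
        }
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide); // latest slide boundary at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        long[] result = new long[starts.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = starts.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        // hop(created_at, interval '30' second, interval '1' minute): slide = 30 s, size = 60 s.
        long slide = 30_000L;
        long size = 60_000L;
        long ts = 45_000L; // a row with created_at = 00:00:45 (epoch millis)
        for (long start : windowStarts(ts, slide, size)) {
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
    }
}
```

Running main prints the two windows that contain a row at 00:00:45, [30000, 90000) and [0, 60000).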