Jingsong,

I created https://issues.apache.org/jira/browse/FLINK-15928 to track the issue. Let me know if you need anything else to debug.
Thanks,
Fanbin

On Tue, Jan 28, 2020 at 12:54 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Fanbin,
>
> You could use the RC1 of Flink that was created yesterday, from the apache repo:
> https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/
> Alternatively, if you build Flink locally with `mvn install`, you could use mavenLocal() in your build.gradle and feed from that.
>
> Best,
>
> Arvid
>
> On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> I can build Flink 1.10 and install it onto EMR (flink-dist_2.11-1.10.0.jar), but what about the other dependencies in my project's build.gradle, i.e. flink-scala_2.11, flink-json, flink-jdbc...? Do I continue to use 1.9.0, since there is no 1.10 release available?
>>
>> Thanks,
>> Fanbin
>>
>> On Fri, Jan 24, 2020 at 11:39 PM Bowen Li <bowenl...@gmail.com> wrote:
>>
>>> Hi Fanbin,
>>>
>>> You can install your own Flink build in AWS EMR, and it frees you from EMR's release cycles.
>>>
>>> On Thu, Jan 23, 2020 at 03:36 Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>>> Fanbin,
>>>>
>>>> I have no idea now; could you create a JIRA to track it? You can describe the complete SQL and some information about the data.
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>
>>>>> Jingsong,
>>>>>
>>>>> Do you have any suggestions for debugging the above-mentioned IndexOutOfBoundsException error?
>>>>>
>>>>> Thanks,
>>>>> Fanbin
>>>>>
>>>>> On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>>
>>>>>> I got the following error when running another job. Any suggestions?
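Arvid's two suggestions above can be sketched in a build.gradle roughly as follows; the staging repository URL is the one linked in his mail, while the 1.10.0 artifact coordinates are assumptions until the release is final:

```groovy
// Sketch only: dependency versions are assumed from the thread
// (flink-json 1.10.0 via the RC staging repo, or a local `mvn install` build).
repositories {
    // Option B: pick up a locally built Flink after `mvn install`
    mavenLocal()
    // Option A: the Apache staging repository holding the 1.10.0 RC artifacts
    maven {
        url 'https://repository.apache.org/content/repositories/orgapacheflink-1325/'
    }
}

dependencies {
    compile 'org.apache.flink:flink-json:1.10.0'
    compile 'org.apache.flink:flink-scala_2.11:1.10.0'
}
```

With mavenLocal() listed first, Gradle resolves the locally installed snapshot before falling back to the staging repo.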
>>>>>>
>>>>>> Caused by: java.lang.IndexOutOfBoundsException
>>>>>>   at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
>>>>>>   at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
>>>>>>   at HashWinAggWithKeys$538.endInput(Unknown Source)
>>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
>>>>>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
>>>>>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>>>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>> On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>>>
>>>>>>> Jingsong,
>>>>>>>
>>>>>>> I had set the config value too large. After I changed it to a smaller number, it works now!
>>>>>>> Thank you for the help, I really appreciate it!
>>>>>>>
>>>>>>> Fanbin
>>>>>>>
>>>>>>> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Fanbin,
>>>>>>>>
>>>>>>>> It looks like your config is wrong; can you show your config code?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong Lee
>>>>>>>>
>>>>>>>> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>>>>>
>>>>>>>>> Jingsong,
>>>>>>>>>
>>>>>>>>> Great, now I get a different error:
>>>>>>>>>
>>>>>>>>> java.lang.NullPointerException: Initial Segment may not be null
>>>>>>>>>   at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
>>>>>>>>>   at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>>>>>>>>>   at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
>>>>>>>>>   at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
>>>>>>>>>   at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
>>>>>>>>>   at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>>>>>>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>
>>>>>>>>> Is there any other config I should add?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Fanbin
>>>>>>>>>
>>>>>>>>> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>>>>>>
>>>>>>>>>> You beat me to it.
>>>>>>>>>> Let me try that.
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Fanbin,
>>>>>>>>>>>
>>>>>>>>>>> The documentation is here:
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
>>>>>>>>>>> NOTE: you need to configure this in the TableConfig.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong Lee
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Jingsong,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the response.
>>>>>>>>>>>> Since I'm using Flink on EMR and the latest version there is currently 1.9, the second option is ruled out, but I will keep it in mind for a future upgrade.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm going to try the first option. It's probably a good idea to add that to the docs, for example here:
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Fanbin
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Fanbin,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for using blink batch mode.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The OOM is caused by the managed memory not being enough for the hash aggregation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> There are three options you can choose from:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Is your version Flink 1.9? 1.9 still uses a fixed memory configuration, so you need to increase the hash memory:
>>>>>>>>>>>>> - table.exec.resource.hash-agg.memory: 1024 mb
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. In 1.10, we use the slot's managed memory to dynamically configure the real operator memory, so the operator can use more managed memory and you don't need to configure the hash-agg memory anymore.
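The first option above goes into the TableConfig rather than flink-conf.yaml; a minimal sketch, assuming Flink 1.9 with the blink planner in batch mode (the "1024 mb" value is just the example from the mail):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Blink planner in batch mode (Flink 1.9).
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()
val tableEnv = TableEnvironment.create(settings)

// table.exec.* options are read from the TableConfig, not from flink-conf.yaml;
// "1024 mb" is an example value, size it to your workload.
tableEnv.getConfig.getConfiguration
  .setString("table.exec.resource.hash-agg.memory", "1024 mb")
```

Setting the option in flink-conf.yaml has no effect here, which is what the NOTE about the TableConfig is warning about.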
>>>>>>>>>>>>> You can try 1.10 RC0 [1].
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. We could also use sort aggregation to avoid the OOM, but there is no config option for it yet; I created a JIRA to track it. [2]
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-release-candidate-0-td36770.html
>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-15732
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jingsong Lee
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried to increase the memory:
>>>>>>>>>>>>>> flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> and still got the same OOM exception.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My SQL is like:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field), ... -- 20 of these sums
>>>>>>>>>>>>>> from table
>>>>>>>>>>>>>> group by id, hop(created_at, interval '30' second, interval '1' minute)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a batch job using the blink planner and got the following error. I was able to run the same job successfully with Flink 1.8 on YARN.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I set the conf as:
>>>>>>>>>>>>>>> taskmanager.heap.size: 50000m
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and the Flink UI gives me:
>>>>>>>>>>>>>>> Last Heartbeat: 20-01-22 14:56:25
>>>>>>>>>>>>>>> ID: container_1579720108062_0018_01_000020
>>>>>>>>>>>>>>> Data Port: 41029
>>>>>>>>>>>>>>> Free Slots / All Slots: 1 / 0
>>>>>>>>>>>>>>> CPU Cores: 16
>>>>>>>>>>>>>>> Physical Memory: 62.9 GB
>>>>>>>>>>>>>>> JVM Heap Size: 10.3 GB
>>>>>>>>>>>>>>> Flink Managed Memory: 24.9 GB
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Any suggestions on how to move forward?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Fanbin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>>>>>>>>>   at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>>>>>>>>>>   at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>>>>>>>>>>>>>>>   ... 25 more
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Caused by: java.io.IOException: Hash window aggregate map OOM.*
>>>>>>>>>>>>>>>   at HashWinAggWithKeys$534.processElement(Unknown Source)
>>>>>>>>>>>>>>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
>>>>>>>>>>>>>>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
>>>>>>>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>>>>>>>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>>>>>>>>>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>>>>>>>>>>>>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>>>>>>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>>>>>>>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best, Jingsong Lee
>>>>
>>>> --
>>>> Best, Jingsong Lee