The stack trace occurs every time with the following settings (I tried different memory fractions):

yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120

akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7 / 0.3 / 0.1
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true

env.getConfig().setExecutionMode(ExecutionMode.BATCH)
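A minimal sketch of where that last line lives in the driver program; the class name is hypothetical and the actual job definition is omitted:

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

public class JobSkeleton {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Switch all data exchanges from PIPELINED to BATCH: senders fully produce
        // (and spill) their result before the receiving tasks start consuming it.
        env.getConfig().setExecutionMode(ExecutionMode.BATCH);

        // ... source -> flatMap -> partition/sort -> groupReduce -> sink definition goes here ...

        // env.execute("etl job");
    }
}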
Hand-jammed top of the stack:

java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:149)
    ... lots of Netty frames follow

While I observe the TaskManagers I never see their JVM heaps get high at all. Mind you, I can't tell which task will blow up and then get to its TM in time to see what it looks like, but every one I do look at has heap usage of ~150MB / 6.16GB (with fraction: 0.1).

On Thu, Dec 7, 2017 at 11:59 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace?
> The managed memory should be divided among all possible consumers. In case of your simple job, this should just be the Sorter.
> In fact, I'd try to reduce the fraction to give more memory to the JVM heap (OOM means there was not enough (heap) memory).
>
> Enabling BATCH mode means that the records are not shipped to the sorter in a pipelined fashion but buffered at (and written to the disk of) the sender task.
> Once the input was consumed, the data is shipped to the receiver tasks (the sorter). This mode decouples tasks and also reduces the number of network buffers because fewer connections must be active at the same time.
> Here's a link to an internal design document (not sure how up to date it is though...) [1].
>
> Did you try to check if the problem is caused by data skew?
> You could add a MapPartition task instead of the PartitionSorter to count the number of records per partition.
>
> Best, Fabian
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
>
> 2017-12-07 16:30 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Thanks for the reply again,
>>
>> I'm currently doing runs with:
>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>> akka.ask.timeout: 60s
>> containerized.heap-cutoff-ratio: 0.15
>> taskmanager.memory.fraction: 0.7
>> taskmanager.memory.off-heap: true
>> taskmanager.memory.preallocate: true
>>
>> When I change the config setExecutionMode() to BATCH, no matter what memory fraction I choose, the sort instantly fails with SortMerger OOM exceptions, even when I set the fraction to 0.95. The data source part is ridiculously fast though, ~30 seconds! Disabling batch mode and keeping the other changes looks like it gives the same behavior as before; the job's been running for ~20 minutes now. Does BATCH mode disable spilling to disk, or does BATCH combined with off-heap disable spilling to disk?
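As a concrete illustration of the skew check Fabian suggests above (a MapPartition that counts records per partition in place of the sorter), here is a minimal sketch. The tuple type, the sample records, and the key fields (0, 2) are assumptions taken from the plan further down in the thread, not the actual job:

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class PartitionSkewCheck {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the FlatMap output; the real job reads from a Hive table.
        DataSet<Tuple3<String, Integer, String>> mapped = env.fromElements(
                new Tuple3<>("a", 1, "x"), new Tuple3<>("a", 2, "x"), new Tuple3<>("b", 3, "y"));

        mapped
            // same hash partitioning as the explicit Sort-Partition step (key fields 0 and 2)
            .partitionByHash(0, 2)
            .mapPartition(new RichMapPartitionFunction<Tuple3<String, Integer, String>, Tuple2<Integer, Long>>() {
                @Override
                public void mapPartition(Iterable<Tuple3<String, Integer, String>> values,
                                         Collector<Tuple2<Integer, Long>> out) {
                    long count = 0L;
                    for (Tuple3<String, Integer, String> ignored : values) {
                        count++;
                    }
                    // emit (subtask index, record count); heavily loaded partitions stand out immediately
                    out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), count));
                }
            })
            .print();
    }
}

Run against the real FlatMap output instead of the fromElements() stand-in, this shows right away whether a couple of subtasks receive most of the records.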
>> Is there more documentation on what Batch mode does under the covers?
>>
>> As for the flow itself, yes, it used to be a lot smaller. I broke it out manually by adding the sort/partition to see which steps were causing the slowdown; thinking it was my code, I wanted to separate the operations.
>>
>> Thank you again for your help.
>>
>> On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> That doesn't look like a bad configuration.
>>>
>>> I have to correct myself regarding the size of the managed memory. The fraction (70%) is applied on the free memory after the TM initialization. This means that memory for network buffers (and other data structures) is subtracted before the managed memory is allocated.
>>> The actual size of the managed memory is logged in the TM log file during startup.
>>>
>>> You could also try to decrease the number of slots per TM to 1 but add more vCores (yarn.containers.vcores [1]) because the sorter runs in multiple threads.
>>>
>>> Adding a GroupCombineFunction for pre-aggregation (if possible...) would help to mitigate the effects of the data skew.
>>> Another thing I'd like to ask: are you adding the partitioner and sorter explicitly to the plan, and if so, why? Usually, the partitioning and sorting is done as part of the GroupReduce.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#yarn
>>>
>>> 2017-12-06 23:32 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> Wow, thank you for the reply, you gave me a lot to look into and mess with. I'll start testing with the various memory options and env settings tomorrow.
>>>>
>>>> BTW the current Flink cluster is launched like:
>>>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>>>
>>>> with flink-conf.yaml property overrides of:
>>>> # so bigger clusters don't fail to init
>>>> akka.ask.timeout: 60s
>>>> # so more memory is given to the JVM from the yarn container
>>>> containerized.heap-cutoff-ratio: 0.15
>>>>
>>>> So each Flink slot doesn't necessarily get a lot of RAM. You said 70% of RAM goes to the job by default, so that's (9200 * 0.85) * 0.70 = 5474MB, which leaves each slot sitting with ~2737MB of usable space. Would you have a different config for taking overall the same amount of RAM?
>>>>
>>>> On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Garrett,
>>>>>
>>>>> data skew might be a reason for the performance degradation.
>>>>>
>>>>> The plan you shared is pretty simple. The following happens when you run the program:
>>>>> - The data source starts to read data and pushes the records to the FlatMapFunction. From there the records are shuffled (using hash partitioning) to the sorter.
>>>>> - The sorter tasks consume the records and write them into a memory buffer. When the buffer is full, it is sorted and spilled to disk. When the buffer was spilled, it is filled again with records, sorted, and spilled.
>>>>> - The initially fast processing happens because at the beginning the sorter is not waiting for buffers to be sorted or spilled because they are empty.
>>>>>
>>>>> The performance of the plan depends (among other things) on the size of the sort buffers. The sort buffers are taken from Flink's managed memory.
>>>>> Unless you configured something else, 70% of the TaskManager heap memory is reserved as managed memory.
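To make the GroupCombineFunction pre-aggregation Fabian mentions a few messages up more concrete, here is a minimal sketch. The tuple type, the key fields (0, 2), and the "keep only the first record of each partial group" logic are placeholders; a real combiner has to produce something the downstream GroupReduce can still finish correctly:

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class PreAggregation {

    // Adds a combine step on the sending side so that less data reaches the skewed sort/reduce.
    static DataSet<Tuple3<String, Integer, String>> preCombine(DataSet<Tuple3<String, Integer, String>> mapped) {
        return mapped
            .groupBy(0, 2)
            .combineGroup(new GroupCombineFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>>() {
                @Override
                public void combine(Iterable<Tuple3<String, Integer, String>> values,
                                    Collector<Tuple3<String, Integer, String>> out) {
                    // Placeholder logic: forward only the first record of each (possibly partial) group.
                    // Combines run on whatever fits in memory, so a group may be split across calls.
                    for (Tuple3<String, Integer, String> first : values) {
                        out.collect(first);
                        break;
                    }
                }
            });
    }
}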
>>>>> If you use Flink only for batch jobs, I would enable preallocation and off-heap memory (see configuration options [1]). You can also configure a fixed size for the managed memory. The more memory you configure, the more is available for sorting.
>>>>>
>>>>> The managed memory of a TM is evenly distributed to all its processing slots. Hence, having more slots per TM means that each slot has less managed memory (for sorting or joins or ...).
>>>>> So many slots are not necessarily good for performance (unless you increase the number of TMs / memory as well), especially in case of data skew, when most slots receive only little data and cannot leverage their memory.
>>>>> If your data is heavily skewed, it might make sense to have fewer slots such that each slot has more memory for sorting.
>>>>>
>>>>> Skew also has an effect on downstream operations. In case of skew, some of the sorter tasks are overloaded and cannot accept more data.
>>>>> Due to the pipelined shuffles, this leads to a back-pressure behavior that propagates down to the sources.
>>>>> You can disable pipelining by setting the execution mode on the execution configuration to BATCH [2]. This will break the pipeline and instead write the result of the FlatMap to disk.
>>>>> This might help if the FlatMap is compute-intensive or filters many records.
>>>>>
>>>>> The data sizes don't sound particularly large, so this should be something that Flink should be able to handle.
>>>>>
>>>>> Btw. you don't need to convert the JSON plan output. You can paste it into the plan visualizer [3].
>>>>> I would not worry about the missing statistics. The optimizer does not leverage them at the current state.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#managed-memory
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html
>>>>> [3] http://flink.apache.org/visualizer/
>>>>>
>>>>> 2017-12-06 16:45 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>>
>>>>>> Fabian,
>>>>>>
>>>>>> Thank you for the reply. Yes, I do watch via the UI; is there another way to see progress through the steps?
>>>>>>
>>>>>> I think I just figured it out: the hang-up is in the sort phase (ID 4), where 2 slots take all the time. Looking in the UI, most slots get less than 500MB of data to sort; these two have 6.7GB and 7.3GB each. Together that's about 272M records, and they will run for hours at this point. Looks like I need to figure out a different partitioning/sort strategy. I never noticed before because when I run the system at ~1400 slots I don't use the UI anymore as it gets unresponsive. 400 slots is painfully slow, but still works.
>>>>>>
>>>>>> The getEnv output is very cool! It's also very big, so I've tried to summarize it here in more of a yaml format as it's on a different network. Note the parallelism was just set to 10 as I didn't know whether that affected the output. Hopefully I didn't flub a copy-paste step; it looks good to me.
>>>>>>
>>>>>> This flow used to be far fewer steps, but as it wasn't scaling I broke it out into all the distinct pieces so I could see where it failed. Source and sink are both Hive tables. I wonder if the inputformat is expected to give more info to seed some of these stat values?
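For reference, the plan summarized below can be produced with getExecutionPlan(), which Fabian suggests further down in the quoted thread in place of execute() when only the JSON plan is wanted. A minimal sketch with a stand-in pipeline (the real source, FlatMap, sort and Hive sink would go where the comment is):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class PrintPlan {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Tiny stand-in pipeline; the real job's source, FlatMap, sort/partition and Hive sink go here.
        env.fromElements(1, 2, 3)
           .output(new DiscardingOutputFormat<Integer>());

        // Prints the JSON plan instead of running the job; the output can also be
        // pasted into the plan visualizer at http://flink.apache.org/visualizer/.
        System.out.println(env.getExecutionPlan());
    }
}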
>>>>>> nodes
>>>>>>
>>>>>> id: 6
>>>>>> type: source
>>>>>> pact: Data Source
>>>>>> contents: at CreateInput(ExecutionEnvironment.java:533)
>>>>>> parallelism: 10
>>>>>> global_properties:
>>>>>>   name: Partitioning value: RANDOM_PARTITIONED
>>>>>>   name: Partitioning Order value: none
>>>>>>   name: Uniqueness value: not unique
>>>>>> local_properties:
>>>>>>   name: Order value: none
>>>>>>   name: Grouping value: not grouped
>>>>>>   name: Uniqueness value: not unique
>>>>>> estimates:
>>>>>>   name: Est. Output Size value: unknown
>>>>>>   name: Est. Cardinality value: unknown
>>>>>> costs:
>>>>>>   name: Network value: 0
>>>>>>   name: Disk I/O value: 0
>>>>>>   name: CPU value: 0
>>>>>>   name: Cumulative Network value: 0
>>>>>>   name: Cumulative Disk I/O value: 0
>>>>>>   name: Cumulative CPU value: 0
>>>>>> compiler_hints:
>>>>>>   name: Output Size (bytes) value: none
>>>>>>   name: Output Cardinality value: none
>>>>>>   name: Avg. Output Record Size (bytes) value: none
>>>>>>   name: Filter Factor value: none
>>>>>>
>>>>>> id: 5
>>>>>> type: pact
>>>>>> pact: FlatMap
>>>>>> contents: FlatMap at main()
>>>>>> parallelism: 10
>>>>>> predecessors:
>>>>>>   id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>> driver_strategy: FlatMap
>>>>>> global_properties:
>>>>>>   name: Partitioning value: RANDOM_PARTITIONED
>>>>>>   name: Partitioning Order value: none
>>>>>>   name: Uniqueness value: not unique
>>>>>> local_properties:
>>>>>>   name: Order value: none
>>>>>>   name: Grouping value: not grouped
>>>>>>   name: Uniqueness value: not unique
>>>>>> estimates:
>>>>>>   name: Est. Output Size value: unknown
>>>>>>   name: Est. Cardinality value: unknown
>>>>>> costs:
>>>>>>   name: Network value: 0
>>>>>>   name: Disk I/O value: 0
>>>>>>   name: CPU value: 0
>>>>>>   name: Cumulative Network value: 0
>>>>>>   name: Cumulative Disk I/O value: 0
>>>>>>   name: Cumulative CPU value: 0
>>>>>> compiler_hints:
>>>>>>   name: Output Size (bytes) value: none
>>>>>>   name: Output Cardinality value: none
>>>>>>   name: Avg. Output Record Size (bytes) value: none
>>>>>>   name: Filter Factor value: none
>>>>>>
>>>>>> id: 4
>>>>>> type: pact
>>>>>> pact: Sort-Partition
>>>>>> contents: Sort at main()
>>>>>> parallelism: 10
>>>>>> predecessors:
>>>>>>   id: 5, ship_strategy: Hash Partition on [0,2], local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
>>>>>> driver_strategy: No-Op
>>>>>> global_properties:
>>>>>>   name: Partitioning value: HASH_PARTITIONED
>>>>>>   name: Partitioned on value: [0,2]
>>>>>>   name: Partitioning Order value: none
>>>>>>   name: Uniqueness value: not unique
>>>>>> local_properties:
>>>>>>   name: Order value: [0:ASC,2:ASC,1:ASC]
>>>>>>   name: Grouping value: [0,2,1]
>>>>>>   name: Uniqueness value: not unique
>>>>>> estimates:
>>>>>>   name: Est. Output Size value: unknown
>>>>>>   name: Est. Cardinality value: unknown
>>>>>> costs:
>>>>>>   name: Network value: 0
>>>>>>   name: Disk I/O value: 0
>>>>>>   name: CPU value: 0
>>>>>>   name: Cumulative Network value: unknown
>>>>>>   name: Cumulative Disk I/O value: unknown
>>>>>>   name: Cumulative CPU value: unknown
>>>>>> compiler_hints:
>>>>>>   name: Output Size (bytes) value: none
>>>>>>   name: Output Cardinality value: none
>>>>>>   name: Avg. Output Record Size (bytes) value: none
>>>>>>   name: Filter Factor value: none
>>>>>>
>>>>>> id: 3
>>>>>> type: pact
>>>>>> pact: GroupReduce
>>>>>> contents: GroupReduce at first(SortedGrouping.java:210)
>>>>>> parallelism: 10
>>>>>> predecessors:
>>>>>>   id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>> driver_strategy: Sorted Group Reduce
>>>>>> global_properties:
>>>>>>   name: Partitioning value: RANDOM_PARTITIONED
>>>>>>   name: Partitioning Order value: none
>>>>>>   name: Uniqueness value: not unique
>>>>>> local_properties:
>>>>>>   name: Order value: none
>>>>>>   name: Grouping value: not grouped
>>>>>>   name: Uniqueness value: not unique
>>>>>> estimates:
>>>>>>   name: Est. Output Size value: unknown
>>>>>>   name: Est. Cardinality value: unknown
>>>>>> costs:
>>>>>>   name: Network value: 0
>>>>>>   name: Disk I/O value: 0
>>>>>>   name: CPU value: 0
>>>>>>   name: Cumulative Network value: unknown
>>>>>>   name: Cumulative Disk I/O value: unknown
>>>>>>   name: Cumulative CPU value: unknown
>>>>>> compiler_hints:
>>>>>>   name: Output Size (bytes) value: none
>>>>>>   name: Output Cardinality value: none
>>>>>>   name: Avg. Output Record Size (bytes) value: none
>>>>>>   name: Filter Factor value: none
>>>>>>
>>>>>> id: 2
>>>>>> type: pact
>>>>>> pact: Map
>>>>>> contents: Map at ()
>>>>>> parallelism: 10
>>>>>> predecessors:
>>>>>>   id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>> driver_strategy: Map
>>>>>> global_properties:
>>>>>>   name: Partitioning value: RANDOM_PARTITIONED
>>>>>>   name: Partitioning Order value: none
>>>>>>   name: Uniqueness value: not unique
>>>>>> local_properties:
>>>>>>   name: Order value: none
>>>>>>   name: Grouping value: not grouped
>>>>>>   name: Uniqueness value: not unique
>>>>>> estimates:
>>>>>>   name: Est. Output Size value: unknown
>>>>>>   name: Est. Cardinality value: unknown
>>>>>> costs:
>>>>>>   name: Network value: 0
>>>>>>   name: Disk I/O value: 0
>>>>>>   name: CPU value: 0
>>>>>>   name: Cumulative Network value: unknown
>>>>>>   name: Cumulative Disk I/O value: unknown
>>>>>>   name: Cumulative CPU value: unknown
>>>>>> compiler_hints:
>>>>>>   name: Output Size (bytes) value: none
>>>>>>   name: Output Cardinality value: none
>>>>>>   name: Avg. Output Record Size (bytes) value: none
>>>>>>   name: Filter Factor value: none
>>>>>>
>>>>>> id: 1
>>>>>> type: pact
>>>>>> pact: Map
>>>>>> contents: map at main()
>>>>>> parallelism: 10
>>>>>> predecessors:
>>>>>>   id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>> driver_strategy: Map
>>>>>> global_properties:
>>>>>>   name: Partitioning value: RANDOM_PARTITIONED
>>>>>>   name: Partitioning Order value: none
>>>>>>   name: Uniqueness value: not unique
>>>>>> local_properties:
>>>>>>   name: Order value: none
>>>>>>   name: Grouping value: not grouped
>>>>>>   name: Uniqueness value: not unique
>>>>>> estimates:
>>>>>>   name: Est. Output Size value: unknown
>>>>>>   name: Est. Cardinality value: unknown
>>>>>> costs:
>>>>>>   name: Network value: 0
>>>>>>   name: Disk I/O value: 0
>>>>>>   name: CPU value: 0
>>>>>>   name: Cumulative Network value: unknown
>>>>>>   name: Cumulative Disk I/O value: unknown
>>>>>>   name: Cumulative CPU value: unknown
>>>>>> compiler_hints:
>>>>>>   name: Output Size (bytes) value: none
>>>>>>   name: Output Cardinality value: none
>>>>>>   name: Avg. Output Record Size (bytes) value: none
>>>>>>   name: Filter Factor value: none
>>>>>>
>>>>>> id: 0
>>>>>> type: sink
>>>>>> pact: Data Sink
>>>>>> contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
>>>>>> parallelism: 10
>>>>>> predecessors:
>>>>>>   id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>>> driver_strategy: Map
>>>>>> global_properties:
>>>>>>   name: Partitioning value: RANDOM_PARTITIONED
>>>>>>   name: Partitioning Order value: none
>>>>>>   name: Uniqueness value: not unique
>>>>>> local_properties:
>>>>>>   name: Order value: none
>>>>>>   name: Grouping value: not grouped
>>>>>>   name: Uniqueness value: not unique
>>>>>> estimates:
>>>>>>   name: Est. Output Size value: unknown
>>>>>>   name: Est. Cardinality value: unknown
>>>>>> costs:
>>>>>>   name: Network value: 0
>>>>>>   name: Disk I/O value: 0
>>>>>>   name: CPU value: 0
>>>>>>   name: Cumulative Network value: unknown
>>>>>>   name: Cumulative Disk I/O value: unknown
>>>>>>   name: Cumulative CPU value: unknown
>>>>>> compiler_hints:
>>>>>>   name: Output Size (bytes) value: none
>>>>>>   name: Output Cardinality value: none
>>>>>>   name: Avg. Output Record Size (bytes) value: none
>>>>>>   name: Filter Factor value: none
>>>>>>
>>>>>> On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Flink's operators are designed to work in memory as long as possible and spill to disk once the memory budget is exceeded.
>>>>>>> Moreover, Flink aims to run programs in a pipelined fashion, such that multiple operators can process data at the same time.
>>>>>>> This behavior can make it a bit tricky to analyze the runtime behavior and progress of operators.
>>>>>>>
>>>>>>> It would be interesting to have a look at the execution plan for the program that you are running.
>>>>>>> The plan can be obtained from the ExecutionEnvironment by calling env.getExecutionPlan() instead of env.execute().
>>>>>>>
>>>>>>> I would also like to know how you track the progress of the program.
>>>>>>> Are you looking at the record counts displayed in the WebUI?
>>>>>>>
>>>>>>> Best,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2017-12-05 22:03 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>>>>
>>>>>>>> I have been moving some old MR and Hive workflows into Flink because I'm enjoying the APIs and the ease of development is wonderful. Things have largely worked great until I tried to really scale some of the jobs recently.
>>>>>>>>
>>>>>>>> I have, for example, one ETL job that reads in about 12B records at a time and does a sort, some simple transformations, validation, a re-partition and then output to a Hive table.
>>>>>>>> When I built it with the sample set, ~200M, it worked great, took maybe a minute and blew through it.
>>>>>>>>
>>>>>>>> What I have observed is that there is some kind of saturation reached depending on the number of slots, number of nodes and the overall size of data to move. When I run the 12B set, the first 1B go through in under 1 minute, really really fast. But there's an extremely sharp drop-off after that: the next 1B might take 15 minutes, and then if I wait for the next 1B, it's well over an hour.
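A rough DataSet sketch of the job shape described in this message, reconstructed from the plan dump above (source, FlatMap, explicit hash-partition and sort, GroupReduce via first(), two Maps, HadoopOutputFormat sink). Every type, key field, constant and the placeholder source/sink are assumptions, not the real job:

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;

public class EtlSkeleton {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder for the Hive-backed source plus the FlatMap transformation.
        DataSet<Tuple3<String, Integer, String>> flatMapped = env.fromElements(
                new Tuple3<>("a", 1, "x"), new Tuple3<>("b", 2, "y"));

        flatMapped
            // explicit partition + sort, matching the Sort-Partition node (hash on [0,2], sort [0,2,1])
            .partitionByHash(0, 2)
            .sortPartition(0, Order.ASCENDING)
            .sortPartition(2, Order.ASCENDING)
            .sortPartition(1, Order.ASCENDING)
            // GroupReduce via first(n) on a sorted grouping, as shown in the plan; n is a placeholder
            .groupBy(0, 2)
            .sortGroup(1, Order.ASCENDING)
            .first(10)
            // the two trailing Maps and the HadoopOutputFormat sink are elided; placeholder sink here
            .output(new DiscardingOutputFormat<Tuple3<String, Integer, String>>());

        env.execute("etl skeleton");
    }
}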
>>>>>>>> What I can't find is any obvious indicators or things to look at; everything just grinds to a halt, and I don't think the job would ever actually complete.
>>>>>>>>
>>>>>>>> Is there something in the design of Flink in batch mode that is perhaps memory-bound? Adding more nodes/tasks does not fix it, it just gets me a little further along. I'm already running around ~1,400 slots at this point; I'd postulate needing 10,000+ to potentially make the job run, but that's too much of my cluster gone, and I have yet to get Flink to be stable past 1,500.
>>>>>>>>
>>>>>>>> Any ideas on where to look, or what to debug? The GUI is also very cumbersome to use at this slot count, so other measurement ideas are welcome too!
>>>>>>>>
>>>>>>>> Thank you all.