Thanks for the reply again. I'm currently doing runs with:

yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120

and these flink-conf.yaml overrides:

akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true
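(For context, the setExecutionMode() call mentioned in the next paragraph is made on the job's ExecutionConfig. A minimal sketch, assuming a plain ExecutionEnvironment rather than the actual job setup:)

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchModeExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Shuffled data exchanges are materialized before downstream operators
        // start consuming them, instead of being pipelined.
        env.getConfig().setExecutionMode(ExecutionMode.BATCH);
        // ... build and execute the DataSet program as usual ...
    }
}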
When I change setExecutionMode() to BATCH, the sort instantly fails with SortMerger OOM exceptions no matter what memory fraction I choose, even when I set the fraction to 0.95. The data source part is ridiculously fast though, ~30 seconds! Disabling batch mode while keeping the other changes appears to behave the same as before; the job has been running for ~20 minutes now. Does batch mode disable spilling to disk, or does the combination of batch mode and off-heap memory disable it? Is there more documentation on what batch mode does under the covers?

As for the flow itself, yes, it used to be a lot smaller. I broke it out manually by adding the sort/partition to see which steps were causing the slowdown; thinking it was my code, I wanted to separate the operations.

Thank you again for your help.

On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> That doesn't look like a bad configuration.
>
> I have to correct myself regarding the size of the managed memory. The fraction (70%) is applied on the free memory after the TM initialization. This means that memory for network buffers (and other data structures) is subtracted before the managed memory is allocated.
> The actual size of the managed memory is logged in the TM log file during start up.
>
> You could also try to decrease the number of slots per TM to 1 but add more vCores (yarn.containers.vcores [1]) because the sorter runs in multiple threads.
>
> Adding a GroupCombineFunction for pre-aggregation (if possible...) would help to mitigate the effects of the data skew.
> Another thing I'd like to ask: are you adding the partitioner and sorter explicitly to the plan, and if so, why? Usually, the partitioning and sorting are done as part of the GroupReduce.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#yarn
>
> 2017-12-06 23:32 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Wow, thank you for the reply; you gave me a lot to look into and mess with. I'll start testing with the various memory options and env settings tomorrow.
>>
>> BTW the current flink cluster is launched like:
>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>
>> with flink-conf.yaml property overrides of:
>> # so bigger clusters don't fail to init
>> akka.ask.timeout: 60s
>> # so more memory is given to the JVM from the yarn container
>> containerized.heap-cutoff-ratio: 0.15
>>
>> So each Flink slot doesn't necessarily get a lot of RAM. You said 70% of RAM goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB. So each slot is sitting with ~2737MB of usable space. Would you have a different config for taking overall the same amount of RAM?
>>
>> On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Garrett,
>>>
>>> data skew might be a reason for the performance degradation.
>>>
>>> The plan you shared is pretty simple. The following happens when you run the program:
>>> - The data source starts to read data and pushes the records to the FlatMapFunction. From there the records are shuffled (using hash-partitioning) to the sorter.
>>> - The sorter tasks consume the records and write them into a memory buffer. When the buffer is full, it is sorted and spilled to disk. Once the buffer has been spilled, it is filled again with records, sorted, and spilled.
>>> - The initially fast processing happens because, at the beginning, the sorter is not waiting for buffers to be sorted or spilled because they are empty.
>>>
>>> The performance of the plan depends (among other things) on the size of the sort buffers. The sort buffers are taken from Flink's managed memory. Unless you configured something else, 70% of the TaskManager heap memory is reserved as managed memory.
>>> If you use Flink only for batch jobs, I would enable preallocation and off-heap memory (see configuration options [1]). You can also configure a fixed size for the managed memory. The more memory you configure, the more is available for sorting.
>>>
>>> The managed memory of a TM is evenly distributed to all its processing slots. Hence, having more slots per TM means that each slot has less managed memory (for sorting or joins or ...).
>>> So many slots are not necessarily good for performance (unless you increase the number of TMs / memory as well), especially in case of data skew, when most slots receive only little data and cannot leverage their memory.
>>> If your data is heavily skewed, it might make sense to have fewer slots such that each slot has more memory for sorting.
>>>
>>> Skew also has an effect on downstream operations. In case of skew, some of the sorter tasks are overloaded and cannot accept more data.
>>> Due to the pipelined shuffles, this leads to back pressure that propagates down to the sources.
>>> You can disable pipelining by setting the execution mode on the execution configuration to BATCH [2]. This will break the pipeline and write the result of the FlatMap to disk.
>>> This might help if the FlatMap is compute intensive or filters many records.
>>>
>>> The data sizes don't sound particularly large, so this should be something that Flink should be able to handle.
>>>
>>> Btw. you don't need to convert the JSON plan output. You can paste it into the plan visualizer [3].
>>> I would not worry about the missing statistics. The optimizer does not leverage them at the current state.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#managed-memory
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html
>>> [3] http://flink.apache.org/visualizer/
>>>
>>> 2017-12-06 16:45 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> Fabian,
>>>>
>>>> Thank you for the reply. Yes, I do watch via the UI; is there another way to see progress through the steps?
>>>>
>>>> I think I just figured it out: the hangup is in the sort phase (ID 4), where 2 slots take all the time. Looking in the UI, most slots get less than 500MB of data to sort; these two have 6.7GB and 7.3GB each, together about 272M records, and they will run for hours at this point. Looks like I need to figure out a different partitioning/sort strategy. I never noticed before because when I run the system at ~1400 slots I don't use the UI anymore as it gets unresponsive. 400 slots is painfully slow, but still works.
>>>>
>>>> The getEnv output is very cool! It is also very big, so I've tried to summarize it here in more of a yaml format as it's on a different network. Note the parallelism was just set to 10 as I didn't know if that affected the output. Hopefully I didn't flub a copy/paste step; it looks good to me.
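(The JSON plan summarized below was presumably obtained the way Fabian suggests further down in the thread, via getExecutionPlan(). A minimal sketch with a stand-in pipeline, not the actual job code:)

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class DumpPlan {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in pipeline; in the real job this would be the Hive source,
        // flatMap, sort/partition, groupReduce, maps, and Hive sink described below.
        DataSet<Integer> data = env.fromElements(1, 2, 3);
        data.output(new DiscardingOutputFormat<Integer>());

        // Print the optimizer's JSON plan instead of calling env.execute().
        // The JSON can be pasted into http://flink.apache.org/visualizer/
        System.out.println(env.getExecutionPlan());
    }
}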
>>>>
>>>> This flow used to be far fewer steps, but as it wasn't scaling I broke it out into all the distinct pieces so I could see where it failed. Source and sink are both Hive tables. I wonder if the inputformat is expected to give more info to seed some of these stat values?
>>>>
>>>> nodes
>>>>   id: 6
>>>>   type: source
>>>>   pact: Data Source
>>>>   contents: at CreateInput(ExecutionEnvironment.java:533)
>>>>   parallelism: 10
>>>>   global_properties:
>>>>     name: Partitioning value: RANDOM_PARTITIONED
>>>>     name: Partitioning Order value: none
>>>>     name: Uniqueness value: not unique
>>>>   local_properties:
>>>>     name: Order value: none
>>>>     name: Grouping value: not grouped
>>>>     name: Uniqueness value: not unique
>>>>   estimates:
>>>>     name: Est. Output Size value: unknown
>>>>     name: Est. Cardinality value: unknown
>>>>   costs:
>>>>     name: Network value: 0
>>>>     name: Disk I/O value: 0
>>>>     name: CPU value: 0
>>>>     name: Cumulative Network value: 0
>>>>     name: Cumulative Disk I/O value: 0
>>>>     name: Cumulative CPU value: 0
>>>>   compiler_hints:
>>>>     name: Output Size (bytes) value: none
>>>>     name: Output Cardinality value: none
>>>>     name: Avg. Output Record Size (bytes) value: none
>>>>     name: Filter Factor value: none
>>>>
>>>>   id: 5
>>>>   type: pact
>>>>   pact: FlatMap
>>>>   contents: FlatMap at main()
>>>>   parallelism: 10
>>>>   predecessors:
>>>>     id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>   driver_strategy: FlatMap
>>>>   global_properties:
>>>>     name: Partitioning value: RANDOM_PARTITIONED
>>>>     name: Partitioning Order value: none
>>>>     name: Uniqueness value: not unique
>>>>   local_properties:
>>>>     name: Order value: none
>>>>     name: Grouping value: not grouped
>>>>     name: Uniqueness value: not unique
>>>>   estimates:
>>>>     name: Est. Output Size value: unknown
>>>>     name: Est. Cardinality value: unknown
>>>>   costs:
>>>>     name: Network value: 0
>>>>     name: Disk I/O value: 0
>>>>     name: CPU value: 0
>>>>     name: Cumulative Network value: 0
>>>>     name: Cumulative Disk I/O value: 0
>>>>     name: Cumulative CPU value: 0
>>>>   compiler_hints:
>>>>     name: Output Size (bytes) value: none
>>>>     name: Output Cardinality value: none
>>>>     name: Avg. Output Record Size (bytes) value: none
>>>>     name: Filter Factor value: none
>>>>
>>>>   id: 4
>>>>   type: pact
>>>>   pact: Sort-Partition
>>>>   contents: Sort at main()
>>>>   parallelism: 10
>>>>   predecessors:
>>>>     id: 5, ship_strategy: Hash Partition on [0,2], local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
>>>>   driver_strategy: No-Op
>>>>   global_properties:
>>>>     name: Partitioning value: HASH_PARTITIONED
>>>>     name: Partitioned on value: [0,2]
>>>>     name: Partitioning Order value: none
>>>>     name: Uniqueness value: not unique
>>>>   local_properties:
>>>>     name: Order value: [0:ASC,2:ASC,1:ASC]
>>>>     name: Grouping value: [0,2,1]
>>>>     name: Uniqueness value: not unique
>>>>   estimates:
>>>>     name: Est. Output Size value: unknown
>>>>     name: Est. Cardinality value: unknown
>>>>   costs:
>>>>     name: Network value: 0
>>>>     name: Disk I/O value: 0
>>>>     name: CPU value: 0
>>>>     name: Cumulative Network value: unknown
>>>>     name: Cumulative Disk I/O value: unknown
>>>>     name: Cumulative CPU value: unknown
>>>>   compiler_hints:
>>>>     name: Output Size (bytes) value: none
>>>>     name: Output Cardinality value: none
>>>>     name: Avg. Output Record Size (bytes) value: none
>>>>     name: Filter Factor value: none
>>>>
>>>>   id: 3
>>>>   type: pact
>>>>   pact: GroupReduce
>>>>   contents: GroupReduce at first(SortedGrouping.java:210)
>>>>   parallelism: 10
>>>>   predecessors:
>>>>     id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>   driver_strategy: Sorted Group Reduce
>>>>   global_properties:
>>>>     name: Partitioning value: RANDOM_PARTITIONED
>>>>     name: Partitioning Order value: none
>>>>     name: Uniqueness value: not unique
>>>>   local_properties:
>>>>     name: Order value: none
>>>>     name: Grouping value: not grouped
>>>>     name: Uniqueness value: not unique
>>>>   estimates:
>>>>     name: Est. Output Size value: unknown
>>>>     name: Est. Cardinality value: unknown
>>>>   costs:
>>>>     name: Network value: 0
>>>>     name: Disk I/O value: 0
>>>>     name: CPU value: 0
>>>>     name: Cumulative Network value: unknown
>>>>     name: Cumulative Disk I/O value: unknown
>>>>     name: Cumulative CPU value: unknown
>>>>   compiler_hints:
>>>>     name: Output Size (bytes) value: none
>>>>     name: Output Cardinality value: none
>>>>     name: Avg. Output Record Size (bytes) value: none
>>>>     name: Filter Factor value: none
>>>>
>>>>   id: 2
>>>>   type: pact
>>>>   pact: Map
>>>>   contents: Map at ()
>>>>   parallelism: 10
>>>>   predecessors:
>>>>     id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>   driver_strategy: Map
>>>>   global_properties:
>>>>     name: Partitioning value: RANDOM_PARTITIONED
>>>>     name: Partitioning Order value: none
>>>>     name: Uniqueness value: not unique
>>>>   local_properties:
>>>>     name: Order value: none
>>>>     name: Grouping value: not grouped
>>>>     name: Uniqueness value: not unique
>>>>   estimates:
>>>>     name: Est. Output Size value: unknown
>>>>     name: Est. Cardinality value: unknown
>>>>   costs:
>>>>     name: Network value: 0
>>>>     name: Disk I/O value: 0
>>>>     name: CPU value: 0
>>>>     name: Cumulative Network value: unknown
>>>>     name: Cumulative Disk I/O value: unknown
>>>>     name: Cumulative CPU value: unknown
>>>>   compiler_hints:
>>>>     name: Output Size (bytes) value: none
>>>>     name: Output Cardinality value: none
>>>>     name: Avg. Output Record Size (bytes) value: none
>>>>     name: Filter Factor value: none
>>>>
>>>>   id: 1
>>>>   type: pact
>>>>   pact: Map
>>>>   contents: map at main()
>>>>   parallelism: 10
>>>>   predecessors:
>>>>     id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>   driver_strategy: Map
>>>>   global_properties:
>>>>     name: Partitioning value: RANDOM_PARTITIONED
>>>>     name: Partitioning Order value: none
>>>>     name: Uniqueness value: not unique
>>>>   local_properties:
>>>>     name: Order value: none
>>>>     name: Grouping value: not grouped
>>>>     name: Uniqueness value: not unique
>>>>   estimates:
>>>>     name: Est. Output Size value: unknown
>>>>     name: Est. Cardinality value: unknown
>>>>   costs:
>>>>     name: Network value: 0
>>>>     name: Disk I/O value: 0
>>>>     name: CPU value: 0
>>>>     name: Cumulative Network value: unknown
>>>>     name: Cumulative Disk I/O value: unknown
>>>>     name: Cumulative CPU value: unknown
>>>>   compiler_hints:
>>>>     name: Output Size (bytes) value: none
>>>>     name: Output Cardinality value: none
>>>>     name: Avg. Output Record Size (bytes) value: none
>>>>     name: Filter Factor value: none
>>>>
>>>>   id: 0
>>>>   type: sink
>>>>   pact: Data Sink
>>>>   contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
>>>>   parallelism: 10
>>>>   predecessors:
>>>>     id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>   driver_strategy: Map
>>>>   global_properties:
>>>>     name: Partitioning value: RANDOM_PARTITIONED
>>>>     name: Partitioning Order value: none
>>>>     name: Uniqueness value: not unique
>>>>   local_properties:
>>>>     name: Order value: none
>>>>     name: Grouping value: not grouped
>>>>     name: Uniqueness value: not unique
>>>>   estimates:
>>>>     name: Est. Output Size value: unknown
>>>>     name: Est. Cardinality value: unknown
>>>>   costs:
>>>>     name: Network value: 0
>>>>     name: Disk I/O value: 0
>>>>     name: CPU value: 0
>>>>     name: Cumulative Network value: unknown
>>>>     name: Cumulative Disk I/O value: unknown
>>>>     name: Cumulative CPU value: unknown
>>>>   compiler_hints:
>>>>     name: Output Size (bytes) value: none
>>>>     name: Output Cardinality value: none
>>>>     name: Avg. Output Record Size (bytes) value: none
>>>>     name: Filter Factor value: none
>>>>
>>>> On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Flink's operators are designed to work in memory as long as possible and spill to disk once the memory budget is exceeded.
>>>>> Moreover, Flink aims to run programs in a pipelined fashion, such that multiple operators can process data at the same time.
>>>>> This behavior can make it a bit tricky to analyze the runtime behavior and progress of operators.
>>>>>
>>>>> It would be interesting to have a look at the execution plan for the program that you are running.
>>>>> The plan can be obtained from the ExecutionEnvironment by calling env.getExecutionPlan() instead of env.execute().
>>>>>
>>>>> I would also like to know how you track the progress of the program. Are you looking at the record counts displayed in the WebUI?
>>>>>
>>>>> Best,
>>>>> Fabian
>>>>>
>>>>> 2017-12-05 22:03 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>>
>>>>>> I have been moving some old MR and Hive workflows into Flink because I'm enjoying the APIs and the ease of development is wonderful. Things have largely worked great until I tried to really scale some of the jobs recently.
>>>>>>
>>>>>> I have, for example, one ETL job that reads in about 12B records at a time and does a sort, some simple transformations, validation, a re-partition, and then output to a Hive table.
>>>>>> When I built it with the sample set, ~200M, it worked great, took maybe a minute and blew through it.
>>>>>>
>>>>>> What I have observed is that there is some kind of saturation reached depending on the number of slots, the number of nodes, and the overall size of the data to move. When I run the 12B set, the first 1B go through in under 1 minute, really really fast. But it's an extremely sharp drop-off after that: the next 1B might take 15 minutes, and then if I wait for the next 1B, it's well over an hour.
>>>>>>
>>>>>> What I can't find are any obvious indicators or things to look at; everything just grinds to a halt, and I don't think the job would ever actually complete.
>>>>>>
>>>>>> Is there something in the design of Flink in batch mode that is perhaps memory bound?
>>>>>> Adding more nodes/tasks does not fix it, just gets me a little further along. I'm already running around ~1,400 slots at this point; I'd postulate needing 10,000+ to potentially make the job run, but that's too much of my cluster gone, and I have yet to get Flink to be stable past 1,500.
>>>>>>
>>>>>> Any ideas on where to look, or what to debug? The GUI is also very cumbersome to use at this slot count, so other measurement ideas are welcome too!
>>>>>>
>>>>>> Thank you all.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
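(A rough illustration of the GroupCombineFunction pre-aggregation Fabian suggests above. The Tuple2<String, Long> key/count type and the summing logic are placeholders, not the actual job's types; the point is that a GroupReduceFunction that also implements GroupCombineFunction lets Flink partially aggregate each group before the shuffle, which softens hot keys:)

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CombinableCount
        implements GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>,
                   GroupCombineFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) {
        String key = null;
        long sum = 0L;
        for (Tuple2<String, Long> v : values) {
            key = v.f0;
            sum += v.f1;
        }
        out.collect(new Tuple2<>(key, sum));
    }

    @Override
    public void combine(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) {
        // Pre-aggregation before the shuffle reuses the same summing logic.
        reduce(values, out);
    }
}

// Usage (hypothetical): input.groupBy(0).reduceGroup(new CombinableCount());

Whether a combiner applies here depends on the group-reduce logic being decomposable into partial results, which is what Fabian's "(if possible...)" caveat refers to.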