Wow, thank you for the reply, you gave me a lot to look into and mess with. I'll start testing with the various memory options and env settings tomorrow.
BTW the current Flink cluster is launched like:

    yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120

with these flink-conf.yaml property overrides:

    # so bigger clusters don't fail to init
    akka.ask.timeout: 60s
    # so more memory is given to the JVM from the yarn container
    containerized.heap-cutoff-ratio: 0.15

So each Flink slot doesn't necessarily get a lot of RAM. You said 70% of the TaskManager heap goes to managed memory by default, so that's (9200 * 0.85) * 0.70 = 5474MB per TaskManager, and each of the 2 slots is sitting with ~2737MB of usable space. Would you use a different config while keeping the same overall amount of RAM?

On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Garrett,
>
> Data skew might be a reason for the performance degradation.
>
> The plan you shared is pretty simple. The following happens when you run the program:
> - The data source starts to read data and pushes the records to the FlatMapFunction. From there the records are shuffled (using hash partitioning) to the sorter.
> - The sorter tasks consume the records and write them into a memory buffer. When the buffer is full, it is sorted and spilled to disk. Once a buffer has been spilled, it is filled again with records, sorted, and spilled.
> - The initially fast processing happens because at the beginning the sorter is not waiting for buffers to be sorted or spilled, since they are still empty.
>
> The performance of the plan depends (among other things) on the size of the sort buffers. The sort buffers are taken from Flink's managed memory. Unless you configured something else, 70% of the TaskManager heap memory is reserved as managed memory.
> If you use Flink only for batch jobs, I would enable preallocation and off-heap memory (see the configuration options [1]). You can also configure a fixed size for the managed memory. The more memory you configure, the more is available for sorting.
>
> The managed memory of a TM is evenly distributed across all of its processing slots. Hence, having more slots per TM means that each slot has less managed memory (for sorting, joins, etc.).
> So many slots are not necessarily good for performance (unless you increase the number of TMs / the memory as well), especially in the case of data skew, when most slots receive only little data and cannot leverage their memory.
> If your data is heavily skewed, it might make sense to have fewer slots, so that each slot has more memory for sorting.
>
> Skew also has an effect on downstream operations. In case of skew, some of the sorter tasks are overloaded and cannot accept more data.
> Due to the pipelined shuffles, this leads to back pressure that propagates down to the sources.
> You can disable pipelining by setting the execution mode on the execution configuration to BATCH [2]. This breaks the pipeline by writing the result of the FlatMap to disk.
> This might help if the FlatMap is compute intensive or filters many records.
>
> The data sizes don't sound particularly large, so this is something Flink should be able to handle.
>
> Btw, you don't need to convert the JSON plan output. You can paste it into the plan visualizer [3].
> I would not worry about the missing statistics. The optimizer does not leverage them in its current state.
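>
> For batch-only use, the relevant flink-conf.yaml entries would look something like this (a sketch; the fixed size is just an example value to adapt to your containers):
>
>     taskmanager.memory.preallocate: true
>     taskmanager.memory.off-heap: true
>     # optionally fix the managed memory size in MB instead of using the default 0.7 fraction
>     taskmanager.memory.size: 4096
>
> And switching to BATCH execution mode is a one-liner on the ExecutionConfig:
>
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().setExecutionMode(ExecutionMode.BATCH);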
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#managed-memory
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html
> [3] http://flink.apache.org/visualizer/
>
> 2017-12-06 16:45 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Fabian,
>>
>> Thank you for the reply. Yes, I do watch via the UI; is there another way to see progress through the steps?
>>
>> I think I just figured it out: the hangup is in the sort phase (ID 4), where 2 slots take all the time. Looking in the UI, most slots get less than 500MB of data to sort; these two have 6.7GB and 7.3GB each. Together that's about 272M records, and they will run for hours at this point. Looks like I need to figure out a different partitioning/sort strategy (a rough idea is sketched below, before the plan dump). I never noticed before because when I run the system at ~1,400 slots I don't use the UI anymore, as it gets unresponsive. 400 slots is painfully slow, but still works.
>>
>> The getEnv output is very cool! It is also very big, so I've tried to summarize it here in more of a yaml format, as it's on a different network. Note the parallelism was just set to 10, as I didn't know if that affected the output. Hopefully I didn't flub a copy-paste step; it looks good to me.
>>
>> This flow used to be far fewer steps, but as it wasn't scaling I broke it out into all the distinct pieces so I could see where it failed. Source and sink are both Hive tables. I wonder if the inputformat is expected to give more info to seed some of these stat values?
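>>
>> As a hedged sketch of that strategy (assuming the GroupReduce really is a first(n) over groupBy(0, 2).sortGroup(1, ...), and making up a Tuple3<String, String, String> record type and the constants): add a random salt to the key so the two hot groups spread across many slots, take the first n per (key, salt) bucket, then drop the salt and take the first n again.
>>
>>     // imports: org.apache.flink.api.common.functions.MapFunction,
>>     // org.apache.flink.api.common.operators.Order, org.apache.flink.api.java.DataSet,
>>     // org.apache.flink.api.java.tuple.Tuple3 / Tuple4, java.util.concurrent.ThreadLocalRandom
>>     final int salts = 16;   // fan-out per key; tune to the skew
>>     final int n = 100;      // placeholder for whatever first(n) the job really uses
>>     DataSet<Tuple4<String, String, String, Integer>> salted = input.map(
>>         new MapFunction<Tuple3<String, String, String>, Tuple4<String, String, String, Integer>>() {
>>             @Override
>>             public Tuple4<String, String, String, Integer> map(Tuple3<String, String, String> t) {
>>                 // random salt as an extra key field so one hot key lands on many slots
>>                 return new Tuple4<>(t.f0, t.f1, t.f2, ThreadLocalRandom.current().nextInt(salts));
>>             }
>>         });
>>     DataSet<Tuple3<String, String, String>> result = salted
>>         .groupBy(0, 2, 3)                     // original key fields plus the salt
>>         .sortGroup(1, Order.ASCENDING)
>>         .first(n)                             // local first(n) per (key, salt) bucket
>>         .<Tuple3<String, String, String>>project(0, 1, 2)   // drop the salt
>>         .groupBy(0, 2)
>>         .sortGroup(1, Order.ASCENDING)
>>         .first(n);                            // global first(n) over at most salts * n records per key
>>
>> No idea yet if that preserves the real semantics of the job, but it's the usual two-phase trick for skewed top-n style reductions.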
>>
>> nodes
>>   id: 6
>>   type: source
>>   pact: Data Source
>>   contents: at CreateInput(ExecutionEnvironment.java:533)
>>   parallelism: 10
>>   global_properties:
>>     name: Partitioning value: RANDOM_PARTITIONED
>>     name: Partitioning Order value: none
>>     name: Uniqueness value: not unique
>>   local_properties:
>>     name: Order value: none
>>     name: Grouping value: not grouped
>>     name: Uniqueness value: not unique
>>   estimates:
>>     name: Est. Output Size value: unknown
>>     name: Est. Cardinality value: unknown
>>   costs:
>>     name: Network value: 0
>>     name: Disk I/O value: 0
>>     name: CPU value: 0
>>     name: Cumulative Network value: 0
>>     name: Cumulative Disk I/O value: 0
>>     name: Cumulative CPU value: 0
>>   compiler_hints:
>>     name: Output Size (bytes) value: none
>>     name: Output Cardinality value: none
>>     name: Avg. Output Record Size (bytes) value: none
>>     name: Filter Factor value: none
>>
>>   id: 5
>>   type: pact
>>   pact: FlatMap
>>   contents: FlatMap at main()
>>   parallelism: 10
>>   predecessors:
>>     id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
>>   driver_strategy: FlatMap
>>   global_properties:
>>     name: Partitioning value: RANDOM_PARTITIONED
>>     name: Partitioning Order value: none
>>     name: Uniqueness value: not unique
>>   local_properties:
>>     name: Order value: none
>>     name: Grouping value: not grouped
>>     name: Uniqueness value: not unique
>>   estimates:
>>     name: Est. Output Size value: unknown
>>     name: Est. Cardinality value: unknown
>>   costs:
>>     name: Network value: 0
>>     name: Disk I/O value: 0
>>     name: CPU value: 0
>>     name: Cumulative Network value: 0
>>     name: Cumulative Disk I/O value: 0
>>     name: Cumulative CPU value: 0
>>   compiler_hints:
>>     name: Output Size (bytes) value: none
>>     name: Output Cardinality value: none
>>     name: Avg. Output Record Size (bytes) value: none
>>     name: Filter Factor value: none
>>
>>   id: 4
>>   type: pact
>>   pact: Sort-Partition
>>   contents: Sort at main()
>>   parallelism: 10
>>   predecessors:
>>     id: 5, ship_strategy: Hash Partition on [0,2], local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
>>   driver_strategy: No-Op
>>   global_properties:
>>     name: Partitioning value: HASH_PARTITIONED
>>     name: Partitioned on value: [0,2]
>>     name: Partitioning Order value: none
>>     name: Uniqueness value: not unique
>>   local_properties:
>>     name: Order value: [0:ASC,2:ASC,1:ASC]
>>     name: Grouping value: [0,2,1]
>>     name: Uniqueness value: not unique
>>   estimates:
>>     name: Est. Output Size value: unknown
>>     name: Est. Cardinality value: unknown
>>   costs:
>>     name: Network value: 0
>>     name: Disk I/O value: 0
>>     name: CPU value: 0
>>     name: Cumulative Network value: unknown
>>     name: Cumulative Disk I/O value: unknown
>>     name: Cumulative CPU value: unknown
>>   compiler_hints:
>>     name: Output Size (bytes) value: none
>>     name: Output Cardinality value: none
>>     name: Avg. Output Record Size (bytes) value: none
>>     name: Filter Factor value: none
>>
>>   id: 3
>>   type: pact
>>   pact: GroupReduce
>>   contents: GroupReduce at first(SortedGrouping.java:210)
>>   parallelism: 10
>>   predecessors:
>>     id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
>>   driver_strategy: Sorted Group Reduce
>>   global_properties:
>>     name: Partitioning value: RANDOM_PARTITIONED
>>     name: Partitioning Order value: none
>>     name: Uniqueness value: not unique
>>   local_properties:
>>     name: Order value: none
>>     name: Grouping value: not grouped
>>     name: Uniqueness value: not unique
>>   estimates:
>>     name: Est. Output Size value: unknown
>>     name: Est. Cardinality value: unknown
>>   costs:
>>     name: Network value: 0
>>     name: Disk I/O value: 0
>>     name: CPU value: 0
>>     name: Cumulative Network value: unknown
>>     name: Cumulative Disk I/O value: unknown
>>     name: Cumulative CPU value: unknown
>>   compiler_hints:
>>     name: Output Size (bytes) value: none
>>     name: Output Cardinality value: none
>>     name: Avg. Output Record Size (bytes) value: none
>>     name: Filter Factor value: none
>>
>>   id: 2
>>   type: pact
>>   pact: Map
>>   contents: Map at ()
>>   parallelism: 10
>>   predecessors:
>>     id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
>>   driver_strategy: Map
>>   global_properties:
>>     name: Partitioning value: RANDOM_PARTITIONED
>>     name: Partitioning Order value: none
>>     name: Uniqueness value: not unique
>>   local_properties:
>>     name: Order value: none
>>     name: Grouping value: not grouped
>>     name: Uniqueness value: not unique
>>   estimates:
>>     name: Est. Output Size value: unknown
>>     name: Est. Cardinality value: unknown
>>   costs:
>>     name: Network value: 0
>>     name: Disk I/O value: 0
>>     name: CPU value: 0
>>     name: Cumulative Network value: unknown
>>     name: Cumulative Disk I/O value: unknown
>>     name: Cumulative CPU value: unknown
>>   compiler_hints:
>>     name: Output Size (bytes) value: none
>>     name: Output Cardinality value: none
>>     name: Avg. Output Record Size (bytes) value: none
>>     name: Filter Factor value: none
>>
>>   id: 1
>>   type: pact
>>   pact: Map
>>   contents: map at main()
>>   parallelism: 10
>>   predecessors:
>>     id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
>>   driver_strategy: Map
>>   global_properties:
>>     name: Partitioning value: RANDOM_PARTITIONED
>>     name: Partitioning Order value: none
>>     name: Uniqueness value: not unique
>>   local_properties:
>>     name: Order value: none
>>     name: Grouping value: not grouped
>>     name: Uniqueness value: not unique
>>   estimates:
>>     name: Est. Output Size value: unknown
>>     name: Est. Cardinality value: unknown
>>   costs:
>>     name: Network value: 0
>>     name: Disk I/O value: 0
>>     name: CPU value: 0
>>     name: Cumulative Network value: unknown
>>     name: Cumulative Disk I/O value: unknown
>>     name: Cumulative CPU value: unknown
>>   compiler_hints:
>>     name: Output Size (bytes) value: none
>>     name: Output Cardinality value: none
>>     name: Avg. Output Record Size (bytes) value: none
>>     name: Filter Factor value: none
>>
>>   id: 0
>>   type: sink
>>   pact: Data Sink
>>   contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
>>   parallelism: 10
>>   predecessors:
>>     id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
>>   driver_strategy: Map
>>   global_properties:
>>     name: Partitioning value: RANDOM_PARTITIONED
>>     name: Partitioning Order value: none
>>     name: Uniqueness value: not unique
>>   local_properties:
>>     name: Order value: none
>>     name: Grouping value: not grouped
>>     name: Uniqueness value: not unique
>>   estimates:
>>     name: Est. Output Size value: unknown
>>     name: Est. Cardinality value: unknown
>>   costs:
>>     name: Network value: 0
>>     name: Disk I/O value: 0
>>     name: CPU value: 0
>>     name: Cumulative Network value: unknown
>>     name: Cumulative Disk I/O value: unknown
>>     name: Cumulative CPU value: unknown
>>   compiler_hints:
>>     name: Output Size (bytes) value: none
>>     name: Output Cardinality value: none
>>     name: Avg. Output Record Size (bytes) value: none
>>     name: Filter Factor value: none
>>
>> On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink's operators are designed to work in memory as long as possible and spill to disk once the memory budget is exceeded.
>>> Moreover, Flink aims to run programs in a pipelined fashion, such that multiple operators can process data at the same time.
>>> This behavior can make it a bit tricky to analyze the runtime behavior and progress of operators.
>>>
>>> It would be interesting to have a look at the execution plan for the program that you are running.
>>> The plan can be obtained from the ExecutionEnvironment by calling env.getExecutionPlan() instead of env.execute().
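>>>
>>> For example (a minimal sketch; the program itself stays unchanged):
>>>
>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     // ... define sources, transformations, and sinks as usual ...
>>>     // instead of calling env.execute(), print the JSON plan:
>>>     System.out.println(env.getExecutionPlan());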
>>>
>>> I would also like to know how you track the progress of the program.
>>> Are you looking at the record counts displayed in the WebUI?
>>>
>>> Best,
>>> Fabian
>>>
>>> 2017-12-05 22:03 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> I have been moving some old MR and Hive workflows into Flink because I'm enjoying the APIs, and the ease of development is wonderful. Things have largely worked great until I tried to really scale some of the jobs recently.
>>>>
>>>> I have, for example, one ETL job that reads in about 12B records at a time and does a sort, some simple transformations, validation, a re-partition, and then outputs to a Hive table. When I built it with the sample set, ~200M records, it worked great: it took maybe a minute and blew through it.
>>>>
>>>> What I have observed is that there is some kind of saturation point reached depending on the number of slots, the number of nodes, and the overall size of the data to move. When I run the 12B set, the first 1B go through in under 1 minute, really really fast. But there is an extremely sharp drop-off after that: the next 1B might take 15 minutes, and the 1B after that takes well over an hour.
>>>>
>>>> What I can't find is any obvious indicator or thing to look at; everything just grinds to a halt, and I don't think the job would ever actually complete.
>>>>
>>>> Is there something in the design of Flink in batch mode that is perhaps memory bound? Adding more nodes/tasks does not fix it, it just gets me a little further along. I'm already running around ~1,400 slots at this point; I'd postulate needing 10,000+ to potentially make the job run, but that's too much of my cluster gone, and I have yet to get Flink to be stable past 1,500.
>>>>
>>>> Any ideas on where to look, or what to debug? The GUI is also very cumbersome to use at this slot count, so other measurement ideas are welcome too!
>>>>
>>>> Thank you all.