Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Garrett Barton
Running with these settings: yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120 akka.ask.timeout: 60s containerized.heap-cutoff-ratio: 0.15 taskmanager.memory.fraction: 0.7 taskmanager.memory.off-heap: false taskmanager.memory.preallocate: true env.getConfig().setExecutionMode(ExecutionMode.BATCH) Loo…
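The settings quoted above, restated for readability as flink-conf.yaml entries plus the one line of job code (nothing here goes beyond what the message already lists):

    # flink-conf.yaml
    akka.ask.timeout: 60s
    containerized.heap-cutoff-ratio: 0.15
    taskmanager.memory.fraction: 0.7
    taskmanager.memory.off-heap: false
    taskmanager.memory.preallocate: true

    // in the job: force batch-style (blocking) data exchanges instead of pipelined ones
    env.getConfig().setExecutionMode(ExecutionMode.BATCH);

    # session launch
    yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120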

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Greg Hogan
Hi Garrett, In the Web UI, when viewing a job under overview / subtasks, selecting the checkbox "Aggregate task statistics by TaskManager" will reduce the number of displayed rows (though in your case only by half). The following documents profiling a Flink job with Java Flight Recorder: http…
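The profiling link above is cut off. As an illustrative sketch only (not the content of that link), Java Flight Recorder can usually be switched on for the TaskManager JVMs by passing JVM options through flink-conf.yaml; the exact flags depend on the JDK in use:

    # illustrative flags for an Oracle JDK 8 era JVM; adjust for your JDK and Flink version
    env.java.opts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=120s,filename=flink-tm.jfr"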

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Fabian Hueske
Ah, no direct memory buffer... Can you try to disable off-heap memory? 2017-12-07 18:35 GMT+01:00 Garrett Barton: > Stacktrace generates every time with the following settings (tried > different memory fractions): > yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120 > akka.ask.timeout: 60s > containe…
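A minimal sketch of what disabling off-heap memory looks like, using the same flink-conf.yaml key already quoted in this thread:

    # keep Flink's managed memory on the JVM heap
    taskmanager.memory.off-heap: false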

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Fabian Hueske
Oh, sorry. "Direct memory buffer" was the error message of the OOM. Direct memory buffers are used when off-heap memory is enabled. 2017-12-07 18:56 GMT+01:00 Garrett Barton: > Running with these settings: > yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120 > akka.ask.timeout: 60s > containerized.he…

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Garrett Barton
Stacktrace generates every time with the following settings (tried different memory fractions): yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120 akka.ask.timeout: 60s containerized.heap-cutoff-ratio: 0.15 taskmanager.memory.fraction: 0.7/0.3/0.1 taskmanager.memory.off-heap: true taskmanager.memory.pre…

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Fabian Hueske
Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace? The managed memory should be divided among all possible consumers. In case of your simple job, this should just be Sorter. In fact, I'd try to reduce the fraction to give more memory to the JVM heap (OOM means there was not enoug…
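A sketch of the suggested change, reusing the key from the configs above; the concrete value is only illustrative:

    # a smaller managed-memory fraction leaves more JVM heap for everything else
    taskmanager.memory.fraction: 0.5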

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Garrett Barton
Thanks for the reply again, I'm currently doing runs with: yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120 akka.ask.timeout: 60s containerized.heap-cutoff-ratio: 0.15 taskmanager.memory.fraction: 0.7 taskmanager.memory.off-heap: true taskmanager.memory.preallocate: true When I change the config se…
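As a rough sanity check on what this run requests from YARN (simple arithmetic based on the flags above, not figures from the original mail):

    700 TMs x 9200 MB  ≈ 6.4 TB of container memory
    700 TMs x 2 slots  = 1400 parallel processing slots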

Re: Flink Batch Performance degradation at scale

2017-12-07 Thread Fabian Hueske
That doesn't look like a bad configuration. I have to correct myself regarding the size of the managed memory. The fraction (70%) is applied on the free memory after the TM initialization. This means that memory for network buffers (and other data structures) is subtracted before the managed memo…
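A back-of-the-envelope sketch of how those numbers combine for one TaskManager; the network-buffer figure is an assumed default, not taken from the thread:

    container size (-tm 9200)                          : 9200 MB
    YARN cutoff (containerized.heap-cutoff-ratio 0.15) : ~1380 MB
    JVM size                                           : ~7820 MB
    network buffers etc. (assumed default, ~64 MB)     : subtracted first
    managed memory ≈ 0.7 x free memory left after TaskManager startup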

Re: Flink Batch Performance degradation at scale

2017-12-06 Thread Garrett Barton
Wow, thank you for the reply, you gave me a lot to look into and mess with. I'll start testing with the various memory options and env settings tomorrow. BTW the current flink cluster is launched like: yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120 with flink-conf.yaml property overrides of: # so b…
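For reference, what those launch flags mean in the YARN client of that Flink generation (the values are the ones quoted above):

    yarn-session.sh -n 700   \  # number of TaskManager containers
                    -s 2     \  # processing slots per TaskManager
                    -tm 9200 \  # memory per TaskManager container, in MB
                    -jm 5120    # memory for the JobManager container, in MB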

Re: Flink Batch Performance degradation at scale

2017-12-06 Thread Fabian Hueske
Hi Garrett, data skew might be a reason for the performance degradation. The plan you shared is pretty simple. The following happens when you run the program: - The data source starts to read data and pushes the records to the FlatMapFunction. From there the records are shuffled (using hash-partitioning…
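Conceptually, the hash partitioning mentioned above routes every record with the same key to the same receiving subtask; the sketch below shows only the general idea, not Flink's exact internal hashing:

    // conceptual only: how a hash-partitioned shuffle picks the receiving subtask
    static int targetSubtask(Object key, int parallelism) {
        // all records with one key land on one subtask, so a few very
        // frequent keys can overload a few of the downstream sorters
        return Math.abs(key.hashCode() % parallelism);
    }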

Re: Flink Batch Performance degradation at scale

2017-12-06 Thread Garrett Barton
Fabian, Thank you for the reply. Yes, I do watch via the UI; is there another way to see progress through the steps? I think I just figured it out: the hangup is in the sort phase (ID 4), where 2 slots take all the time. Looking in the UI, most slots get less than 500 MB of data to sort; these two…
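One way to confirm that kind of key skew (a hypothetical helper, not something from the original thread) is to count records per key before the expensive sort; this assumes a DataSet<Tuple2<String, String>> named input, keyed on field 0:

    // hypothetical skew check: how many records does each key contribute?
    DataSet<Tuple2<String, Long>> countsPerKey = input
        .map(r -> Tuple2.of(r.f0, 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .groupBy(0)
        .sum(1);
    // look at the heaviest keys
    countsPerKey.sortPartition(1, Order.DESCENDING)
        .setParallelism(1)
        .first(20)
        .print();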

Re: Flink Batch Performance degradation at scale

2017-12-05 Thread Fabian Hueske
Hi, Flink's operators are designed to work in memory as long as possible and spill to disk once the memory budget is exceeded. Moreover, Flink aims to run programs in a pipelined fashion, such that multiple operators can process data at the same time. This behavior can make it a bit tricky to analyze…

Flink Batch Performance degradation at scale

2017-12-05 Thread Garrett Barton
I have been moving some old MR and Hive workflows into Flink because I'm enjoying the APIs and the ease of development is wonderful. Things have largely worked great until I tried to really scale some of the jobs recently. I have, for example, one ETL job that reads in about 12B records at a time…
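For readers following the thread, a minimal sketch of the shape of job being discussed; the paths, field layout, and per-group logic are made up for illustration, and only the overall read -> flatMap -> shuffle -> sort structure matches the plan described in the replies:

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class EtlJobSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // batch-style data exchanges, as used later in this thread
            env.getConfig().setExecutionMode(ExecutionMode.BATCH);

            // hypothetical input location and record layout: "key<TAB>payload"
            DataSet<String> raw = env.readTextFile("hdfs:///data/etl/input");

            DataSet<Tuple2<String, String>> parsed = raw.flatMap(
                    (String line, Collector<Tuple2<String, String>> out) -> {
                        String[] parts = line.split("\t", 2);
                        if (parts.length == 2) {
                            out.collect(Tuple2.of(parts[0], parts[1]));
                        }
                    })
                    .returns(Types.TUPLE(Types.STRING, Types.STRING));

            // the shuffle + sort phase where the slow subtasks show up:
            // group by key, sort within each group, keep a placeholder result
            parsed.groupBy(0)
                  .sortGroup(1, Order.ASCENDING)
                  .first(1)  // stand-in for the real per-group logic
                  .writeAsCsv("hdfs:///data/etl/output");

            env.execute("etl job sketch");
        }
    }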