Thanks for the reply again,

I'm currently doing runs with:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true
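
As a rough sanity check on these settings, the managed memory per slot can be estimated from the container size, the heap cutoff ratio and the memory fraction. The sketch below is only an upper bound under simplifying assumptions: it ignores the network buffers and other structures that are subtracted before the fraction is applied, it ignores any off-heap specifics, and the function name is made up for illustration. The size actually allocated is logged by the TaskManager at start-up.

```python
def estimate_managed_memory_mb(container_mb, cutoff_ratio, fraction, slots):
    """Rough upper bound on managed memory per TaskManager and per slot (MB).

    Assumption: ignores network buffers and other structures that are
    subtracted before the fraction is applied, so real values are lower.
    """
    jvm_mb = container_mb * (1.0 - cutoff_ratio)  # memory left after the YARN cutoff
    managed_mb = jvm_mb * fraction                # share reserved as managed memory
    return managed_mb, managed_mb / slots


# Settings from this thread: -tm 9200, -s 2, cutoff 0.15, fraction 0.7
per_tm, per_slot = estimate_managed_memory_mb(9200, 0.15, 0.7, 2)
print(f"per TM: {per_tm:.0f} MB, per slot: {per_slot:.0f} MB")
```

Calling the function again with `slots=1` shows the effect of running one slot per TM: the same TM budget is no longer split in half.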

When I change the config via setExecutionMode() to BATCH, the sort instantly
fails with SortMerger OOM exceptions no matter what memory fraction I choose,
even when I set the fraction to 0.95.  The data source part is ridiculously
fast though, ~30 seconds!  Disabling batch mode and keeping the other
changes appears to give the same behavior as before; the job has been running
for ~20 minutes now.  Does Batch mode disable spilling to disk, or does batch
combined with off-heap memory disable spilling to disk?  Is there more
documentation on what Batch mode does under the covers?
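
For context on the spilling question: the sorter behavior described in the quoted reply further down (fill a memory buffer, sort it, spill it to disk, refill) is essentially an external merge sort. The following is a minimal standalone sketch in plain Python, not Flink's actual sorter; the buffer size, record type and function name are illustrative assumptions.

```python
import heapq
import os
import tempfile


def external_sort(records, buffer_size):
    """Sort an iterable of ints using a bounded in-memory buffer.

    Each full buffer is sorted and spilled to a temporary file as one run;
    the sorted runs are then merged into a single sorted list.
    """
    run_paths = []
    buffer = []

    def spill():
        # Sort the current buffer and write it to disk as one sorted run.
        buffer.sort()
        fd, path = tempfile.mkstemp(suffix=".run")
        with os.fdopen(fd, "w") as f:
            f.writelines(f"{value}\n" for value in buffer)
        run_paths.append(path)
        buffer.clear()

    for record in records:
        buffer.append(record)
        if len(buffer) >= buffer_size:
            spill()
    if buffer:
        spill()

    # Merge all sorted runs lazily, then clean up the spill files.
    files = [open(path) for path in run_paths]
    try:
        runs = [(int(line) for line in f) for f in files]
        merged = list(heapq.merge(*runs))
    finally:
        for f in files:
            f.close()
        for path in run_paths:
            os.remove(path)
    return merged, len(run_paths)


result, num_runs = external_sort([5, 3, 9, 1, 7, 2, 8], buffer_size=3)
print(result, num_runs)
```

A smaller buffer produces more runs to write and merge, which is why less managed memory per slot tends to mean more disk traffic for the same input.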

As for the flow itself, yes, it used to be a lot smaller.  I broke it out
manually by adding the sort/partition to see which steps were causing the
slowdown; thinking it was my code, I wanted to separate the operations.
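
The skew seen in the sort step can be reproduced in miniature: hash partitioning sends every record with the same key to the same partition, so one hot key overloads one slot no matter how many slots exist. The sketch below uses made-up data and `zlib.crc32` as a stand-in hash (an assumption, not Flink's partitioner). It also shows how a pre-aggregation step, in the spirit of the combiner suggested in the quoted reply, shrinks what has to be shuffled when the reduce logic allows it.

```python
import zlib
from collections import Counter


def partition_of(key, num_partitions):
    # Deterministic stand-in for a hash partitioner (not Flink's implementation).
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_loads(keys, num_partitions):
    """Count how many records each partition receives."""
    loads = Counter(partition_of(k, num_partitions) for k in keys)
    return [loads.get(p, 0) for p in range(num_partitions)]


# Made-up skewed input: one hot key plus many keys that occur once.
keys = ["hot"] * 9000 + [f"key-{i}" for i in range(1000)]

loads = partition_loads(keys, num_partitions=10)
print("records per partition:", loads)

# Pre-aggregation: collapse records per key before the shuffle, so the hot
# key contributes one (key, count) pair instead of 9000 records.
combined = Counter(keys)
combined_loads = partition_loads(combined.keys(), num_partitions=10)
print("records per partition after combining:", combined_loads)
```

In a real job the combine would run once per upstream task, so the hot key would contribute one record per task rather than one overall; the reduction is still large when a few keys dominate.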

Thank you again for your help.

On Thu, Dec 7, 2017 at 4:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> That doesn't look like a bad configuration.
>
> I have to correct myself regarding the size of the managed memory. The
> fraction (70%) is applied on the free memory after the TM initialization.
> This means that memory for network buffers (and other data structures) is
> subtracted before the managed memory is allocated.
> The actual size of the managed memory is logged in the TM log file during
> start up.
>
> You could also try to decrease the number of slots per TM to 1 but add
> more vCores (yarn.containers.vcores [1]) because the sorter runs in
> multiple threads.
>
> Adding a GroupCombineFunction for pre-aggregation (if possible...) would
> help to mitigate the effects of the data skew.
> Another thing I'd like to ask: Are you adding the partitioner and sorter
> explicitly to the plan and if so why? Usually, the partitioning and sorting
> is done as part of the GroupReduce.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#yarn
>
> 2017-12-06 23:32 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Wow thank you for the reply, you gave me a lot to look into and mess
>> with. I'll start testing with the various memory options and env settings
>> tomorrow.
>>
>> BTW the current flink cluster is launched like:
>> yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
>>
>> with flink-conf.yaml property overrides of:
>> # so bigger clusters don't fail to init
>> akka.ask.timeout: 60s
>> # so more memory is given to the JVM from the yarn container
>> containerized.heap-cutoff-ratio: 0.15
>>
>> So each Flink slot doesn't necessarily get a lot of RAM. You said 70% of
>> RAM goes to the job by default, so that's (9200*0.85)*0.70 = 5474MB per TM,
>> which leaves each slot with ~2737MB of usable space.  Would you have a
>> different config that uses the same overall amount of RAM?
>>
>>
>>
>>
>> On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Garrett,
>>>
>>> data skew might be a reason for the performance degradation.
>>>
>>> The plan you shared is pretty simple. The following happens when you run
>>> the program:
>>> - The data source starts to read data and pushes the records to the
>>> FlatMapFunction. From there the records are shuffled (using
>>> hash-partitioning) to the sorter.
>>> - The sorter tasks consume the records and write them into a memory
>>> buffer. When the buffer is full, it is sorted and spilled to disk. Once the
>>> buffer has been spilled, it is filled again with records, sorted, and spilled.
>>> - The initially fast processing happens because at the beginning the
>>> sorter is not waiting for buffers to be sorted or spilled because they are
>>> empty.
>>>
>>> The performance of the plan depends (among other things) on the size of
>>> the sort buffers. The sort buffers are taken from Flink's managed memory.
>>> Unless you configured something else, 70% of the TaskManager heap
>>> memory is reserved as managed memory.
>>> If you use Flink only for batch jobs, I would enable preallocation and
>>> off-heap memory (see configuration options [1]). You can also configure a
>>> fixed size for the managed memory. The more memory you configure, the more
>>> is available for sorting.
>>>
>>> The managed memory of a TM is evenly distributed to all its processing
>>> slots. Hence, having more slots per TM means that each slot has less
>>> managed memory (for sorting or joins or ...).
>>> So many slots are not necessarily good for performance (unless you
>>> increase the number of TMs / memory as well), especially in case of data
>>> skew when most slots receive only little data and cannot leverage their
>>> memory.
>>> If your data is heavily skewed, it might make sense to have fewer slots
>>> such that each slot has more memory for sorting.
>>>
>>> Skew also has an effect on downstream operations. In case of skew, some
>>> of the sorter tasks are overloaded and cannot accept more data.
>>> Due to the pipelined shuffles, this leads to a back pressure behavior
>>> that propagates down to the sources.
>>> You can disable pipelining by setting the execution mode on the
>>> execution configuration to BATCH [2]. This will break the pipeline but
>>> write the result of the FlatMap to disk.
>>> This might help, if the FlatMap is compute intensive or filters many
>>> records.
>>>
>>> The data sizes don't sound particularly large, so this should be something
>>> that Flink should be able to handle.
>>>
>>> Btw. you don't need to convert the JSON plan output. You can paste it
>>> into the plan visualizer [3].
>>> I would not worry about the missing statistics. The optimizer does not
>>> leverage them at the current state.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#managed-memory
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html
>>> [3] http://flink.apache.org/visualizer/
>>>
>>> 2017-12-06 16:45 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> Fabian,
>>>>
>>>>  Thank you for the reply.  Yes I do watch via the ui, is there another
>>>> way to see progress through the steps?
>>>>
>>>> I think I just figured it out, the hangup is in the sort phase (ID 4)
>>>> where 2 slots take all the time.  Looking in the UI, most slots get less
>>>> than 500MB of data to sort, but these two have 6.7GB and 7.3GB each; together
>>>> it's about 272M records and these will run for hours at this point.  Looks
>>>> like I need to figure out a different partitioning/sort strategy. I never
>>>> noticed before because when I run the system at ~1400 slots I don't use the
>>>> UI anymore as it gets unresponsive.  400 slots is painfully slow, but
>>>> still works.
>>>>
>>>>
>>>> The getEnv output is very cool! Also very big, so I've tried to summarize
>>>> it here in more of a yaml format as it's on a different network.  Note the
>>>> parallelism was just set to 10 as I didn't know if that affected the output.
>>>> Hopefully I didn't flub a copy paste step, it looks good to me.
>>>>
>>>>
>>>> This flow used to be far fewer steps, but as it wasn't scaling I broke
>>>> it out into all the distinct pieces so I could see where it failed.
>>>> Source and sink are both Hive tables.  I wonder if the inputformat is
>>>> expected to give more info to seed some of these stat values?
>>>>
>>>> nodes
>>>>     id: 6
>>>>     type: source
>>>>     pact: Data Source
>>>>     contents: at CreateInput(ExecutionEnvironment.java:533)
>>>>     parallelism: 10
>>>>     global_properties:
>>>>         name: partitioning value: RANDOM_PARTITIONED
>>>>         name: Partitioning Order value: none
>>>>         name: Uniqueness value: not unique
>>>>     local_properties:
>>>>         name: Order value: none
>>>>         name: Grouping value: not grouped
>>>>         name: Uniqueness value: not unique
>>>>     estimates:
>>>>         name: Est. Output Size value: unknown
>>>>         name: Est Cardinality value: unknown
>>>>     costs:
>>>>         name: Network value: 0
>>>>         name: Disk I/O value: 0
>>>>         name: CPU value: 0
>>>>         name: Cumulative Network value: 0
>>>>         name: Cumulative Disk I/O value: 0
>>>>         name: Cumulative CPU value: 0
>>>>     compiler_hints:
>>>>         name: Output Size (bytes) value: none
>>>>         name: Output Cardinality value: none
>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>         name: Filter Factor value: none
>>>>
>>>>     id: 5
>>>>     type: pact
>>>>     pact: FlatMap
>>>>     contents: FlatMap at main()
>>>>     parallelism: 10
>>>>     predecessors:
>>>>         id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>     driver_strategy: FlatMap
>>>>     global_properties:
>>>>         name: partitioning value: RANDOM_PARTITIONED
>>>>         name: Partitioning Order value: none
>>>>         name: Uniqueness value: not unique
>>>>     local_properties:
>>>>         name: Order value: none
>>>>         name: Grouping value: not grouped
>>>>         name: Uniqueness value: not unique
>>>>     estimates:
>>>>         name: Est. Output Size value: unknown
>>>>         name: Est Cardinality value: unknown
>>>>     costs:
>>>>         name: Network value: 0
>>>>         name: Disk I/O value 0
>>>>         name: CPU value: 0
>>>>         name: Cumulative Network value: 0
>>>>         name: Cumulative Disk I/O value: 0
>>>>         name: Cumulative CPU value: 0
>>>>     compiler_hints:
>>>>         name: Output Size (bytes) value: none
>>>>         name: Output Cardinality value: none
>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>         name: Filter Factor value: none
>>>>
>>>>     id: 4
>>>>     type: pact
>>>>     pact: Sort-Partition
>>>>     contents: Sort at main()
>>>>     parallelism: 10
>>>>     predecessors:
>>>>         id: 5, ship_strategy: Hash Partition on [0,2], local_strategy: Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
>>>>     driver_strategy: No-Op
>>>>     global_properties:
>>>>         name: partitioning value: HASH_PARTITIONED
>>>>         name: Partitioned on value: [0,2]
>>>>         name: Partitioning Order value: none
>>>>         name: Uniqueness value: not unique
>>>>     local_properties:
>>>>         name: Order value: [0:ASC,2:ASC,1:ASC]
>>>>         name: Grouping value: [0,2,1]
>>>>         name: Uniqueness value: not unique
>>>>     estimates:
>>>>         name: Est. Output Size value: unknown
>>>>         name: Est Cardinality value: unknown
>>>>     costs:
>>>>         name: Network value: 0
>>>>         name: Disk I/O value: 0
>>>>         name: CPU value: 0
>>>>         name: Cumulative Network value: unknown
>>>>         name: Cumulative Disk I/O value: unknown
>>>>         name: Cumulative CPU value: unknown
>>>>     compiler_hints:
>>>>         name: Output Size (bytes) value: none
>>>>         name: Output Cardinality value: none
>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>         name: Filter Factor value: none
>>>>
>>>>     id: 3
>>>>     type: pact
>>>>     pact: GroupReduce
>>>>     contents: GroupReduce at first(SortedGrouping.java:210)
>>>>     parallelism: 10
>>>>     predecessors:
>>>>         id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>     driver_strategy: Sorted Group Reduce
>>>>     global_properties:
>>>>         name: partitioning value: RANDOM_PARTITIONED
>>>>         name: Partitioning Order value: none
>>>>         name: Uniqueness value: not unique
>>>>     local_properties:
>>>>         name: Order value: none
>>>>         name: Grouping value: not grouped
>>>>         name: Uniqueness value: not unique
>>>>     estimates:
>>>>         name: Est. Output Size value: unknown
>>>>         name: Est Cardinality value: unknown
>>>>     costs:
>>>>         name: Network value: 0
>>>>         name: Disk I/O value 0
>>>>         name: CPU value: 0
>>>>         name: Cumulative Network value: unknown
>>>>         name: Cumulative Disk I/O value: unknown
>>>>         name: Cumulative CPU value: unknown
>>>>     compiler_hints:
>>>>         name: Output Size (bytes) value: none
>>>>         name: Output Cardinality value: none
>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>         name: Filter Factor value: none
>>>>
>>>>
>>>>     id: 2
>>>>     type: pact
>>>>     pact: Map
>>>>     contents: Map at ()
>>>>     parallelism: 10
>>>>     predecessors:
>>>>         id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>     driver_strategy: Map
>>>>     global_properties:
>>>>         name: partitioning value: RANDOM_PARTITIONED
>>>>         name: Partitioning Order value: none
>>>>         name: Uniqueness value: not unique
>>>>     local_properties:
>>>>         name: Order value: none
>>>>         name: Grouping value: not grouped
>>>>         name: Uniqueness value: not unique
>>>>     estimates:
>>>>         name: Est. Output Size value: unknown
>>>>         name: Est Cardinality value: unknown
>>>>     costs:
>>>>         name: Network value: 0
>>>>         name: Disk I/O value 0
>>>>         name: CPU value: 0
>>>>         name: Cumulative Network value: unknown
>>>>         name: Cumulative Disk I/O value: unknown
>>>>         name: Cumulative CPU value: unknown
>>>>     compiler_hints:
>>>>         name: Output Size (bytes) value: none
>>>>         name: Output Cardinality value: none
>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>         name: Filter Factor value: none
>>>>
>>>>     id: 1
>>>>     type: pact
>>>>     pact: Map
>>>>     contents: map at main()
>>>>     parallelism: 10
>>>>     predecessors:
>>>>         id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>     driver_strategy: Map
>>>>     global_properties:
>>>>         name: partitioning value: RANDOM_PARTITIONED
>>>>         name: Partitioning Order value: none
>>>>         name: Uniqueness value: not unique
>>>>     local_properties:
>>>>         name: Order value: none
>>>>         name: Grouping value: not grouped
>>>>         name: Uniqueness value: not unique
>>>>     estimates:
>>>>         name: Est. Output Size value: unknown
>>>>         name: Est Cardinality value: unknown
>>>>     costs:
>>>>         name: Network value: 0
>>>>         name: Disk I/O value: 0
>>>>         name: CPU value: 0
>>>>         name: Cumulative Network value: unknown
>>>>         name: Cumulative Disk I/O value: unknown
>>>>         name: Cumulative CPU value: unknown
>>>>     compiler_hints:
>>>>         name: Output Size (bytes) value: none
>>>>         name: Output Cardinality value: none
>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>         name: Filter Factor value: none
>>>>
>>>>     id: 0
>>>>     type: sink
>>>>     pact: Data Sink
>>>>     contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
>>>>     parallelism: 10
>>>>     predecessors:
>>>>         id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
>>>>     driver_strategy: Map
>>>>     global_properties:
>>>>         name: partitioning value: RANDOM_PARTITIONED
>>>>         name: Partitioning Order value: none
>>>>         name: Uniqueness value: not unique
>>>>     local_properties:
>>>>         name: Order value: none
>>>>         name: Grouping value: not grouped
>>>>         name: Uniqueness value: not unique
>>>>     estimates:
>>>>         name: Est. Output Size value: unknown
>>>>         name: Est Cardinality value: unknown
>>>>     costs:
>>>>         name: Network value: 0
>>>>         name: Disk I/O value: 0
>>>>         name: CPU value: 0
>>>>         name: Cumulative Network value: unknown
>>>>         name: Cumulative Disk I/O value: unknown
>>>>         name: Cumulative CPU value: unknown
>>>>     compiler_hints:
>>>>         name: Output Size (bytes) value: none
>>>>         name: Output Cardinality value: none
>>>>         name: Avg. Output Record Size (bytes) value: none
>>>>         name: Filter Factor value: none
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Flink's operators are designed to work in memory as long as possible
>>>>> and spill to disk once the memory budget is exceeded.
>>>>> Moreover, Flink aims to run programs in a pipelined fashion, such that
>>>>> multiple operators can process data at the same time.
>>>>> This behavior can make it a bit tricky to analyze the runtime behavior
>>>>> and progress of operators.
>>>>>
>>>>> It would be interesting to have a look at the execution plan for the
>>>>> program that you are running.
>>>>> The plan can be obtained from the ExecutionEnvironment by calling
>>>>> env.getExecutionPlan() instead of env.execute().
>>>>>
>>>>> I would also like to know how you track the progress of the program.
>>>>> Are you looking at the record counts displayed in the WebUI?
>>>>>
>>>>> Best,
>>>>> Fabian
>>>>>
>>>>>
>>>>>
>>>>> 2017-12-05 22:03 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>>
>>>>>> I have been moving some old MR and Hive workflows into Flink because
>>>>>> I'm enjoying the APIs and the ease of development is wonderful.  Things
>>>>>> have largely worked great until I tried to really scale some of the jobs
>>>>>> recently.
>>>>>>
>>>>>> I have for example one ETL job that reads in about 12B records at a
>>>>>> time and does a sort, some simple transformations, validation, a
>>>>>> re-partition and then output to a Hive table.
>>>>>> When I built it with the sample set, ~200M, it worked great, took
>>>>>> maybe a minute and blew through it.
>>>>>>
>>>>>> What I have observed is there is some kind of saturation reached
>>>>>> depending on number of slots, number of nodes and the overall size of
>>>>>> data to move.  When I run the 12B set, the first 1B go through in under 1
>>>>>> minute, really really fast.  But it's an extremely sharp drop off after
>>>>>> that, the next 1B might take 15 minutes, and then if I wait for the next
>>>>>> 1B, it's well over an hour.
>>>>>>
>>>>>> What I can't find is any obvious indicators or things to look at,
>>>>>> everything just grinds to a halt, I don't think the job would ever
>>>>>> actually complete.
>>>>>>
>>>>>> Is there something in the design of Flink in batch mode that is
>>>>>> perhaps memory bound?  Adding more nodes/tasks does not fix it, just gets
>>>>>> me a little further along.  I'm already running around ~1,400 slots at
>>>>>> this point, I'd postulate needing 10,000+ to potentially make the job run,
>>>>>> but that's too much of my cluster gone, and I have yet to get Flink to be
>>>>>> stable past 1,500.
>>>>>>
>>>>>> Any ideas on where to look, or what to debug?  The GUI is also very
>>>>>> cumbersome to use at this slot count, so other measurement ideas are
>>>>>> welcome too!
>>>>>>
>>>>>> Thank you all.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
