Wow, thank you for the reply; you gave me a lot to look into and mess with.
I'll start testing the various memory options and env settings tomorrow.

BTW, the current Flink cluster is launched like this:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120

with flink-conf.yaml property overrides of:
# so bigger clusters don't fail to init
akka.ask.timeout: 60s
# so more memory is given to the JVM from the yarn container
containerized.heap-cutoff-ratio: 0.15

So each Flink slot doesn't necessarily get a lot of RAM. You said 70% of
the TaskManager heap goes to managed memory by default, so that's
(9200 * 0.85) * 0.70 = 5474MB per TM, which leaves each of the 2 slots with
~2737MB of usable space.  Would you configure this differently while keeping
the overall amount of RAM the same?
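
For example, my first guess for tomorrow is to keep the same total memory but
drop to one slot per TM, so each slot gets the whole ~5474MB managed-memory
budget:

yarn-session.sh -n 700 -s 1 -tm 9200 -jm 5120

Does that seem like a sane starting point, or would you split the memory
differently?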




On Wed, Dec 6, 2017 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Garrett,
>
> data skew might be a reason for the performance degradation.
>
> The plan you shared is pretty simple. The following happens when you run
> the program:
> - The data source starts to read data and pushes the records to the
> FlatMapFunction. From there, the records are shuffled (using
> hash-partitioning) to the sorter.
> - The sorter tasks consume the records and write them into a memory
> buffer. When the buffer is full, it is sorted and spilled to disk. Once the
> buffer has been spilled, it is filled again with records, sorted, and spilled.
> - The initially fast processing happens because at the beginning the sort
> buffers are still empty, so the sorter does not have to wait for them to be
> sorted or spilled.
>
> The performance of the plan depends (among other things) on the size of
> the sort buffers. The sort buffers are taken from Flink's managed memory.
> Unless you configured something else, 70% of the TaskManager heap memory
> is reserved as managed memory.
> If you use Flink only for batch jobs, I would enable preallocation and
> off-heap memory (see configuration options [1]). You can also configure a
> fixed size for the managed memory. The more memory you configure, the more
> is available for sorting.
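>
> For batch-only jobs, a sketch of what that could look like in
> flink-conf.yaml (the fixed size below is only an illustration, pick a value
> that fits your containers):
>
> # allocate managed memory off-heap and eagerly at TaskManager start-up
> taskmanager.memory.off-heap: true
> taskmanager.memory.preallocate: true
> # optionally set an absolute managed-memory size in MB instead of the
> # default fraction (taskmanager.memory.fraction: 0.7)
> # taskmanager.memory.size: 4096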
>
> The managed memory of a TM is evenly distributed to all its processing
> slots. Hence, having more slots per TM means that each slot has less
> managed memory (for sorting, joins, ...).
> So having many slots is not necessarily good for performance (unless you
> increase the number of TMs / memory as well), especially in the case of
> data skew, where most slots receive only little data and cannot leverage
> their memory.
> If your data is heavily skewed, it might make sense to have fewer slots
> such that each slot has more memory for sorting.
>
> Skew also has an effect on downstream operations. In case of skew, some of
> the sorter tasks are overloaded and cannot accept more data.
> Because the shuffles are pipelined, this leads to back pressure that
> propagates back to the sources.
> You can disable pipelining by setting the execution mode on the execution
> configuration to BATCH [2]. This will break the pipeline and write the
> result of the FlatMap to disk.
> This might help if the FlatMap is compute-intensive or filters out many
> records.
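>
> A minimal sketch of that setting on the DataSet API:
>
> import org.apache.flink.api.common.ExecutionMode;
> import org.apache.flink.api.java.ExecutionEnvironment;
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> // shuffles are executed in a blocking, batched fashion instead of pipelined
> env.getConfig().setExecutionMode(ExecutionMode.BATCH);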
>
> The data sizes don't sound particularly large, so this is something that
> Flink should be able to handle.
>
> Btw. you don't need to convert the JSON plan output; you can paste it into
> the plan visualizer [3].
> I would not worry about the missing statistics. The optimizer does not
> leverage them in its current state.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#managed-memory
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/execution_configuration.html
> [3] http://flink.apache.org/visualizer/
>
> 2017-12-06 16:45 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>
>> Fabian,
>>
>> Thank you for the reply.  Yes, I do watch via the UI; is there another
>> way to see progress through the steps?
>>
>> I think I just figured it out: the hangup is in the sort phase (ID 4),
>> where 2 slots take all the time.  Looking in the UI, most slots get less
>> than 500MB of data to sort, but these two have 6.7GB and 7.3GB each,
>> together about 272M records, and they will run for hours at this point.
>> Looks like I need to figure out a different partitioning/sort strategy. I
>> never noticed before because when I run the system at ~1,400 slots I don't
>> use the UI anymore as it gets unresponsive.  400 slots is painfully slow,
>> but still works.
>>
>>
>> The getExecutionPlan() output is very cool! It's also very big, so I've
>> tried to summarize it here in more of a yaml format since it's on a
>> different network.  Note the parallelism was just set to 10 as I didn't
>> know if that affected the output.  Hopefully I didn't flub a copy-paste
>> step; it looks good to me.
>>
>>
>> This flow used to have far fewer steps, but since it wasn't scaling I
>> broke it out into all the distinct pieces so I could see where it failed.
>> Source and sink are both Hive tables.  I wonder if the InputFormat is
>> expected to give more info to seed some of these stat values?
>>
>> nodes
>>     id: 6
>>     type: source
>>     pact: Data Source
>>     contents: at CreateInput(ExecutionEnvironment.java:533)
>>     parallelism: 10
>>     global_properties:
>>         name: Partitioning value: RANDOM_PARTITIONED
>>         name: Partitioning Order value: none
>>         name: Uniqueness value: not unique
>>     local_properties:
>>         name: Order value: none
>>         name: Grouping value: not grouped
>>         name: Uniqueness value: not unique
>>     estimates:
>>         name: Est. Output Size value: unknown
>>         name: Est Cardinality value: unknown
>>     costs:
>>         name: Network value: 0
>>         name: Disk I/O value: 0
>>         name: CPU value: 0
>>         name: Cumulative Network value: 0
>>         name: Cumulative Disk I/O value: 0
>>         name: Cumulative CPU value: 0
>>     compiler_hints:
>>         name: Output Size (bytes) value: none
>>         name: Output Cardinality value: none
>>         name: Avg. Output Record Size (bytes) value: none
>>         name: Filter Factor value: none
>>
>>     id: 5
>>     type: pact
>>     pact: FlatMap
>>     contents: FlatMap at main()
>>     parallelism: 10
>>     predecessors:
>>         id: 6, ship_strategy: Forward, exchange_mode: PIPELINED
>>     driver_strategy: FlatMap
>>     global_properties:
>>         name: Partitioning value: RANDOM_PARTITIONED
>>         name: Partitioning Order value: none
>>         name: Uniqueness value: not unique
>>     local_properties:
>>         name: Order value: none
>>         name: Grouping value: not grouped
>>         name: Uniqueness value: not unique
>>     estimates:
>>         name: Est. Output Size value: unknown
>>         name: Est Cardinality value: unknown
>>     costs:
>>         name: Network value: 0
>>         name: Disk I/O value: 0
>>         name: CPU value: 0
>>         name: Cumulative Network value: 0
>>         name: Cumulative Disk I/O value: 0
>>         name: Cumulative CPU value: 0
>>     compiler_hints:
>>         name: Output Size (bytes) value: none
>>         name: Output Cardinality value: none
>>         name: Avg. Output Record Size (bytes) value: none
>>         name: Filter Factor value: none
>>
>>     id: 4
>>     type: pact
>>     pact: Sort-Partition
>>     contents: Sort at main()
>>     parallelism: 10
>>     predecessors:
>>         id: 5, ship_strategy: Hash Partition on [0,2] local_strategy:
>> Sort on [0:ASC,2:ASC,1:ASC], exchange_mode: PIPELINED
>>     driver_strategy: No-Op
>>     global_properties:
>>         name: Partitioning value: HASH_PARTITIONED
>>         name: Partitioned on value: [0,2]
>>         name: Partitioning Order value: none
>>         name: Uniqueness value: not unique
>>     local_properties:
>>         name: Order value: [0:ASC,2:ASC,1:ASC]
>>         name: Grouping value: [0,2,1]
>>         name: Uniqueness value: not unique
>>     estimates:
>>         name: Est. Output Size value: unknown
>>         name: Est Cardinality value: unknown
>>     costs:
>>         name: Network value: 0
>>         name: Disk I/O value: 0
>>         name: CPU value: 0
>>         name: Cumulative Network value: unknown
>>         name: Cumulative Disk I/O value: unknown
>>         name: Cumulative CPU value: unknown
>>     compiler_hints:
>>         name: Output Size (bytes) value: none
>>         name: Output Cardinality value: none
>>         name: Avg. Output Record Size (bytes) value: none
>>         name: Filter Factor value: none
>>
>>     id: 3
>>     type: pact
>>     pact: GroupReduce
>>     contents: GroupReduce at first(SortedGrouping.java:210)
>>     parallelism: 10
>>     predecessors:
>>         id: 4, ship_strategy: Forward, exchange_mode: PIPELINED
>>     driver_strategy: Sorted Group Reduce
>>     global_properties:
>>         name: Partitioning value: RANDOM_PARTITIONED
>>         name: Partitioning Order value: none
>>         name: Uniqueness value: not unique
>>     local_properties:
>>         name: Order value: none
>>         name: Grouping value: not grouped
>>         name: Uniqueness value: not unique
>>     estimates:
>>         name: Est. Output Size value: unknown
>>         name: Est Cardinality value: unknown
>>     costs:
>>         name: Network value: 0
>>         name: Disk I/O value: 0
>>         name: CPU value: 0
>>         name: Cumulative Network value: unknown
>>         name: Cumulative Disk I/O value: unknown
>>         name: Cumulative CPU value: unknown
>>     compiler_hints:
>>         name: Output Size (bytes) value: none
>>         name: Output Cardinality value: none
>>         name: Avg. Output Record Size (bytes) value: none
>>         name: Filter Factor value: none
>>
>>
>>     id: 2
>>     type: pact
>>     pact: Map
>>     contents: Map at ()
>>     parallelism: 10
>>     predecessors:
>>         id: 3, ship_strategy: Forward, exchange_mode: PIPELINED
>>     driver_strategy: Map
>>     global_properties:
>>         name: Partitioning value: RANDOM_PARTITIONED
>>         name: Partitioning Order value: none
>>         name: Uniqueness value: not unique
>>     local_properties:
>>         name: Order value: none
>>         name: Grouping value: not grouped
>>         name: Uniqueness value: not unique
>>     estimates:
>>         name: Est. Output Size value: unknown
>>         name: Est Cardinality value: unknown
>>     costs:
>>         name: Network value: 0
>>         name: Disk I/O value: 0
>>         name: CPU value: 0
>>         name: Cumulative Network value: unknown
>>         name: Cumulative Disk I/O value: unknown
>>         name: Cumulative CPU value: unknown
>>     compiler_hints:
>>         name: Output Size (bytes) value: none
>>         name: Output Cardinality value: none
>>         name: Avg. Output Record Size (bytes) value: none
>>         name: Filter Factor value: none
>>
>>     id: 1
>>     type: pact
>>     pact: Map
>>     contents: map at main()
>>     parallelism: 10
>>     predecessors:
>>         id: 2, ship_strategy: Forward, exchange_mode: PIPELINED
>>     driver_strategy: Map
>>     global_properties:
>>         name: Partitioning value: RANDOM_PARTITIONED
>>         name: Partitioning Order value: none
>>         name: Uniqueness value: not unique
>>     local_properties:
>>         name: Order value: none
>>         name: Grouping value: not grouped
>>         name: Uniqueness value: not unique
>>     estimates:
>>         name: Est. Output Size value: unknown
>>         name: Est Cardinality value: unknown
>>     costs:
>>         name: Network value: 0
>>         name: Disk I/O value 0
>>         name: Disk I/O value: 0
>>         name: Cumulative Network value: unknown
>>         name: Cumulative Disk I/O value: unknown
>>         name: Cumulative CPU value: unknown
>>     compiler_hints:
>>         name: Output Size (bytes) value: none
>>         name: Output Cardinality value: none
>>         name: Avg. Output Record Size (bytes) value: none
>>         name: Filter Factor value: none
>>
>>     id: 0
>>     type: sink
>>     pact: Data Sink
>>     contents: org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
>>     parallelism: 10
>>     predecessors:
>>         id: 1, ship_strategy: Forward, exchange_mode: PIPELINED
>>     driver_strategy: Map
>>     global_properties:
>>         name: Partitioning value: RANDOM_PARTITIONED
>>         name: Partitioning Order value: none
>>         name: Uniqueness value: not unique
>>     local_properties:
>>         name: Order value: none
>>         name: Grouping value: not grouped
>>         name: Uniqueness value: not unique
>>     estimates:
>>         name: Est. Output Size value: unknown
>>         name: Est Cardinality value: unknown
>>     costs:
>>         name: Network value: 0
>>         name: Disk I/O value: 0
>>         name: CPU value: 0
>>         name: Cumulative Network value: unknown
>>         name: Cumulative Disk I/O value: unknown
>>         name: Cumulative CPU value: unknown
>>     compiler_hints:
>>         name: Output Size (bytes) value: none
>>         name: Output Cardinality value: none
>>         name: Avg. Output Record Size (bytes) value: none
>>         name: Filter Factor value: none
>>
>>
>>
>>
>> On Tue, Dec 5, 2017 at 5:36 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink's operators are designed to work in memory as long as possible and
>>> spill to disk once the memory budget is exceeded.
>>> Moreover, Flink aims to run programs in a pipelined fashion, such that
>>> multiple operators can process data at the same time.
>>> This behavior can make it a bit tricky to analyze the runtime behavior
>>> and progress of operators.
>>>
>>> It would be interesting to have a look at the execution plan for the
>>> program that you are running.
>>> The plan can be obtained from the ExecutionEnvironment by calling
>>> env.getExecutionPlan() instead of env.execute().
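>>>
>>> For example (getExecutionPlan() returns the plan as a JSON string and
>>> does not execute the job):
>>>
>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> // ... define sources, transformations, and sinks as usual ...
>>> System.out.println(env.getExecutionPlan());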
>>>
>>> I would also like to know how you track the progress of the program.
>>> Are you looking at the record counts displayed in the WebUI?
>>>
>>> Best,
>>> Fabian
>>>
>>>
>>>
>>> 2017-12-05 22:03 GMT+01:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>
>>>> I have been moving some old MR and Hive workflows into Flink because I'm
>>>> enjoying the APIs, and the ease of development is wonderful.  Things have
>>>> largely worked great until I tried to really scale some of the jobs
>>>> recently.
>>>>
>>>> I have, for example, one ETL job that reads in about 12B records at a
>>>> time and does a sort, some simple transformations, validation, a
>>>> re-partition, and then outputs to a Hive table.
>>>> When I built it with the sample set, ~200M records, it worked great: it
>>>> took maybe a minute and blew through it.
>>>>
>>>> What I have observed is that some kind of saturation is reached depending
>>>> on the number of slots, the number of nodes, and the overall size of the
>>>> data to move.  When I run the 12B set, the first 1B go through in under 1
>>>> minute, really really fast.  But there's an extremely sharp drop-off after
>>>> that: the next 1B might take 15 minutes, and then if I wait for the next
>>>> 1B, it's well over an hour.
>>>>
>>>> What I can't find are any obvious indicators or things to look at;
>>>> everything just grinds to a halt, and I don't think the job would ever
>>>> actually complete.
>>>>
>>>> Is there something in the design of Flink in batch mode that is perhaps
>>>> memory bound?  Adding more nodes/tasks does not fix it, it just gets me a
>>>> little further along.  I'm already running around ~1,400 slots at this
>>>> point; I'd postulate needing 10,000+ to potentially make the job run, but
>>>> that's too much of my cluster gone, and I have yet to get Flink to be
>>>> stable past 1,500 slots.
>>>>
>>>> Any ideas on where to look, or what to debug?  The GUI is also very
>>>> cumbersome to use at this slot count, so other measurement ideas are
>>>> welcome too!
>>>>
>>>> Thank you all.
>>>>
>>>
>>>
>>
>
