Hi Ken,

at the moment there are just two parameters to control the parallelism of the Flink operators generated by the Cascading-Flink connector:

- flink.num.sourceTasks, to specify the parallelism of source tasks.
- flink.num.shuffleTasks, to specify the parallelism of all shuffling tasks (GroupBy, CoGroup).

Non-shuffling operators such as Each/Map and HashJoin take the parallelism of their predecessor (for HashJoin, the first input) to avoid a random shuffle. So an Each/Map or HashJoin that immediately follows a source runs with the source parallelism. Effectively, most operators end up running with the shuffle parallelism, because Each and HashJoin pick it up once their input has been shuffled.
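For example, here is a minimal (untested) sketch of how you would pass these properties when connecting a flow, assuming the connector's FlinkConnector entry point accepts a Properties object like the other Cascading FlowConnectors:

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;

import com.dataartisans.flink.cascading.FlinkConnector;

public class ParallelismExample {

    // Runs the given flow definition on Flink with explicit parallelism settings.
    public static void runWithParallelism(FlowDef flowDef) {
        Properties properties = new Properties();

        // Parallelism of all source tasks:
        properties.setProperty("flink.num.sourceTasks", "5");

        // Parallelism of all shuffling tasks (GroupBy, CoGroup);
        // Each and HashJoin inherit this once their input has been shuffled:
        properties.setProperty("flink.num.shuffleTasks", "40");

        Flow flow = new FlinkConnector(properties).connect(flowDef);
        flow.complete();
    }
}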
It is currently not possible to specify the parallelism of an individual task. However, I am open to suggestions if you have a good idea to improve the situation. I think we should continue the discussion on the Cascading-Flink GitHub project, since this feature would not require changes in Flink but only in the Cascading-Flink runner.

Best, Fabian

2016-04-27 6:25 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:

> Hi all,
>
> I’m busy tuning up a workflow (defined w/Cascading, planned with Flink)
> that runs on a 5-slave EMR cluster.
>
> The default parallelism (from the Flink planner) is set to 40, since I’ve
> got 5 task managers (one per node) and 8 slots/TM.
>
> But this seems to jam things up, as I see simultaneous GroupReduce
> subtasks competing for resources (or so it seems).
>
> Any insight into how to tune this?
>
> And what’s the right way to set it on a sub-task basis? With Cascading
> Flows planned for M-R I can set the number of reducers via a Hadoop JobConf
> configuration setting, on a per-step (to use Cascading lingo) basis. But
> with a Flow planned for Flink, there’s only a single “step”.
>
> Thanks,
>
> — Ken
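PS: For comparison, the per-step tuning you describe for M-R planned flows usually goes through the step ConfigDef of a pipe. A rough (untested) sketch, assuming the Cascading 2.x API:

import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.ConfigDef;
import cascading.tuple.Fields;

// "assembly" stands in for whatever upstream pipe feeds the grouping.
Pipe assembly = new Pipe("docs");
Pipe grouped = new GroupBy(assembly, new Fields("word"));

// Pin the reducer count for just the step this GroupBy lands in.
// ("mapred.reduce.tasks" is the classic JobConf key; newer Hadoop
// versions use "mapreduce.job.reduces".)
grouped.getStepConfigDef()
       .setProperty(ConfigDef.Mode.REPLACE, "mapred.reduce.tasks", "20");

With a Flink-planned flow there is no equivalent per-step hook yet, which is exactly the gap discussed above.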