Hi Sunny, this is a current limitation of Flink's scheduling. We are currently working on extending Flinks scheduling mechanism [1] which should also help with solving this problem. At the moment, I recommend using the per-job mode so that you have a single cluster per job.
[1] https://issues.apache.org/jira/browse/FLINK-10407 Cheers, Till On Wed, Dec 5, 2018 at 2:07 AM Sunny Yun <seonhee....@gmail.com> wrote: > Why does Flink do resource management by only slots, not by TaskManagers > and slots? > > If there are one Flink cluster to submit multiple jobs, how do I make > JobManager to distribute subtasks evenly to all TaskManagers? > Now, JobManager treats the slots globally, some jobs' operators are > assigned only one TM's slots. > > > For example: > > 3 TaskManager (taskmanager.numberOfTaskSlots: 8) = total 24 slots > > env > .setParallelism(6) > .addSource(sourceFunction) > .partitionCustom(partitioner, keySelector) > .map(mapper) > .addSink(sinkFunction); > env.execute(job1); > > env > .setParallelism(12) > .addSource(sourceFunction) > .partitionCustom(partitioner, keySelector) > .map(mapper) > .addSink(sinkFunction); > env.execute(job2); > > env > .setParallelism(6) > .addSource(sourceFunction) > .partitionCustom(partitioner, keySelector) > .map(mapper) > .addSink(sinkFunction); > env.execute(job3); > > > Intented : > TM1 TM2 TM3 > -------------- > job1-source 2 2 2 > job1-map-sink 2 2 2 > job2-source 4 4 4 > job2-map-sink 4 4 4 > job3-source 2 2 2 > job3-map-sink 2 2 2 > > > Because each job is under the stress at unpredictable time, it is > important to use all available resource per each job. > We made three clusters (6, 6, 12 each total slots) as a temporary, but > it's not pretty way. > > > Best, Sunny > ᐧ >