Hi all, I must say I'm very impressed by Flink and what it can do. I was playing around with Flink operator parallelism and scalability and I have a few questions on this subject.
My setup is:

1. Flink 1.9.1
2. Docker job cluster, where each task manager has only one task slot. I'm following [1].
3. Environment setup (please note that I am using operator chaining here):

   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
   env.setParallelism(1);
   env.setMaxParallelism(128);
   env.enableCheckpointing(10 * 60 * 1000);

My pipeline setup: <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture2.png>

As you can see, I have 7 operators (a few of them were actually chained, which is fine), each with a different parallelism level. This gives me 23 tasks in total.

I've noticed that with the "one task manager = one task slot" approach I have to have 6 task slots/task managers to be able to start this pipeline: <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture1.png>

If the number of task slots is lower than 6, the job is scheduled but never starts. With 6 task slots everything works fine, and I must say I'm very impressed by the way Flink balanced the data between task slots: it was distributed very evenly between operator instances/tasks.

In this setup (7 operators, 23 tasks and 6 task slots), some task slots have to be reused by more than one operator, and while inspecting the UI I found examples of such operators. This is what I was expecting. However, I was a little surprised after I added one additional task manager (hence one new task slot): <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture3.png>

After adding the new resources, Flink did not rebalance/redistribute the graph, so this host was sitting there doing nothing. Even after putting some load on the cluster, this node was still not used.

*After doing this exercise I have a few questions:*

1. It seems that the number of task slots must be equal to or greater than the maximum parallelism used in the pipeline; in my case that was 6. When I changed the parallelism of one of the operators to 7, I had to have 7 task slots (task managers in my setup) to be able to even start the job. Is this the case? (I've appended a stripped-down sketch of how I assign per-operator parallelism at the end of this mail.)
2. What can I do to use the extra node that was spawned while the job was running? In other words, if I see that one of my nodes has too much load, what can I do? Please note that I'm using a keyBy/hashing function in my pipeline, and in my tests I had around 5000 unique keys (I've also appended the small key-distribution check I used). I tried to use the REST API to call "rescale" but I got this response:

   302 {"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}

Thanks.

[1] https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
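P.S. For reproducibility, here is a stripped-down sketch of how I set the per-operator parallelism. The source, functions and parallelism values below are toy placeholders, not my real pipeline; my understanding (please correct me if wrong) is that with the default slot sharing group the job needs as many slots as the highest single-operator parallelism, which would be 6 here:

   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.flink.api.common.functions.ReduceFunction;
   import org.apache.flink.api.common.restartstrategy.RestartStrategies;
   import org.apache.flink.api.java.functions.KeySelector;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   public class ParallelismSketch {

       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

           // Same environment settings as in my real job.
           env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
           env.setParallelism(1);        // default for operators that don't override it
           env.setMaxParallelism(128);   // upper bound for key groups
           env.enableCheckpointing(10 * 60 * 1000);

           env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L)             // toy source, parallelism 1
              .map((MapFunction<Long, Long>) value -> value * 10)
              .setParallelism(4)                                        // stand-in for my enrichment step
              .keyBy((KeySelector<Long, Long>) value -> value % 5)      // hash-partitioning, like my keyBy
              .reduce((ReduceFunction<Long>) (a, b) -> a + b)
              .setParallelism(6)                                        // highest parallelism in the job
              .print()
              .setParallelism(2);                                       // toy sink

           // With the default slot sharing group, the job should start as soon as
           // 6 slots are available (= max operator parallelism), not 23 (= total tasks).
           env.execute("parallelism-sketch");
       }
   }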
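And here is the quick check I used to estimate how my ~5000 keys spread across the parallel instances of the keyed operator. It leans on Flink's internal KeyGroupRangeAssignment class, so please treat it as an assumption on my side rather than public API, and the "key-N" strings are placeholders for my real keys:

   import java.util.HashMap;
   import java.util.Map;
   import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

   public class KeyDistributionCheck {

       public static void main(String[] args) {
           int maxParallelism = 128;  // matches env.setMaxParallelism(128)
           int parallelism = 6;       // parallelism of my keyed operator

           // Count how many of ~5000 keys land on each parallel subtask.
           Map<Integer, Integer> keysPerSubtask = new HashMap<>();
           for (int i = 0; i < 5000; i++) {
               String key = "key-" + i;  // placeholder keys
               int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
               keysPerSubtask.merge(subtask, 1, Integer::sum);
           }
           keysPerSubtask.forEach((subtask, count) ->
               System.out.println("subtask " + subtask + " -> " + count + " keys"));
       }
   }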