Hi all,
I must say I'm very impressed by Flink and what it can do.
I have been experimenting with Flink operator parallelism and scalability,
and I have a few questions on this subject.
My setup is:
1. Flink 1.9.1
2. Docker Job Cluster, where each task manager has only one task slot. I'm
following [1].
3. env setup:
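// restart up to 1000 times, waiting 1000 ms between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
// default parallelism for operators that do not set their own
env.setParallelism(1);
// upper bound for rescaling keyed state (number of key groups)
env.setMaxParallelism(128);
// checkpoint every 10 minutes (interval in milliseconds)
env.enableCheckpointing(10 * 60 * 1000);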
Please note that I am using operator chaining here.
My pipeline setup:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture2.png>
As you can see, I have 7 operators (a few of them were actually chained, and
this is OK) with different parallelism levels. This gives me 23 tasks in
total.
I've noticed that with the "one task manager = one task slot" approach I need
6 task slots/task managers to be able to start this pipeline.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture1.png>
If the number of task slots is lower than 6, the job is scheduled but not started.
With 6 task slots everything works fine, and I must say that I'm very
impressed with the way Flink balanced data between task slots. Data was
distributed very evenly between operator instances/tasks.
In this setup (7 operators, 23 tasks and 6 task slots), some task slots have
to be reused by more than one operator. While inspecting the UI I found
examples of such operators. This is what I was expecting, though.
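To make the slot arithmetic concrete, here is a minimal sketch of how I understand it, assuming all operators stay in the default slot sharing group. The per-operator parallelism values are made up (chosen only so that they sum to 23 with a highest value of 6); they are not read from my job graph, and the class and method names are hypothetical.

```java
import java.util.Arrays;

public class SlotMath {
    // Total number of parallel subtasks: one per operator instance.
    static int totalSubtasks(int[] parallelism) {
        return Arrays.stream(parallelism).sum();
    }

    // Assumption: with default slot sharing, one slot can hold one subtask
    // of every operator, so the job needs as many slots as the highest
    // operator parallelism, not as many as the total number of subtasks.
    static int requiredSlots(int[] parallelism) {
        return Arrays.stream(parallelism).max().orElse(0);
    }

    public static void main(String[] args) {
        // Hypothetical parallelism values, for illustration only.
        int[] parallelism = {1, 6, 6, 4, 3, 2, 1};
        System.out.println("subtasks     = " + totalSubtasks(parallelism));
        System.out.println("slots needed = " + requiredSlots(parallelism));
    }
}
```

With these made-up values the sketch reports 23 subtasks and 6 required slots, which matches what I observed: 23 tasks fit into 6 slots only because several operators share a slot.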
However, I was a little surprised after I added one additional task
manager (hence one new task slot):
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture3.png>
After I added the new resources, Flink did not rebalance/redistribute the
graph, so this host was sitting there doing nothing. Even after I put
some load on the cluster, this node was still not used.
*After doing this exercise I have a few questions:*
1. It seems that the number of task slots must be equal to or greater than the
highest operator parallelism used in the pipeline. In my case it was 6. When I
changed the parallelism of one of the operators to 7, I had to have 7 task slots
(task managers in my setup) to be able to even start the job.
Is this the case?
2. What can I do to use the extra node that was spawned while the job was
running?
In other words, if I see that one of my nodes has too much load, what
can I do? Please note that I'm using a keyBy/hashing function in my pipeline, and
in my tests I had around 5000 unique keys.
I tried to use the REST API to call "rescale", but I got this response:
/302{"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}/
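For context on the keyBy part, this is a rough sketch of how I believe keys end up on operator instances: each key is hashed into one of maxParallelism key groups, and each operator instance owns a contiguous range of key groups. The range formula follows what I understand Flink's KeyGroupRangeAssignment to do; the hash below is only a stand-in (Flink applies its own murmur hash to key.hashCode()), and the class name and the "key-N" keys are hypothetical.

```java
import java.util.Arrays;

public class KeyGroupSketch {
    // Stand-in hash, NOT Flink's real one: maps a key to a key group
    // in the range [0, maxParallelism).
    static int keyGroup(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, maxParallelism);
    }

    // Each operator instance owns a contiguous range of key groups.
    static int operatorIndex(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many of numKeys synthetic keys land on each instance.
    static int[] keysPerSubtask(int numKeys, int parallelism, int maxParallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < numKeys; i++) {
            int group = keyGroup("key-" + i, maxParallelism);
            counts[operatorIndex(group, parallelism, maxParallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 5000 keys and maxParallelism 128 as in my tests; parallelism 6 vs 7.
        System.out.println(Arrays.toString(keysPerSubtask(5000, 6, 128)));
        System.out.println(Arrays.toString(keysPerSubtask(5000, 7, 128)));
    }
}
```

If this picture is right, going from parallelism 6 to 7 changes which instance owns which key-group range, so using the extra slot would mean redistributing keyed state rather than just routing new records to the new node.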
Thanks.
[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md