Hi, folks!

I am running a Flink streaming job in batch execution mode on EMR.

The job has the following stages:
1. Read from MySQL
2. KeyBy user_id
3. Reduce by user_id
4. Async I/O enriching from Redis
5. Async I/O enriching from other Redis
6. Async I/O enriching from REST #1
7. Async I/O enriching from REST #2
8. Async I/O enriching from REST #2
9. Write to Elasticsearch



As you can see, it is a very network-intensive job with a lot of Async I/O
operators that we currently cannot get rid of. As this is not a common use
case, I wanted to ask for advice: which cluster configuration specifics should
we take into account to run this kind of job as efficiently as possible?

I have already tried several configurations:

#1

master: r6g.xlarge
core: r6g.xlarge (CPU: 4; RAM: 32 GiB; Disk: EBS 128 GB, network: 1.25 Gigabit 
baseline with burst up to 10 Gigabit)
problems: works with sort-based shuffling
(https://flink.apache.org/2021/10/26/sort-shuffle-part1.html) enabled, but very
slowly (~36h). This instance type has baseline and burst network performance:
once the burst credits are exhausted, bandwidth degrades to the 1.25 Gbps
baseline, which slows down I/O. With hash-based shuffling, the job fails on
KeyBy -> Reduce with "Connection reset by peer": Task Manager fails -> job
fails -> Job Manager is not able to restart.
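
To put the baseline/burst gap in numbers, here is a rough sketch in Python. The
bandwidth figures are the ones listed for r6g.xlarge above; the 1 TB volume is
a hypothetical placeholder, not a measurement from this job, and the estimate
only considers raw link speed (no protocol overhead, disk I/O, or latency of
the external services).

```python
# Rough transfer-time estimate for configuration #1 (r6g.xlarge).
# Bandwidth figures come from the instance description above;
# the data volume is a made-up placeholder, not a measurement from the job.


def gbit_to_mbyte_per_s(gbit_per_s: float) -> float:
    """Convert a link speed in Gbit/s to MB/s (decimal units, 8 bits per byte)."""
    return gbit_per_s * 1000 / 8


def transfer_hours(volume_gb: float, gbit_per_s: float) -> float:
    """Hours needed to move volume_gb gigabytes over a link of the given speed."""
    seconds = volume_gb * 1000 / gbit_to_mbyte_per_s(gbit_per_s)
    return seconds / 3600


BASELINE_GBIT = 1.25              # r6g.xlarge baseline
BURST_GBIT = 10.0                 # r6g.xlarge burst ceiling
HYPOTHETICAL_VOLUME_GB = 1000.0   # assumption: 1 TB moved per node

baseline_h = transfer_hours(HYPOTHETICAL_VOLUME_GB, BASELINE_GBIT)
burst_h = transfer_hours(HYPOTHETICAL_VOLUME_GB, BURST_GBIT)

print(f"baseline: {gbit_to_mbyte_per_s(BASELINE_GBIT):.2f} MB/s, {baseline_h:.2f} h")
print(f"burst:    {gbit_to_mbyte_per_s(BURST_GBIT):.2f} MB/s, {burst_h:.2f} h")
print(f"slowdown at baseline: {baseline_h / burst_h:.1f}x")
```

Whatever the real volume is, the ratio stays the same: once the burst credits
are gone, the same transfer takes 8 times longer than at the burst rate.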

#2

master: m5.xlarge
core: r6g.12xlarge (CPU: 48; RAM: 384 GiB; Disk: EBS 1.5 TB, network: 20 
Gigabit)
problems: the job fails. With sort-based shuffling, it fails in the writing
phase with the exception "Failed to transfer file from TaskExecutor". With
hash-based shuffling, it fails at the same stage with "Connection reset by peer".

#3

master: m5.xlarge
core: c6gn.4xlarge (CPU: 16; RAM: 32 GiB; Disk: EBS 128 GB, network: 25 Gigabit)
summary: this configuration actually works fine, but it is a bit expensive, so
I would like to find a cheaper solution.

I would appreciate any help!

Kind Regards,
Valeriia
