Hi Violeta,
Can you share your connector code with us? The plan looks quite
complicated given the relatively simple query, so there might be some
optimization potential. But before we dive deeper: I see a `Map(to:
Row)` in the plan, which indicates that you might be working with a
legacy sink connector.
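Just to illustrate what I mean (I don't know your actual code, so the
sink name, column types, and path below are only placeholders): since
Flink 1.11 a CSV sink can also be declared through DDL with the unified
filesystem connector, roughly like

# sketch only, assuming a TableEnvironment t_env; schema and path are placeholders
t_env.execute_sql("""
    CREATE TABLE csv_sink (
        first_name STRING,
        last_name STRING,
        avg_ta DOUBLE,
        avg_income DOUBLE,
        spending DOUBLE
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///tmp/output',
        'format' = 'csv'
    )
""")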
Did you try to run the same pipeline with only the outer join (without
the aggregation) or with only the aggregation (without the outer join)?
It would be interesting to find out where the bottleneck is.
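For example (again just a sketch, assuming your TableEnvironment is
called t_env; write the results to CSV the same way as for the full
query and compare the runtimes):

# 1) outer join only, no aggregation
join_only = t_env.sql_query("""
    SELECT c.first_name, c.last_name, t.transaction_amount
    FROM transactions t
    LEFT JOIN customers c ON t.customer_id = c.customer_id
""")

# 2) aggregation only, no join
agg_only = t_env.sql_query("""
    SELECT customer_id, AVG(transaction_amount) AS avg_ta
    FROM transactions
    GROUP BY customer_id
""")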
It might also make sense to increase Flink's managed memory, which is
used for sorting. The JVM heap size looks pretty tiny as well.
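In a standalone/Docker setup that would go into flink-conf.yaml, for
example (the values below are only examples and need to be tuned to
what your container actually has):

# flink-conf.yaml (example values only)
taskmanager.memory.process.size: 4096m
# share of Flink memory reserved as managed memory (sorting, hash joins)
taskmanager.memory.managed.fraction: 0.5
jobmanager.memory.process.size: 1600m

If I remember correctly, the official Docker image also picks these up
from the FLINK_PROPERTIES environment variable.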
Regards,
Timo
On 08.09.20 08:30, Violeta Milanović wrote:
Hi all, I'm currently testing PyFlink against PySpark for batch jobs.
The query I'm using is:

select
  max(c.first_name),
  max(c.last_name),
  avg(transaction_amount) as avg_ta,
  avg(salary + bonus) as avg_income,
  avg(salary + bonus) - avg(transaction_amount) as spending
from transactions t
left join customers c on t.customer_id = c.customer_id
group by c.customer_id
It's a really simple one, and I have 1 million rows in customers and 5
million in transactions. My output table should contain around 690k
rows. But I'm facing an issue: when I sink those 690k rows into CSV, it
is too slow compared to PySpark. It's not a problem when the sink table
is one row, or even 100k rows; those executions are much faster than
PySpark's. For this query and sink, PySpark's execution time is 50
seconds, while PyFlink takes around 120 seconds. I'm running both in
Docker containers with default settings, I increased the parallelism
for PyFlink to 6, and this is the execution with 6 CPUs. I'm new to
Flink's memory configuration; as I understood it, I should increase
taskmanager.memory.process.size and jobmanager.memory.process.size,
which I tried, but it didn't help.
My configuration is the default, created from the official Docker image:
jobmanager.heap.size: 1024m
taskmanager.memory.size: 1568m
CPU cores: 6
physical memory: 11.7 GB
JVM heap size: 512m
Flink managed memory: 512m
This is how my execution plan looks:
[attached image: 1.png (execution plan screenshot)]
Thanks!
Violeta