Hi Violeta,

Can you share your connector code with us? The plan looks quite complicated for such a relatively simple query, so there may be some optimization potential. But before we dive deeper: I see a `Map(to: Row)`, which indicates that we might be working with a legacy sink connector.

Have you tried running the same pipeline with only the outer join (without the aggregation) or only the aggregation (without the outer join)? It would be interesting to find out which of the two is the bottleneck. It might also make sense to increase Flink's managed memory, which is used for sorting. The JVM heap size looks quite small as well.
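
To isolate the two operators, the query could be split roughly as follows. This is only a sketch: it reuses the table and column names from your query and assumes that transaction_amount is a column of transactions and that salary and bonus are columns of customers. Run each statement as its own job with the same CSV sink setup.

```sql
-- Variant 1: only the outer join, no aggregation.
-- Assumption: salary and bonus come from customers.
select
  t.customer_id,
  t.transaction_amount,
  c.salary + c.bonus as income
from transactions t left join customers c on t.customer_id = c.customer_id

-- Variant 2: only the aggregation, no join.
-- Assumption: transaction_amount comes from transactions.
select
  customer_id,
  avg(transaction_amount) as avg_ta
from transactions
group by customer_id
```

Note that variant 1 emits at least one row per transaction, so the sink receives far more rows than in the original query. Keep that in mind when comparing the timings.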


On 08.09.20 08:30, Violeta Milanović wrote:
Hi all, I'm currently testing PyFlink against PySpark for batch jobs. The query I'm using is:

select
avg(transaction_amount) as avg_ta,
avg(salary+bonus) as avg_income,
avg(salary+bonus) - avg(transaction_amount) as spending
from transactions t left join customers c on t.customer_id = c.customer_id
group by c.customer_id

It's a really simple one, and I have 1 million rows in customers and 5 million in transactions. My output table should contain around 690k rows. The issue is that sinking 690k rows into CSV is much slower than with PySpark. It's not a problem when the sink table has one row, or even 100k rows; in those cases execution is much faster than PySpark. For this query and sink, PySpark takes 50 seconds and PyFlink takes around 120 seconds. I'm running both in Docker containers with default settings, except that I increased the parallelism for PyFlink to 6, so this run uses 6 CPUs. I'm new to Flink's memory model. As I understand it, I should increase taskmanager.memory.process.size and jobmanager.memory.process.size, which I tried, but it didn't help.

My configuration is the default one, created from the official Docker image:

jobmanager.heap.size 1024m
taskmanager.memory.size 1568m
cpu cores: 6
physical memory 11.7
jvm heap size: 512m
flink managed memory: 512m

This is how my execution plan looks:


