Hi Violeta,
I just noticed that the plan might be generated from Flink's old planner
instead of the new, more performant Blink planner. Which planner are you
currently using?
Regards,
Timo
On 08.09.20 17:51, Timo Walther wrote:
You are using the old connectors. The new connectors are available via
SQL DDL (and execute_sql() API) like documented here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
Maybe this will give your some performance boost, but certainly not
enough. I will loop in someone from the SQL planner team (in CC).
Regards,
Timo
On 08.09.20 17:27, Violeta Milanović wrote:
Hi Timo,
I actually tried many things, increasing jvm heap size and flink
managed memory didn't help me. Running the same query without group by
clause like this:
select
avg(transaction_amount) as avg_ta,
avg(salary+bonus) as avg_income,
avg(salary+bonus) - avg(transaction_amount) as spending
from transactions t left join customers c on t.customer_id =
c.customer_id
And execution time is 33 seconds, which is great, because is one row
sink, also I tried only to join tables, but when sink is around 5
million, execution time is 986 seconds which I find strange, because
it's only join, no aggregations.
This is my connector code which is almost the same for both
queries(output rows are different missing first and last name, because
of removing group by) with group by and without group by:
t_env.connect(FileSystem().path('transactions.csv')) \
.with_format(OldCsv().ignore_first_line().field_delimiter(",").quote_character('\"')
.field('transaction_id', DataTypes.STRING())
.field('product_id', DataTypes.STRING())
.field('transaction_amount', DataTypes.DOUBLE())
.field('transaction_date', DataTypes.STRING())
.field('customer_id', DataTypes.STRING())
) \
.with_schema(Schema()
.field('transaction_id', DataTypes.STRING())
.field('product_id', DataTypes.STRING())
.field('transaction_amount', DataTypes.DOUBLE())
.field('transaction_date', DataTypes.STRING())
.field('customer_id', DataTypes.STRING())
) \
.create_temporary_table('transactions')
t_env.connect(FileSystem().path('join_output.csv')) \
.with_format(Csv().derive_schema()
) \
.with_schema(Schema()
.field('avg_ta', DataTypes.DOUBLE())
.field('avg_income', DataTypes.INT())
.field('spending', DataTypes.DOUBLE())
) \
.create_temporary_table('mySink')
Customers table is the same, only with different fields.
Thanks again!