Hi Timo,
I actually tried many things, increasing jvm heap size and flink managed
memory didn't help me. Running the same query without group by clause like
this:
select
avg(transaction_amount) as avg_ta,
avg(salary+bonus) as avg_income,
avg(salary+bonus) - avg(transaction_amount) as spending
from transactions t left join customers c on t.customer_id = c.customer_id
And execution time is 33 seconds, which is great, because is one row sink,
also I tried only to join tables, but when sink is around 5 million,
execution time is 986 seconds which I find strange, because it's only join,
no aggregations.
This is my connector code which is almost the same for both queries(output
rows are different missing first and last name, because of removing group
by) with group by and without group by:
t_env.connect(FileSystem().path('transactions.csv')) \
.with_format(OldCsv().ignore_first_line().field_delimiter(",").quote_character('\"')
.field('transaction_id', DataTypes.STRING())
.field('product_id', DataTypes.STRING())
.field('transaction_amount', DataTypes.DOUBLE())
.field('transaction_date', DataTypes.STRING())
.field('customer_id', DataTypes.STRING())
) \
.with_schema(Schema()
.field('transaction_id', DataTypes.STRING())
.field('product_id', DataTypes.STRING())
.field('transaction_amount', DataTypes.DOUBLE())
.field('transaction_date', DataTypes.STRING())
.field('customer_id', DataTypes.STRING())
) \
.create_temporary_table('transactions')
t_env.connect(FileSystem().path('join_output.csv')) \
.with_format(Csv().derive_schema()
) \
.with_schema(Schema()
.field('avg_ta', DataTypes.DOUBLE())
.field('avg_income', DataTypes.INT())
.field('spending', DataTypes.DOUBLE())
) \
.create_temporary_table('mySink')
Customers table is the same, only with different fields.
Thanks again!