You are using the old connectors. The new connectors are available via SQL DDL (and the execute_sql() API), as documented here:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
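
As a rough sketch of what that could look like for the 'transactions' source from your mail (not tested against your data): the column names are copied from your connector code, while the path, the helper name register_transactions and the choice of options are assumptions on my side. The old ignore_first_line() and quote_character() settings are not carried over here, so header and quoting handling should be checked against the format options in the docs.

```python
# Sketch only: declares the 'transactions' source with SQL DDL instead of
# t_env.connect(). The path is a placeholder (assumption), and header/quote
# handling from the old OldCsv() descriptor is NOT reproduced here.
TRANSACTIONS_DDL = """
CREATE TABLE transactions (
    transaction_id STRING,
    product_id STRING,
    transaction_amount DOUBLE,
    transaction_date STRING,
    customer_id STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///path/to/transactions.csv',
    'format' = 'csv'
)
"""


def register_transactions(t_env):
    """Hypothetical helper: registers the table on an existing table environment.

    t_env is expected to expose execute_sql(), as the PyFlink 1.11
    TableEnvironment does.
    """
    t_env.execute_sql(TRANSACTIONS_DDL)
```

You would call register_transactions(t_env) once with your existing table environment and then run the query against 'transactions' as before; the customers table and the sink could be declared the same way.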

Maybe this will give you some performance boost, but certainly not enough. I will loop in someone from the SQL planner team (in CC).

Regards,
Timo


On 08.09.20 17:27, Violeta Milanović wrote:
Hi Timo,

I actually tried many things; increasing the JVM heap size and Flink managed memory didn't help. I ran the same query without the group by clause, like this:

select
avg(transaction_amount) as avg_ta,
avg(salary+bonus) as avg_income,
avg(salary+bonus) - avg(transaction_amount) as spending
from transactions t left join customers c on t.customer_id = c.customer_id

The execution time is 33 seconds, which is great, because the sink receives only one row. I also tried only joining the tables, but when the sink receives around 5 million rows, the execution time is 986 seconds, which I find strange, because it's only a join with no aggregations.

This is my connector code, which is almost the same for both queries, with and without group by (the output rows differ: without group by, the first and last name are missing):

t_env.connect(FileSystem().path('transactions.csv')) \
     .with_format(OldCsv().ignore_first_line().field_delimiter(",").quote_character('\"')
                  .field('transaction_id', DataTypes.STRING())
                  .field('product_id', DataTypes.STRING())
                  .field('transaction_amount', DataTypes.DOUBLE())
                  .field('transaction_date', DataTypes.STRING())
                  .field('customer_id', DataTypes.STRING())
                  ) \
     .with_schema(Schema()
                  .field('transaction_id', DataTypes.STRING())
                  .field('product_id', DataTypes.STRING())
                  .field('transaction_amount', DataTypes.DOUBLE())
                  .field('transaction_date', DataTypes.STRING())
                  .field('customer_id', DataTypes.STRING())
                  ) \
     .create_temporary_table('transactions')

t_env.connect(FileSystem().path('join_output.csv')) \
     .with_format(Csv().derive_schema()
                  ) \
     .with_schema(Schema()
                  .field('avg_ta', DataTypes.DOUBLE())
                  .field('avg_income', DataTypes.INT())
                  .field('spending', DataTypes.DOUBLE())
                  ) \
     .create_temporary_table('mySink')

The customers table is defined the same way, only with different fields.

Thanks again!


