Thanks Yingjie for pinging me.

Hi vtygoss,

Leonard is right: you are probably using the wrong statistics information.

This causes the optimizer to incorrectly select a **BROADCAST JOIN**.
Unfortunately, Flink then needs to broadcast a huge amount of data, even
gigabytes. That is the real performance killer.

I think you can either:
- analyze your tables in Hive, as Leonard said, or
- just remove the call to "TpcdsStatsProvider.registerTpcdsStats(tEnv)".

I also see this in your code: "table.optimizer.join.broadcast-threshold: 256
* 1024 * 1024".

I think this threshold is far too large; more than 10MB is not recommended.
A sketch of the fix follows.
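
This is only a minimal sketch, reusing the "tEnv" from your
MyTpcdsTestProgram; it assumes you have first run something like "ANALYZE
TABLE store_returns COMPUTE STATISTICS" in Hive so that the HiveCatalog holds
real statistics for every table:

```java
// Keep the broadcast threshold at (or below) the 10MB default instead of
// overriding it to 256MB later in the method.
tEnv.getConfig()
        .getConfiguration()
        .setLong("table.optimizer.join.broadcast-threshold", 10 * 1024 * 1024);
// And drop the TpcdsStatsProvider.registerTpcdsStats(tEnv) call: its numbers
// describe the 1GB CI data set, not your 3TB tables.
```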

Best,
Jingsong

On Wed, Jun 23, 2021 at 11:08 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, vtygoss
>
> Thanks for the detailed report. A quick reply, since I wrote
> *org.apache.flink.table.tpcds.TpcdsTestProgram* in the community: I guess
> you missed the *table statistics information*.
>
> The *table statistics information* used in the TPC-DS e2e tests is
> constant for the 1GB verification data set. I wrote this test as a CI check
> that Flink Batch SQL works for every PR, not as a performance benchmark.
> Please see *org.apache.flink.table.tpcds.stats.TpcdsStatsProvider*.
>
> The *table statistics information* is used by the planner (the CBO
> optimizer) to optimize the SQL plan; incorrect *table statistics
> information* can even lead to a wrong plan, and the SQL job may behave
> unexpectedly.
>
> Thus if you want to run the 3TB TPC-DS tests, you should use the *table
> statistics information* corresponding to your data set; you can obtain it
> by analyzing your tables in Hive. A sketch of how statistics registration
> works is below.
>
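> This is only an illustrative sketch of the Catalog statistics API (roughly
> what *TpcdsStatsProvider* does for the 1GB set); the class names are from
> the Flink 1.12 Catalog API, and the numbers are placeholders, not real 3TB
> statistics:
>
> ```java
> import org.apache.flink.table.catalog.Catalog;
> import org.apache.flink.table.catalog.ObjectPath;
> import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
>
> // Register a row count for one table so the CBO sees realistic sizes.
> // (Call this from a method that declares or handles the checked catalog
> // exceptions thrown by alterTableStatistics.)
> Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
> catalog.alterTableStatistics(
>         new ObjectPath(tEnv.getCurrentDatabase(), "store_returns"),
>         // rowCount, fileCount, totalSize, rawDataSize (placeholder values)
>         new CatalogTableStatistics(1_000_000L, 0, 0L, 0L),
>         false);
> ```
>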
> Best,
> Leonard
>
> On Jun 23, 2021, at 10:42, Yingjie Cao <kevin.ying...@gmail.com> wrote:
>
> Hi,
>
> I also have some experience running the TPC-DS benchmark with Flink (10T
> scale), but the shuffle data amount of Q1 differs greatly from the numbers
> in the picture you shared. I am not sure what is going on; maybe you missed
> something? I attached the Q1 numbers from my test (also with 500 max
> parallelism, though I used Flink 1.13 instead of 1.12); the running time
> was 20s for 10T TPC-DS.
>
> There are some points I know of which may influence the test results; I
> hope these help you (a config sketch follows the list):
> 1. Shuffle data compression. (Disabled in Flink by default; can be enabled
> by setting taskmanager.network.blocking-shuffle.compression.enabled to
> true.)
> 2. The blocking shuffle type used. See [1] for more information. (To use
> sort-shuffle, the minimum version is 1.13.)
> 3. Memory configuration, including network and managed memory size.
> 4. SQL optimization configuration. (I am not familiar with this; maybe
> @Jingsong Li has more knowledge about that.)
>
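> A sketch of the shuffle settings from points 1 and 2, assuming you set the
> cluster configuration programmatically (in a real deployment the same keys
> go into flink-conf.yaml; see [1] for the sort-shuffle details):
>
> ```java
> import org.apache.flink.configuration.Configuration;
>
> Configuration clusterConf = new Configuration();
> // 1. Compress shuffle data (disabled by default in Flink 1.12/1.13).
> clusterConf.setBoolean(
>         "taskmanager.network.blocking-shuffle.compression.enabled", true);
> // 2. Use sort-shuffle for all result partitions (Flink 1.13+); only
> // parallelisms at or above this threshold use sort-shuffle.
> clusterConf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 1);
> ```
>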
> BTW, could you please share more information? For example, how many nodes
> are in your cluster? Which type of disk do you use, SSD or HDD? How many
> cores, disks, and how much memory are available on each node? Could you
> also share the shuffle write and shuffle read numbers for all stages of
> Spark?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
>
> <image.png>
>
> Best,
> Yingjie
>
> On Tue, Jun 22, 2021 at 4:46 PM vtygoss <vtyg...@126.com> wrote:
>
>> Hi,
>>
>> I am doing performance tests on 3TB TPC-DS using the
>> flink/flink-end-to-end-tests/flink-tpcds-test module, but the test results
>> are not good.
>>
>> scenario:
>> tpc-ds location: hive 2.3.5
>> tpc-ds scale: 3TB, parquet + snappy
>> flink version: flink-1.12-SNAPSHOT
>> resource configuration: slots per task manager=5, parallelism=500, job
>> manager memory=10GB, task manager memory=10GB, task manager number=100.
>>
>> differences between my branch and the community branch:
>> 1. tpc-ds data is stored in Hive, so I changed the Source from CSV to Hive.
>> 2. I added some explicit optimizations for join-reorder, broadcast, etc.,
>> shown below.
>> 3. The community tpcds test main class is
>> org.apache.flink.table.tpcds.TpcdsTestProgram; mine is
>> org.apache.flink.table.tpcds.MyTpcdsTestProgram. Both are in the
>> attachment.
>>
>> Did I do something wrong? Please help with some advice. Thanks very much!
>>
>> Best Regards
>>
>>
>> ```
>> [MyTpcdsTestProgram.java] (My Branch)
>>
>> private static TableEnvironment prepareTableEnv(String sourceTablePath, Boolean useTableStats) {
>>     // init Table Env
>>     EnvironmentSettings environmentSettings =
>>             EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>     TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
>>
>>     // register the Hive catalog that holds the 3TB TPC-DS tables
>>     tEnv.loadModule("custom-hive", new HiveModule("2.3.5"));
>>     String hiveSite = "/home/work/flink-1.12/conf/";
>>     HiveCatalog catalog = new HiveCatalog("hive", "tpcds3t", hiveSite);
>>     tEnv.registerCatalog("hive", catalog);
>>     tEnv.useCatalog("hive");
>>     tEnv.useDatabase("tpcds3t");
>>
>>     // config Optimizer parameters
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 500);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setString(
>>                     ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
>>                     GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setLong(
>>                     OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
>>                     10 * 1024 * 1024);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
>>
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.exec.hive.infer-source-parallelism", false);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setString("table.exec.sink.not-null-enforcer", "drop");
>>
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.join-reorder-enabled", true);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger("table.optimizer.distinct-agg.split.bucket-num", 10240);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.distinct-agg.split.enabled", true);
>>     // NOTE: this string key is the same option as
>>     // TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD above, so 256MB overrides
>>     // the 10MB value set earlier.
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger("table.optimizer.join.broadcast-threshold", 256 * 1024 * 1024);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.reuse-source-enabled", true);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.source.predicate-pushdown-enabled", true);
>>
>>     /* STREAMING-only options, kept disabled for the batch test:
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger("table.exec.async-lookup.buffer-capacity", 10000);
>>     tEnv.getConfig().getConfiguration().setBoolean("table.exec.mini-batch.enabled", true);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setString("table.exec.mini-batch.allow-latency", "1 min");
>>     tEnv.getConfig().getConfiguration().setInteger("table.exec.mini-batch.size", 10000); */
>>
>>     // register statistics info
>>     if (useTableStats) {
>>         TpcdsStatsProvider.registerTpcdsStats(tEnv);
>>     }
>>     return tEnv;
>> }
>> ```
>>
>>
>> ```
>> TpcdsTestProgram.java (community branch)
>>
>> private static TableEnvironment prepareTableEnv(String sourceTablePath, Boolean useTableStats) {
>>     // init Table Env
>>     EnvironmentSettings environmentSettings =
>>             EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>     TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
>>
>>     // config Optimizer parameters
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setString(
>>                     ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
>>                     GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setLong(
>>                     OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
>>                     10 * 1024 * 1024);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
>>
>>     // register TPC-DS tables
>>     TPCDS_TABLES.forEach(
>>             table -> {
>>                 TpcdsSchema schema = TpcdsSchemaProvider.getTableSchema(table);
>>                 CsvTableSource.Builder builder = CsvTableSource.builder();
>>                 builder.path(sourceTablePath + FILE_SEPARATOR + table + DATA_SUFFIX);
>>                 for (int i = 0; i < schema.getFieldNames().size(); i++) {
>>                     builder.field(
>>                             schema.getFieldNames().get(i),
>>                             TypeConversions.fromDataTypeToLegacyInfo(
>>                                     schema.getFieldTypes().get(i)));
>>                 }
>>                 builder.fieldDelimiter(COL_DELIMITER);
>>                 builder.emptyColumnAsNull();
>>                 builder.lineDelimiter("\n");
>>                 CsvTableSource tableSource = builder.build();
>>                 ConnectorCatalogTable catalogTable =
>>                         ConnectorCatalogTable.source(tableSource, true);
>>                 tEnv.getCatalog(tEnv.getCurrentCatalog())
>>                         .ifPresent(
>>                                 catalog -> {
>>                                     try {
>>                                         catalog.createTable(
>>                                                 new ObjectPath(tEnv.getCurrentDatabase(), table),
>>                                                 catalogTable,
>>                                                 false);
>>                                     } catch (Exception e) {
>>                                         throw new RuntimeException(e);
>>                                     }
>>                                 });
>>             });
>>
>>     // register statistics info
>>     if (useTableStats) {
>>         TpcdsStatsProvider.registerTpcdsStats(tEnv);
>>     }
>>     return tEnv;
>> }
>> ```
>>
>>
>>
>>
>> e.g.
>> query 1 takes about 40 min to execute (standard spark-2.3.0 took 13
>> seconds; spark-2.3.0 with a custom Adaptive Query Execution feature took
>> 5.99s). The Flink "Explain plan" of query 1 is in the attachment.
>>
>>
>> ```
>> query1.sql
>>
>> with customer_total_return as (
>>     select sr_customer_sk as ctr_customer_sk,
>>            sr_store_sk as ctr_store_sk,
>>            sum(sr_return_amt) as ctr_total_return
>>     from store_returns, date_dim
>>     where sr_returned_date_sk = d_date_sk
>>       and d_year = 2000
>>     group by sr_customer_sk, sr_store_sk)
>> select c_customer_id
>> from customer_total_return ctr1, store, customer
>> where ctr1.ctr_total_return > (select avg(ctr_total_return) * 1.2
>>                                from customer_total_return ctr2
>>                                where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
>>   and s_store_sk = ctr1.ctr_store_sk
>>   and s_state = 'TN'
>>   and ctr1.ctr_customer_sk = c_customer_sk
>> order by c_customer_id
>> limit 100
>> ```
>>
>>
>>
>> stages:
>> <881f3241-29c4-4aa0-9c08-43560817a9eb.png>
>>
>> job configurations:
>> <41a1da0b-8655-447f-8b2c-c2119c3ee1a3.png>
>>
>> JM memory:
>> <1562c554-c765-47b8-b02e-bd71d1abb6d1.png>
>>
>> TM memory:
>> <f4c3eb06-9daf-4b48-8a65-e700d2b9c5ee.png>
>>
>

-- 
Best, Jingsong Lee
