Hi, vtygoss

Thanks for the detailed report. A quick reply, since I wrote 
org.apache.flink.table.tpcds.TpcdsTestProgram in the community branch: I guess 
you overlooked the table statistics information.

The table statistics used in the TPC-DS e2e tests are constants for the 
1GB verification data set. I wrote this test as a CI check that Flink Batch SQL 
works correctly for every PR, not as a performance test. Please see 
org.apache.flink.table.tpcds.stats.TpcdsStatsProvider.

The planner (CBO optimizer) uses the table statistics to optimize the SQL 
plan. Incorrect table statistics can even lead to a wrong plan, and the SQL 
job may then run unexpectedly.
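
To illustrate why stale statistics matter, here is a minimal sketch of a 
cost-based join decision. It is not Flink's planner code: the class name, the 
method, the row counts and the row width are all hypothetical, and the 3000x 
growth assumes the table scales linearly with the data set. Only the 10 MB 
broadcast threshold is taken from the community test program quoted below.

```java
public class StatsSketch {
    // Broadcast threshold used in the quoted test program: 10 MB.
    static final long BROADCAST_THRESHOLD_BYTES = 10L * 1024 * 1024;

    /** Toy rule: broadcast a join input only if its estimated size fits the threshold. */
    static String chooseJoin(long estimatedRows, long avgRowBytes) {
        long estimatedBytes = estimatedRows * avgRowBytes;
        return estimatedBytes <= BROADCAST_THRESHOLD_BYTES ? "BROADCAST" : "SHUFFLE_HASH";
    }

    public static void main(String[] args) {
        long avgRowBytes = 32;                // hypothetical row width
        long rowsInStats = 100_000;           // hypothetical row count registered for the 1GB data set
        long rowsActual = rowsInStats * 3000; // same table at 3TB, assuming linear growth

        // The optimizer only sees the registered statistics, not the real data.
        System.out.println("plan from 1GB stats: " + chooseJoin(rowsInStats, avgRowBytes));
        System.out.println("plan from 3TB stats: " + chooseJoin(rowsActual, avgRowBytes));
    }
}
```

With the 1GB numbers the toy rule picks a broadcast join, although the real 
input at 3TB is far above the threshold. A real planner is much more involved, 
but the failure mode is the same.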

Thus, if you want to run the 3TB TPC-DS tests, you should use the table 
statistics that correspond to your test data set. You can obtain them by 
analyzing your tables in Hive.
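
As a rough sketch of that step, the small helper below prints Hive ANALYZE 
statements. The class and method names are made up for illustration; the 
database name tpcds3t and the table names are taken from the program and 
query 1 quoted below. Partitioned tables may additionally need a PARTITION 
clause.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AnalyzeStatements {
    /** Builds Hive statements that collect table-level and column-level statistics. */
    static List<String> analyzeStatements(String database, List<String> tables) {
        List<String> statements = new ArrayList<>();
        for (String table : tables) {
            String name = database + "." + table;
            statements.add("ANALYZE TABLE " + name + " COMPUTE STATISTICS;");
            statements.add("ANALYZE TABLE " + name + " COMPUTE STATISTICS FOR COLUMNS;");
        }
        return statements;
    }

    public static void main(String[] args) {
        // Tables referenced by query 1; extend the list to all TPC-DS tables.
        List<String> tables = Arrays.asList("store_returns", "date_dim", "store", "customer");
        analyzeStatements("tpcds3t", tables).forEach(System.out::println);
    }
}
```

The printed statements can then be run in the Hive CLI or beeline against the 
3TB tables.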

Best,
Leonard

> On June 23, 2021, at 10:42, Yingjie Cao <kevin.ying...@gmail.com> wrote:
> 
> Hi,
> 
> I also have some experience running the TPC-DS benchmark with Flink (10T 
> scale), but the shuffle data volume of Q1 in my test differs greatly from 
> the numbers in the picture you shared. I am not sure what is going on; maybe 
> you missed something? I attached the numbers of Q1 in my test (also with 500 
> max parallelism, though I used Flink version 1.13 instead of 1.12). The 
> running time is 20s for 10T TPC-DS.
> 
> There are some points I know of that may influence the test results; I hope 
> these help:
> 1. Shuffle data compression. (Disabled in Flink by default; it can be enabled 
> by setting taskmanager.network.blocking-shuffle.compression.enabled to true.)
> 2. The blocking shuffle type used. See [1] for more information. (To use 
> sort-shuffle, the minimum version is 1.13.)
> 3. Memory configuration, including network and managed memory size.
> 4. SQL optimization configuration. (I am not familiar with this; maybe 
> @Jingsong Li knows more about it.)
> 
> BTW, could you please share more information? For example, how many nodes are 
> in your cluster? Which type of disk do you use, SSD or HDD? How many cores and 
> disks, and how much memory, are available on each node? Could you also share 
> the shuffle write and shuffle read numbers for all stages of the Spark job?
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> 
> <image.png>
> 
> Best,
> Yingjie
> 
> vtygoss <vtyg...@126.com> wrote on Tue, June 22, 2021 at 4:46 PM:
> Hi, 
> 
> I am doing performance tests on 3TB TPC-DS using 
> flink/flink-end-to-end-tests/flink-tpcds-test module, but the test results 
> are not good.
> 
> scenario:
> tpc-ds location: hive 2.3.5 
> tpc-ds scale: 3TB, parquet + snappy  
> flink version: flink-1.12-SNAPSHOT
> resource configuration: slots per task manager=5, parallelism=500, job manager 
> memory=10GB, task manager memory=10GB, task manager number=100. 
> 
> differences between my branch and the community branch: 
> 1. The TPC-DS data is stored in Hive, so I changed the source from CSV to Hive.
> 2. I explicitly added some optimizations for join reorder, broadcast, etc., as 
> shown below.
> 3. The community TPC-DS test main class is 
> org.apache.flink.table.tpcds.TpcdsTestProgram; mine is 
> org.apache.flink.table.tpcds.MyTpcdsTestProgram. Both are in the 
> attachment.   
> 
> Am I doing something wrong? Please offer some advice. Thanks very much!
> 
> Best Regards
> 
> 
> ```
> [MyTpcdsTestProgram.java] (My Branch)
> private static TableEnvironment prepareTableEnv(
>         String sourceTablePath, Boolean useTableStats) {
>     // init Table Env
>     EnvironmentSettings environmentSettings =
>             EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> 
>     TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> 
>     tEnv.loadModule("custom-hive", new HiveModule("2.3.5"));
>     String hiveSite = "/home/work/flink-1.12/conf/";
>     HiveCatalog catalog = new HiveCatalog("hive", "tpcds3t", hiveSite);
>     tEnv.registerCatalog("hive", catalog);
>     tEnv.useCatalog("hive");
>     tEnv.useDatabase("tpcds3t");
>     // config Optimizer parameters
>     tEnv.getConfig()
>             .getConfiguration()
>             .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 500);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setString(
>                     ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
>                     GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
>     tEnv.getConfig()
>             .getConfiguration()
>             .setLong(
>                     OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
>                     10 * 1024 * 1024);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
> 
>     tEnv.getConfig()
>             .getConfiguration()
>             .setBoolean("table.exec.hive.infer-source-parallelism", false);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setString("table.exec.sink.not-null-enforcer", "drop");
> 
>     tEnv.getConfig()
>             .getConfiguration()
>             .setBoolean("table.optimizer.join-reorder-enabled", true);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setInteger("table.optimizer.distinct-agg.split.bucket-num", 10240);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setBoolean("table.optimizer.distinct-agg.split.enabled", true);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setInteger("table.optimizer.join.broadcast-threshold", 256 * 1024 * 1024);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setBoolean("table.optimizer.reuse-source-enabled", true);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setBoolean("table.optimizer.source.predicate-pushdown-enabled", true);
> 
>     /* STREAMING
>     tEnv.getConfig()
>             .getConfiguration()
>             .setInteger("table.exec.async-lookup.buffer-capacity", 10000);
>     tEnv.getConfig().getConfiguration().setBoolean("table.exec.mini-batch.enabled", true);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setString("table.exec.mini-batch.allow-latency", "1 min");
>     tEnv.getConfig().getConfiguration().setInteger("table.exec.mini-batch.size", 10000);
>     */
>     // register statistics info
>     if (useTableStats) {
>         TpcdsStatsProvider.registerTpcdsStats(tEnv);
>     }
>     return tEnv;
> }
> ```
> 
> 
> ```
> TpcdsTestProgram.java(community branch)
> private static TableEnvironment prepareTableEnv(
>         String sourceTablePath, Boolean useTableStats) {
>     // init Table Env
>     EnvironmentSettings environmentSettings =
>             EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>     TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
> 
>     // config Optimizer parameters
>     tEnv.getConfig()
>             .getConfiguration()
>             .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setString(
>                     ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
>                     GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
>     tEnv.getConfig()
>             .getConfiguration()
>             .setLong(
>                     OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
>                     10 * 1024 * 1024);
>     tEnv.getConfig()
>             .getConfiguration()
>             .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
> 
>     // register TPC-DS tables
>     TPCDS_TABLES.forEach(
>             table -> {
>                 TpcdsSchema schema = TpcdsSchemaProvider.getTableSchema(table);
>                 CsvTableSource.Builder builder = CsvTableSource.builder();
>                 builder.path(sourceTablePath + FILE_SEPARATOR + table + DATA_SUFFIX);
>                 for (int i = 0; i < schema.getFieldNames().size(); i++) {
>                     builder.field(
>                             schema.getFieldNames().get(i),
>                             TypeConversions.fromDataTypeToLegacyInfo(
>                                     schema.getFieldTypes().get(i)));
>                 }
>                 builder.fieldDelimiter(COL_DELIMITER);
>                 builder.emptyColumnAsNull();
>                 builder.lineDelimiter("\n");
>                 CsvTableSource tableSource = builder.build();
>                 ConnectorCatalogTable catalogTable =
>                         ConnectorCatalogTable.source(tableSource, true);
>                 tEnv.getCatalog(tEnv.getCurrentCatalog())
>                         .ifPresent(
>                                 catalog -> {
>                                     try {
>                                         catalog.createTable(
>                                                 new ObjectPath(
>                                                         tEnv.getCurrentDatabase(), table),
>                                                 catalogTable,
>                                                 false);
>                                     } catch (Exception e) {
>                                         throw new RuntimeException(e);
>                                     }
>                                 });
>             });
>     // register statistics info
>     if (useTableStats) {
>         TpcdsStatsProvider.registerTpcdsStats(tEnv);
>     }
>     return tEnv;
> }
> ```
> 
> 
> 
> 
> e.g. 
> query 1 takes about 40 min to execute (standard spark-2.3.0 took 13 seconds, 
> spark-2.3.0 with a custom Adaptive Query Execution feature took 5.99s). 
> The Flink "Explain plan" of query 1 is in the attachment.
> 
> 
> ```
> query1.sql
> with customer_total_return as
> (select sr_customer_sk as ctr_customer_sk
> ,sr_store_sk as ctr_store_sk
> ,sum(sr_return_amt) as ctr_total_return
> from store_returns
> ,date_dim
> where sr_returned_date_sk = d_date_sk
> and d_year = 2000
> group by sr_customer_sk
> ,sr_store_sk)
>  select  c_customer_id
> from customer_total_return ctr1
> ,store
> ,customer
> where ctr1.ctr_total_return > (select avg(ctr_total_return)*1.2
> from customer_total_return ctr2
> where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
> and s_store_sk = ctr1.ctr_store_sk
> and s_state = 'TN'
> and ctr1.ctr_customer_sk = c_customer_sk
> order by c_customer_id
> limit 100
> ```
> 
> 
> 
> stages:
> <881f3241-29c4-4aa0-9c08-43560817a9eb.png>
> 
> job configurations:
> <41a1da0b-8655-447f-8b2c-c2119c3ee1a3.png>
> 
> JM memory:
> <1562c554-c765-47b8-b02e-bd71d1abb6d1.png>
> 
> TM memory:
> <f4c3eb06-9daf-4b48-8a65-e700d2b9c5ee.png>
> 