Thanks Yingjie for pinging me. Hi vtygoss,
Leonard is right: you are probably using the wrong statistics information.
This causes the optimizer to select **BROADCAST JOIN** incorrectly, so
Flink has to broadcast a huge amount of data, even gigabytes. This is a
real performance killer. I think you can either:
- analyze your tables in Hive, as Leonard said (see the sketch below), or
- just remove "TpcdsStatsProvider.registerTpcdsStats(tEnv)".

And I see in your code: "table.optimizer.join.broadcast-threshold: 256 *
1024 * 1024". This threshold is too large; more than 10MB is not
recommended.
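A minimal sketch of the Hive side (store_returns and your tpcds3t database
are only examples taken from your mail; you would repeat this for every
TPC-DS table):

```
-- Table-level statistics (row count, total size) for the CBO.
ANALYZE TABLE tpcds3t.store_returns COMPUTE STATISTICS;
-- Column-level statistics (NDV, null count, min/max) used for join planning.
ANALYZE TABLE tpcds3t.store_returns COMPUTE STATISTICS FOR COLUMNS;
```

(For partitioned tables you may need a PARTITION clause, depending on your
Hive version.)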
Best,
Jingsong

On Wed, Jun 23, 2021 at 11:08 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, vtygoss
>
> Thanks for the detailed report. A quick reply, as I wrote
> *org.apache.flink.table.tpcds.TpcdsTestProgram* in the community: I guess
> you missed the *table statistics information*.
>
> The *table statistics information* used in the TPC-DS e2e tests is
> constant for the 1GB verification data set. I wrote this test to check
> that Flink Batch SQL works well for every PR as a CI test, not to check
> performance. Please see
> *org.apache.flink.table.tpcds.stats.TpcdsStatsProvider*.
>
> The *table statistics information* is used by the planner (CBO optimizer)
> to optimize the SQL plan; incorrect *table statistics information* can
> even lead to a wrong plan, and the SQL job may run unexpectedly.
>
> Thus if you want to run 3TB TPC-DS tests, you should use the
> corresponding *table statistics information* for your test data set; you
> can obtain it by analyzing your tables in Hive.
>
> Best,
> Leonard
>
> On Jun 23, 2021, at 10:42, Yingjie Cao <kevin.ying...@gmail.com> wrote:
>
> Hi,
>
> I also have some experience running the TPC-DS benchmark with Flink (10T
> scale), but the amount of shuffled data for Q1 differs greatly from the
> numbers in the picture you shared. I am not sure what is going on; maybe
> you missed something? I attached the Q1 numbers from my test (also with
> 500 max parallelism, though with Flink 1.13 instead of 1.12); the running
> time is 20s for 10T TPC-DS.
>
> There are some points I know of which may influence the test results;
> hope these help you (see the config sketch after this list for points 1
> and 2):
> 1. Shuffle data compression. (Disabled in Flink by default; can be
> enabled by setting taskmanager.network.blocking-shuffle.compression.enabled
> to true.)
> 2. The blocking shuffle type used. See [1] for more information. (To use
> sort-shuffle, the minimum version is 1.13.)
> 3. Memory configuration, including network and managed memory size.
> 4. SQL optimization configuration. (I am not familiar with this; maybe
> @Jingsong Li has more knowledge about that.)
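> For reference, a minimal flink-conf.yaml sketch covering points 1 and 2
> (the sort-shuffle key assumes Flink 1.13+, and the values are examples,
> not tuned recommendations):
>
> ```
> # Point 1: compress blocking shuffle data (disabled by default).
> taskmanager.network.blocking-shuffle.compression.enabled: true
> # Point 2: use sort-shuffle for all blocking shuffles; it is not used by
> # default in Flink 1.13, see [1].
> taskmanager.network.sort-shuffle.min-parallelism: 1
> ```
>
> (These are cluster-level options, so they take effect when the cluster is
> restarted.)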
> BTW, could you please share more information? For example, how many
> nodes are in your cluster? Which type of disk do you use, SSD or HDD?
> How many cores and disks, and how much memory, are available on each
> node? Could you also share the shuffle write and shuffle read numbers of
> all stages of Spark?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
>
> <image.png>
>
> Best,
> Yingjie
>
> On Tue, Jun 22, 2021 at 4:46 PM vtygoss <vtyg...@126.com> wrote:
>
>> Hi,
>>
>> I am doing performance tests on 3TB TPC-DS using the
>> flink/flink-end-to-end-tests/flink-tpcds-test module, but the test
>> results are not good.
>>
>> Scenario:
>> tpc-ds location: hive 2.3.5
>> tpc-ds scale: 3TB, parquet + snappy
>> flink version: flink-1.12-SNAPSHOT
>> resource configuration: slots per task manager=5, parallelism=500, job
>> manager memory=10GB, task manager memory=10GB, task manager number=100
>>
>> Differences between my branch and the community branch:
>> 1. TPC-DS data is stored in Hive, so I changed the source from CSV to
>> Hive.
>> 2. I added some explicit optimizations for join reorder, broadcast,
>> etc., shown below.
>> 3. The community TPC-DS test main class is
>> org.apache.flink.table.tpcds.TpcdsTestProgram; mine is
>> org.apache.flink.table.tpcds.MyTpcdsTestProgram. Both are in the
>> attachment.
>>
>> Did I do something wrong? Please help by offering some advice. Thanks
>> very much!
>>
>> Best Regards
>>
>>
>> ```
>> [MyTpcdsTestProgram.java] (my branch)
>>
>> private static TableEnvironment prepareTableEnv(
>>         String sourceTablePath, Boolean useTableStats) {
>>     // init Table Env
>>     EnvironmentSettings environmentSettings =
>>             EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>     TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
>>
>>     tEnv.loadModule("custom-hive", new HiveModule("2.3.5"));
>>     String hiveSite = "/home/work/flink-1.12/conf/";
>>     HiveCatalog catalog = new HiveCatalog("hive", "tpcds3t", hiveSite);
>>     tEnv.registerCatalog("hive", catalog);
>>     tEnv.useCatalog("hive");
>>     tEnv.useDatabase("tpcds3t");
>>
>>     // config Optimizer parameters
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger(
>>                     ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 500);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setString(
>>                     ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
>>                     GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setLong(
>>                     OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
>>                     10 * 1024 * 1024);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean(
>>                     OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.exec.hive.infer-source-parallelism", false);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setString("table.exec.sink.not-null-enforcer", "drop");
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.join-reorder-enabled", true);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger("table.optimizer.distinct-agg.split.bucket-num", 10240);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.distinct-agg.split.enabled", true);
>>     // note: same option key as TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD
>>     // above, so this 256MB value overrides the 10MB set earlier
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger("table.optimizer.join.broadcast-threshold", 256 * 1024 * 1024);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.reuse-source-enabled", true);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean("table.optimizer.source.predicate-pushdown-enabled", true);
>>
>>     /* STREAMING
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger("table.exec.async-lookup.buffer-capacity", 10000);
>>     tEnv.getConfig().getConfiguration().setBoolean("table.exec.mini-batch.enabled", true);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setString("table.exec.mini-batch.allow-latency", "1 min");
>>     tEnv.getConfig().getConfiguration().setInteger("table.exec.mini-batch.size", 10000); */
>>
>>     // register statistics info
>>     if (useTableStats) {
>>         TpcdsStatsProvider.registerTpcdsStats(tEnv);
>>     }
>>     return tEnv;
>> }
>> ```
>>
>> ```
>> TpcdsTestProgram.java (community branch)
>>
>> private static TableEnvironment prepareTableEnv(
>>         String sourceTablePath, Boolean useTableStats) {
>>     // init Table Env
>>     EnvironmentSettings environmentSettings =
>>             EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>>     TableEnvironment tEnv = TableEnvironment.create(environmentSettings);
>>
>>     // config Optimizer parameters
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setInteger(
>>                     ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setString(
>>                     ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
>>                     GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setLong(
>>                     OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD,
>>                     10 * 1024 * 1024);
>>     tEnv.getConfig()
>>             .getConfiguration()
>>             .setBoolean(
>>                     OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
>>
>>     // register TPC-DS tables
>>     TPCDS_TABLES.forEach(
>>             table -> {
>>                 TpcdsSchema schema = TpcdsSchemaProvider.getTableSchema(table);
>>                 CsvTableSource.Builder builder = CsvTableSource.builder();
>>                 builder.path(sourceTablePath + FILE_SEPARATOR + table + DATA_SUFFIX);
>>                 for (int i = 0; i < schema.getFieldNames().size(); i++) {
>>                     builder.field(
>>                             schema.getFieldNames().get(i),
>>                             TypeConversions.fromDataTypeToLegacyInfo(
>>                                     schema.getFieldTypes().get(i)));
>>                 }
>>                 builder.fieldDelimiter(COL_DELIMITER);
>>                 builder.emptyColumnAsNull();
>>                 builder.lineDelimiter("\n");
>>                 CsvTableSource tableSource = builder.build();
>>                 ConnectorCatalogTable catalogTable =
>>                         ConnectorCatalogTable.source(tableSource, true);
>>                 tEnv.getCatalog(tEnv.getCurrentCatalog())
>>                         .ifPresent(
>>                                 catalog -> {
>>                                     try {
>>                                         catalog.createTable(
>>                                                 new ObjectPath(tEnv.getCurrentDatabase(), table),
>>                                                 catalogTable,
>>                                                 false);
>>                                     } catch (Exception e) {
>>                                         throw new RuntimeException(e);
>>                                     }
>>                                 });
>>             });
>>
>>     // register statistics info
>>     if (useTableStats) {
>>         TpcdsStatsProvider.registerTpcdsStats(tEnv);
>>     }
>>     return tEnv;
>> }
>> ```
>>
>> For example, query 1 takes about 40 min to execute (standard spark-2.3.0
>> took 13 seconds; spark-2.3.0 with a custom Adaptive Query Execution
>> feature took 5.99s). The Flink "Explain plan" of query 1 is in the
>> attachment.
>>
>> ```
>> query1.sql
>>
>> with customer_total_return as
>>   (select sr_customer_sk as ctr_customer_sk,
>>           sr_store_sk as ctr_store_sk,
>>           sum(sr_return_amt) as ctr_total_return
>>    from store_returns, date_dim
>>    where sr_returned_date_sk = d_date_sk
>>      and d_year = 2000
>>    group by sr_customer_sk, sr_store_sk)
>> select c_customer_id
>> from customer_total_return ctr1, store, customer
>> where ctr1.ctr_total_return > (select avg(ctr_total_return) * 1.2
>>                                from customer_total_return ctr2
>>                                where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
>>   and s_store_sk = ctr1.ctr_store_sk
>>   and s_state = 'TN'
>>   and ctr1.ctr_customer_sk = c_customer_sk
>> order by c_customer_id
>> limit 100
>> ```
>>
>> stages:
>> <881f3241-29c4-4aa0-9c08-43560817a9eb.png>
>>
>> job configurations:
>> <41a1da0b-8655-447f-8b2c-c2119c3ee1a3.png>
>>
>> JM memory:
>> <1562c554-c765-47b8-b02e-bd71d1abb6d1.png>
>>
>> TM memory:
>> <f4c3eb06-9daf-4b48-8a65-e700d2b9c5ee.png>

--
Best,
Jingsong Lee