One of the MultiInput operators runs for 12 minutes. The screenshot shows all stages of the Flink job.

[image: 3.png]
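If it is hard to tell which of the operators fused into that MultiInput vertex is the slow one, multiple-input fusion can be switched off so that every join and aggregate gets its own vertex (and its own timing) in the web UI. A minimal sketch, assuming the tEnv from the job code quoted further down; table.optimizer.multiple-input-enabled is a standard Flink batch optimizer option:

    // Split fused MultiInput vertices back into individual operators so the
    // Flink web UI reports per-operator runtimes (at some performance cost).
    tEnv.getConfig().getConfiguration()
            .setString("table.optimizer.multiple-input-enabled", "false");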
Q77 query of the TPC-DS benchmark. All *_sales and *_returns tables (6 tables) are read from Kafka, and the remaining 3 tables (date_dim, web_page, store) from Hive:

with ss as
 (select s_store_sk,
         sum(ss_ext_sales_price) as sales,
         sum(ss_net_profit) as profit
  from store_sales, date_dim, store
  where ss_sold_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date) and (cast('1998-08-04' as date) + 30 days)
    and ss_store_sk = s_store_sk
  group by s_store_sk),
 sr as
 (select s_store_sk,
         sum(sr_return_amt) as returns,
         sum(sr_net_loss) as profit_loss
  from store_returns, date_dim, store
  where sr_returned_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date) and (cast('1998-08-04' as date) + 30 days)
    and sr_store_sk = s_store_sk
  group by s_store_sk),
 cs as
 (select cs_call_center_sk,
         sum(cs_ext_sales_price) as sales,
         sum(cs_net_profit) as profit
  from catalog_sales, date_dim
  where cs_sold_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date) and (cast('1998-08-04' as date) + 30 days)
  group by cs_call_center_sk),
 cr as
 (select cr_call_center_sk,
         sum(cr_return_amount) as returns,
         sum(cr_net_loss) as profit_loss
  from catalog_returns, date_dim
  where cr_returned_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date) and (cast('1998-08-04' as date) + 30 days)
  group by cr_call_center_sk),
 ws as
 (select wp_web_page_sk,
         sum(ws_ext_sales_price) as sales,
         sum(ws_net_profit) as profit
  from web_sales, date_dim, web_page
  where ws_sold_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date) and (cast('1998-08-04' as date) + 30 days)
    and ws_web_page_sk = wp_web_page_sk
  group by wp_web_page_sk),
 wr as
 (select wp_web_page_sk,
         sum(wr_return_amt) as returns,
         sum(wr_net_loss) as profit_loss
  from web_returns, date_dim, web_page
  where wr_returned_date_sk = d_date_sk
    and d_date between cast('1998-08-04' as date) and (cast('1998-08-04' as date) + 30 days)
    and wr_web_page_sk = wp_web_page_sk
  group by wp_web_page_sk)
select channel,
       id,
       sum(sales) as sales,
       sum(returns) as returns,
       sum(profit) as profit
from
 (select 'store channel' as channel,
         ss.s_store_sk as id,
         sales,
         coalesce(returns, 0) as returns,
         (profit - coalesce(profit_loss, 0)) as profit
  from ss left join sr on ss.s_store_sk = sr.s_store_sk
  union all
  select 'catalog channel' as channel,
         cs_call_center_sk as id,
         sales,
         returns,
         (profit - profit_loss) as profit
  from cs, cr
  union all
  select 'web channel' as channel,
         ws.wp_web_page_sk as id,
         sales,
         coalesce(returns, 0) returns,
         (profit - coalesce(profit_loss, 0)) as profit
  from ws left join wr on ws.wp_web_page_sk = wr.wp_web_page_sk
 ) x
group by rollup (channel, id)
order by channel, id
LIMIT 100;

Kind regards,
Vladimir.

On Thu, 25 Jan 2024 at 14:43, Ron liu <ron9....@gmail.com> wrote:

> Hi,
>
> Can you help explain the Q77 execution plan? And find which operator
> takes a long time in the Flink UI?
>
> Best,
> Ron
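The plan Ron asks about can be printed without running the job via TableEnvironment#explainSql. A minimal sketch, assuming the tEnv and the Tpcds100 constants from the job code quoted below; ExplainDetail.ESTIMATED_COST is a standard Flink explain detail that adds the optimizer's cost estimates:

    // Print the abstract syntax tree, the optimized plan, and the
    // optimizer's cost estimates for Q77.
    String plan = tEnv.explainSql(Tpcds100.Q77_WITH_KAFKA,
            ExplainDetail.ESTIMATED_COST);
    System.out.println(plan);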
> Vova Frolov <vovafrolov1...@gmail.com> wrote on Wed, 24 Jan 2024 at 09:09:
>
>> Hello,
>>
>> I am executing a heterogeneous SQL query in BatchMode (part of the data
>> is in Hive and part in Kafka; the query uses the 100 GB TPC-DS benchmark
>> data set). However, the execution time is excessively long, taking
>> approximately 11 minutes to complete, although the same request against
>> Hive only (without Kafka) completes in 12 seconds.
>>
>> How can I speed up execution of this heterogeneous SQL query against
>> Kafka + Hive?
>>
>> *Versions of components in use:*
>>
>> · Apache Flink: 1.17.1
>> · Kafka: 3.2.3
>> · Hive: 3.1.2.3.4.5.1-1
>>
>> *Flink job code:*
>>
>> EnvironmentSettings settings =
>>         EnvironmentSettings.newInstance().inBatchMode().build();
>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>
>> *Hive catalog:*
>>
>> HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE,
>>         PATH_TO_CONF, HiveVersionInfo.getVersion());
>> tEnv.registerCatalog("hive", catalog);
>> tEnv.useCatalog("hive");
>>
>> *Creating tables with the Kafka connector:*
>>
>> public static final String CREATE_STORE_SALES = "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
>>         "  ss_sold_date_sk INTEGER,\n" +
>>         // here are 21 columns
>>         "  ss_net_profit DOUBLE\n" +
>>         ") WITH (\n" +
>>         "  'connector' = 'kafka',\n" +
>>         "  'key.format' = 'avro',\n" +
>>         "  'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
>>         "  'properties.group.id' = 'store_sales_group',\n" +
>>         "  'scan.startup.mode' = 'earliest-offset',\n" +
>>         "  'scan.bounded.mode' = 'latest-offset',\n" +
>>         "  'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
>>         "  'topic' = 'storeSales100',\n" +
>>         "  'value.format' = 'avro',\n" +
>>         "  'value.fields-include' = 'EXCEPT_KEY'\n" +
>>         ");";
>>
>> *Q77 with Flink:*
>>
>> tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
>> Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
>> List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
>> for (Row row : res) {
>>     System.out.println(row);
>> }
>>
>> *Kafka settings* (the Kafka cluster holds 6 topics, one per table, each
>> with 512 partitions and replication factor 3):
>>
>> · num.network.threads=12
>> · num.io.threads=10
>> · socket.send.buffer.bytes=2097152
>> · socket.request.max.bytes=1073741824
>>
>> The cluster consists of 5 machines, each with:
>>
>> · 2 x86-64 CPUs, each 20 cores / 40 threads, 2200 MHz base frequency,
>>   3600 MHz max turbo frequency; 40 cores / 80 threads total per machine.
>> · 768 GB RAM, of which up to 640 GB is available for Flink.
>> · 2 network cards, 10 Gigabit each.
>> · 10 HDDs, 5.5 TB each.
>>
>> Kind regards,
>>
>> Vladimir
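With 512 partitions per topic and 200 physical cores in the cluster, one thing worth checking is the parallelism the bounded Kafka scans actually run with: in batch mode the SQL operators take table.exec.resource.default-parallelism (falling back to the environment parallelism when it is -1), and a small value would make the six Kafka sources read their partitions almost sequentially. A minimal sketch, assuming the same tEnv as above; the option key is standard Flink, the value 200 is only an illustration:

    // Let every SQL operator, including the six bounded Kafka scans, run
    // with up to 200 parallel subtasks instead of the environment default.
    tEnv.getConfig().getConfiguration()
            .setString("table.exec.resource.default-parallelism", "200");

Whether this helps depends on how many task slots the TaskManagers offer; with fewer slots the batch scheduler simply runs the subtasks in waves.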