Hi,

Can you help explain the Q77 execution plan? And can you find which operator takes a long time in the Flink UI?
Best,
Ron

Вова Фролов <vovafrolov1...@gmail.com> wrote on Wed, Jan 24, 2024 at 09:09:
> Hello,
>
> I am executing a heterogeneous SQL query in batch mode. Part of the data is in Hive
> and part is in Kafka, and the query runs against the 100 GB TPC-DS benchmark dataset.
> However, the execution time is excessively long. It takes approximately 11 minutes
> to complete, whereas the same query against Hive only (without Kafka) completes in
> 12 seconds.
>
> How can I speed up the execution of a heterogeneous SQL query over Kafka + Hive?
>
> Versions of components in use:
>
> · Apache Flink: 1.17.1
> · Kafka: 3.2.3
> · Hive: 3.1.2.3.4.5.1-1
>
> Flink job code:
>
> EnvironmentSettings settings =
>     EnvironmentSettings.newInstance().inBatchMode().build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
> Hive catalog:
>
> HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE, PATH_TO_CONF,
>     HiveVersionInfo.getVersion());
> tEnv.registerCatalog("hive", catalog);
> tEnv.useCatalog("hive");
>
> Creating tables with the Kafka connector:
>
> public static final String CREATE_STORE_SALES =
>     "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
>     " ss_sold_date_sk INTEGER,\n" +
>     // the remaining 21 columns go here
>     " ss_net_profit DOUBLE\n" +
>     ") WITH (\n" +
>     " 'connector' = 'kafka',\n" +
>     " 'key.format' = 'avro',\n" +
>     " 'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
>     " 'properties.group.id' = 'store_sales_group',\n" +
>     " 'scan.startup.mode' = 'earliest-offset',\n" +
>     " 'scan.bounded.mode' = 'latest-offset',\n" +
>     " 'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
>     " 'topic' = 'storeSales100',\n" +
>     " 'value.format' = 'avro',\n" +
>     " 'value.fields-include' = 'EXCEPT_KEY'\n" +
>     " );";
>
> Q77 with Flink:
>
> tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
> Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
> List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
>
> for (Row row : res) {
>     System.out.println(row);
> }
>
> Kafka settings (the Kafka cluster holds 6 topics, one per table, each with
> 512 partitions and replication factor 3):
>
> · num.network.threads=12
> · num.io.threads=10
> · socket.send.buffer.bytes=2097152
> · socket.request.max.bytes=1073741824
>
> The cluster consists of 5 machines, each with:
>
> · 2 x86-64 CPUs with 20 cores / 40 threads each, 2200 MHz base frequency and
>   3600 MHz max turbo frequency (40 cores, 80 threads total per machine)
> · 768 GB RAM, of which up to 640 GB is available for Flink
> · 2 network cards, 10 Gbit/s each
> · 10 HDDs, 5.5 TB
>
> Kind regards,
>
> Vladimir
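For the Q77 plan question, one option is to call `TableEnvironment.explainSql`, which returns the plan as text without executing the job. In Flink 1.17 that text includes the abstract syntax tree, the optimized physical plan and the optimized execution plan. The sketch below assumes a batch `TableEnvironment` built the same way as in the job above. The class name `ExplainSketch`, the helper `explainQuery` and the toy `VALUES` query are hypothetical stand-ins. In the real job you would pass `Tpcds100.Q77_WITH_KAFKA` after registering the Hive catalog and creating the Kafka tables. Running it requires the Flink Table API and the table planner on the classpath.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainSketch {

    // Hypothetical helper: returns the plan text for a query without executing it.
    // ESTIMATED_COST adds the optimizer's cost estimates (e.g. row counts) to the physical plan nodes;
    // JSON_EXECUTION_PLAN appends the execution plan in JSON form.
    static String explainQuery(TableEnvironment tEnv, String query) {
        return tEnv.explainSql(query, ExplainDetail.ESTIMATED_COST, ExplainDetail.JSON_EXECUTION_PLAN);
    }

    public static void main(String[] args) {
        // Batch mode, as in the original job.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Toy stand-in for Q77 (hypothetical) so the sketch runs without Hive or Kafka.
        String query =
                "SELECT sales_channel, SUM(amount) AS total_amount "
                        + "FROM (VALUES ('store', 10.0), ('web', 5.0), ('store', 2.5)) "
                        + "AS t(sales_channel, amount) "
                        + "GROUP BY sales_channel";

        System.out.println(explainQuery(tEnv, query));
    }
}
```

The printed plan names the source, join and aggregate operators. For the "which operator is slow" part, the job graph in the Flink Web UI shows per-vertex duration, records and bytes sent and received, and busy and backpressure indicators. Comparing the Kafka source vertices with the Hive source vertices there is one way to see where the extra time goes.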