Hello,

I am executing a heterogeneous SQL query in batch mode: part of the data is in Hive and part is in Kafka. The query uses the 100 GB TPC-DS benchmark data. However, the execution time is excessively long: the query takes approximately 11 minutes to complete, whereas the query against Hive only (without Kafka) completes in 12 seconds.
How can I speed up the execution of a heterogeneous Kafka + Hive SQL query?

*Versions of Components in Use:*
· Apache Flink: 1.17.1
· Kafka: 3.2.3
· Hive: 3.1.2.3.4.5.1-1

*Flink Job Code:*

    import java.util.List;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CollectionUtil;
    import org.apache.hive.common.util.HiveVersionInfo;

    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

*Hive Catalog:*

    HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE, PATH_TO_CONF, HiveVersionInfo.getVersion());
    tEnv.registerCatalog("hive", catalog);
    tEnv.useCatalog("hive");

Creating tables with the Kafka connector:

    public static final String CREATE_STORE_SALES = "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
            " ss_sold_date_sk INTEGER,\n" +
            // here are 21 columns
            " ss_net_profit DOUBLE\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'key.format' = 'avro',\n" +
            " 'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
            " 'properties.group.id' = 'store_sales_group',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n" +
            " 'scan.bounded.mode' = 'latest-offset',\n" +
            " 'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
            " 'topic' = 'storeSales100',\n" +
            " 'value.format' = 'avro',\n" +
            " 'value.fields-include' = 'EXCEPT_KEY'\n" +
            " );";

Q77 with Flink:

    tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
    Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
    List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
    for (Row row : res) {
        System.out.println(row);
    }

Kafka settings (the Kafka cluster has 6 topics, one per table, each with 512 partitions and replication factor 3):
· num.network.threads=12
· num.io.threads=10
· socket.send.buffer.bytes=2097152
· socket.request.max.bytes=1073741824

The cluster consists of 5 machines, each with:
· 2 x86-64 CPUs, each with 20 cores, 40 threads, 2200 MHz base frequency and 3600 MHz max turbo frequency (40 cores and 80 threads in total per machine)
· 768 GB RAM, of which up to 640 GB is available for Flink
· 2 network cards, 10 Gbit/s each
· 10 HDDs, 5.5 TB

Kind regards,
Vladimir
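For reference, the cluster and topic figures listed above add up as follows. This is only a plain-Java tally of the numbers stated in this post: the class `ClusterTally` and its method names are hypothetical, it uses nothing but the JDK, and it does not query Flink or Kafka. The disks are left out because the per-drive size is not spelled out, and the last value is simply the ratio of Kafka partitions to hardware threads, not a recommended parallelism.

```java
// Hypothetical helper: multiplies out the figures quoted in this post.
// It uses only the JDK and does not connect to Flink or Kafka.
public class ClusterTally {
    // Figures as stated in the post.
    static final int MACHINES = 5;
    static final int HW_THREADS_PER_MACHINE = 80;   // 2 CPUs x 40 threads
    static final int FLINK_RAM_GB_PER_MACHINE = 640;
    static final int NICS_PER_MACHINE = 2;
    static final int GBIT_PER_NIC = 10;
    static final int TOPICS = 6;
    static final int PARTITIONS_PER_TOPIC = 512;
    static final int REPLICATION_FACTOR = 3;

    public static int totalHwThreads() {
        return MACHINES * HW_THREADS_PER_MACHINE;
    }

    public static int totalFlinkRamGb() {
        return MACHINES * FLINK_RAM_GB_PER_MACHINE;
    }

    // Sum of NIC line rates: an upper bound, not measured throughput.
    public static int aggregateNicGbit() {
        return MACHINES * NICS_PER_MACHINE * GBIT_PER_NIC;
    }

    public static int totalPartitions() {
        return TOPICS * PARTITIONS_PER_TOPIC;
    }

    public static int totalPartitionReplicas() {
        return totalPartitions() * REPLICATION_FACTOR;
    }

    // Plain ratio of Kafka partitions to hardware threads (not a tuning target).
    public static double partitionsPerHwThread() {
        return (double) totalPartitions() / totalHwThreads();
    }

    public static void main(String[] args) {
        System.out.println("Hardware threads:         " + totalHwThreads());
        System.out.println("RAM for Flink (GB):       " + totalFlinkRamGb());
        System.out.println("NIC line rate (Gbit/s):   " + aggregateNicGbit());
        System.out.println("Kafka partitions:         " + totalPartitions());
        System.out.println("Partition replicas:       " + totalPartitionReplicas());
        System.out.printf("Partitions per HW thread: %.2f%n", partitionsPerHwThread());
    }
}
```

Running it (`javac ClusterTally.java && java ClusterTally`) shows, for example, 3072 Kafka partitions against 400 hardware threads across the five machines.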