Hello,
I am executing a heterogeneous SQL query in batch mode: part of the data
is in Hive and part is in Kafka. The query uses the TPC-DS benchmark
100 GB data set. However, the execution time is excessively long: the
query takes approximately 11 minutes to complete, whereas the same
request against Hive only (without Kafka) completes in 12 seconds.
How can I speed up the execution of a heterogeneous SQL query over
Kafka + Hive?
*Versions of Components in Use:*
·Apache Flink: 1.17.1
·Kafka: 3.2.3
·Hive: 3.1.2.3.4.5.1-1
*Flink Job Code:*
EnvironmentSettings settings =
    EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
*Hive Catalog*
HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE,
    PATH_TO_CONF, HiveVersionInfo.getVersion());
tEnv.registerCatalog("hive", catalog);
tEnv.useCatalog("hive");
Creating tables with Kafka connector:
public static final String CREATE_STORE_SALES =
"CREATE TEMPORARY TABLE store_sales_kafka(\n" +
" ss_sold_date_sk INTEGER,\n" +
// here are 21 columns
" ss_net_profit DOUBLE\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'key.format' = 'avro',\n" +
" 'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
" 'properties.group.id' = 'store_sales_group',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'scan.bounded.mode' = 'latest-offset',\n" +
" 'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, " +
"xyz4:9092, xyz5:9092',\n" +
" 'topic' = 'storeSales100',\n" +
" 'value.format' = 'avro',\n" +
" 'value.fields-include' = 'EXCEPT_KEY'\n" +
" );";
Q77 with Flink
tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
for (Row row : res) {
    System.out.println(row);
}
Kafka Settings (the Kafka cluster holds 6 topics, one per table; each
topic has 512 partitions and replication factor 3):
·num.network.threads=12
·num.io.threads=10
·socket.send.buffer.bytes=2097152
·socket.request.max.bytes=1073741824
Cluster consists of 5 machines and each has:
·2 x86-64 CPUs, each with 20 cores / 40 threads, 2200 MHz base
frequency, 3600 MHz max turbo frequency; 40 cores and 80 threads in
total per machine.
·RAM 768GB, up to 640GB is available for Flink.
·2 network cards 10 Gigabit each
·10 HDD 5.5 TB
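For orientation, the figures above imply some simple arithmetic on how one topic's 512 partitions could map onto Kafka source subtasks. The sketch below is illustrative only. It assumes the job runs with one source subtask per hardware thread (5 machines x 80 threads) and that partitions are spread across subtasks as evenly as possible; neither assumption was checked against the actual job configuration. `KafkaSplitMath` is a hypothetical helper and not part of the job.

```java
// Hypothetical helper for back-of-the-envelope sizing; not part of the Flink job.
final class KafkaSplitMath {

    /** Total hardware threads across the cluster. */
    static int totalThreads(int machines, int threadsPerMachine) {
        return machines * threadsPerMachine;
    }

    /** Largest number of partitions a single subtask reads under an even spread. */
    static int maxPartitionsPerSubtask(int partitions, int parallelism) {
        return (partitions + parallelism - 1) / parallelism; // ceiling division
    }

    /** Number of subtasks that carry that maximum load. */
    static int subtasksWithMaxLoad(int partitions, int parallelism) {
        int remainder = partitions % parallelism;
        return remainder == 0 ? parallelism : remainder;
    }

    public static void main(String[] args) {
        int machines = 5;            // from the cluster description
        int threadsPerMachine = 80;  // from the cluster description
        int partitions = 512;        // partitions per topic

        // ASSUMPTION: source parallelism equals the number of hardware threads.
        int parallelism = totalThreads(machines, threadsPerMachine);

        System.out.println("Hardware threads: " + parallelism);
        System.out.println("Max partitions per subtask: "
                + maxPartitionsPerSubtask(partitions, parallelism));
        System.out.println("Subtasks with max load: "
                + subtasksWithMaxLoad(partitions, parallelism));
    }
}
```

Under these assumptions, 112 of the 400 subtasks would read two partitions and the remaining 288 would read one, so the scan of a topic would finish only when the doubly loaded subtasks are done.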
Kind regards,
Vladimir