Hello,

I am executing a heterogeneous SQL query  (part of the data is in Hive and
part in Kafka. The query utilizes TPC-DS benchmark 100GB data.) in
BatchMode. However, the execution time is excessively long, taking
approximately 11 minutes to complete , although the request to Hive only
(without Kafka) is completed in 12 seconds.

How can I speed up execution heterogeneous SQL query to Kafka + Hive?

*   Versions of Components in Use:*

·        Apache Flink: 1.17.1

·        Kafka: 3.2.3

·        Hive: 3.1.2.3.4.5.1-1

*    Flink Job Code:*

EnviromentSettings settings =
EnviromentSettings.newInstance().inBatchMode().build();

TableEnviroment tEnv = TableEnviroment.create(settings);



*Hive Catalog*

HiveCatalog catalog = new HiveCatalog(“hive”, DEFAULT_DATABASE,
PATH_TO_CONF, HiveVersionInfo.getVersion());

tEnv.registerCatalog(“hive”, catalog);

tEnv.useCatalog(“hive”);





Creating tables with Kafka connector:



public static final String *CREATE_STORE_SALES *= "CREATE TEMPORARY TABLE
store_sales_kafka(\n" +
"  ss_sold_date_sk         INTEGER,\n" +

// here are 21 columns

"  ss_net_profit         DOUBLE\n" +
") WITH (\n" +
"   'connector' = 'kafka',\n" +
"   'key.format' = 'avro',\n" +
"   'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
"   'properties.group.id' = 'store_sales_group',\n" +
"   'scan.startup.mode' = 'earliest-offset',\n" +

"   'scan.bounded.mode' = 'latest-offset',\n" +
"   'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092,
xyz4:9092, xyz5:9092',\n" +
"   'topic' = 'storeSales100',\n" +

"    'value.format' = 'avro',\n" +

"    'value.fields-include' = 'EXCEPT_KEY'\n" +


"   );";



Q77 with Flink

tEnv.executeSql(Tpcds100.*CREATE_STORE_SALES*);
Table result = tEnv.sqlQuery(Tpcds100.*Q77_WITH_KAFKA*);
List<Row> res = CollectionUtil.*iteratorToList*(result.execute().collect());

for (Row row : res) {
    System.*out*.println(row);
}



Kafka Settings: (kafka cluster consists of 6 topics(6 tables) and each has:
512 partitions, replication factor 3)

·        num.network.threads=12

·        num.io.threads=10

·        socket.send.buffer.bytes=2097152

·        socket.request.max.bytes=1073741824

    Cluster consists of 5 machines and each has:

·        2 CPU x86-64 20 cores, 40 threads, 2200 MHz base frequency, 3600
MHz max turbo frequency. 40 cores, 80 threads total on each machine.

·        RAM 768GB, up to 640GB is available for Flink.

·        2 network cards 10 Gigabit each

·        10 HDD 5.5 TB

Kind regards,

Vladimir

Reply via email to