Hi,

Could you share the execution plan for q77 (the EXPLAIN output)? It would
also help to check in the Flink UI which operator takes the most time.
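
In case it helps, here is a minimal sketch of how the plan can be printed
with Flink SQL's EXPLAIN statement. The SELECT below is only a hypothetical
stand-in that touches two columns of the store_sales_kafka table from your
DDL; please substitute the actual Q77 text.

```sql
-- Stand-in query (assumption): replace the SELECT with the real Q77 statement.
EXPLAIN PLAN FOR
SELECT ss_sold_date_sk, SUM(ss_net_profit) AS total_profit
FROM store_sales_kafka
GROUP BY ss_sold_date_sk;
```

From the Java job, tEnv.explainSql(Tpcds100.Q77_WITH_KAFKA) should return
the same plan as a string, which can be printed before the query is executed.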

Best
Ron

Вова Фролов <vovafrolov1...@gmail.com> wrote on Wed, Jan 24, 2024 at 09:09:

> Hello,
>
> I am executing a heterogeneous SQL query in batch mode (part of the data
> is in Hive and part in Kafka; the query uses the TPC-DS benchmark 100 GB
> data set). However, the execution time is excessively long: the query
> takes approximately 11 minutes to complete, whereas the same query against
> Hive only (without Kafka) completes in 12 seconds.
>
> How can I speed up a heterogeneous SQL query over Kafka + Hive?
>
> *Versions of Components in Use:*
>
> ·        Apache Flink: 1.17.1
>
> ·        Kafka: 3.2.3
>
> ·        Hive: 3.1.2.3.4.5.1-1
>
> *Flink Job Code:*
>
> EnvironmentSettings settings =
>     EnvironmentSettings.newInstance().inBatchMode().build();
>
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
>
>
> *Hive Catalog*
>
> HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE, PATH_TO_CONF,
>     HiveVersionInfo.getVersion());
>
> tEnv.registerCatalog("hive", catalog);
>
> tEnv.useCatalog("hive");
>
>
>
>
>
> Creating tables with Kafka connector:
>
>
>
> public static final String CREATE_STORE_SALES =
>     "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
> "  ss_sold_date_sk         INTEGER,\n" +
>
> // here are 21 columns
>
> "  ss_net_profit         DOUBLE\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'key.format' = 'avro',\n" +
> "   'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
> "   'properties.group.id' = 'store_sales_group',\n" +
> "   'scan.startup.mode' = 'earliest-offset',\n" +
>
> "   'scan.bounded.mode' = 'latest-offset',\n" +
> "   'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
> "   'topic' = 'storeSales100',\n" +
>
> "    'value.format' = 'avro',\n" +
>
> "    'value.fields-include' = 'EXCEPT_KEY'\n" +
>
>
> "   );";
>
>
>
> Q77 with Flink
>
> tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
> Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
> List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
>
> for (Row row : res) {
>     System.out.println(row);
> }
>
>
>
> Kafka settings (the Kafka cluster holds 6 topics, one per table, each with
> 512 partitions and replication factor 3):
>
> ·        num.network.threads=12
>
> ·        num.io.threads=10
>
> ·        socket.send.buffer.bytes=2097152
>
> ·        socket.request.max.bytes=1073741824
>
> The cluster consists of 5 machines, each with:
>
> ·        2 x86-64 CPUs, each with 20 cores / 40 threads, 2200 MHz base
> frequency, 3600 MHz max turbo frequency (40 cores, 80 threads total per
> machine).
>
> ·        RAM 768GB, up to 640GB is available for Flink.
>
> ·        2 network cards 10 Gigabit each
>
> ·        10 HDD 5.5 TB
>
> Kind regards,
>
> Vladimir
>
