Hello Flink Community,

I am writing to you about an issue I have encountered while using Apache Flink 1.17.1. In my Flink Job, I use Kafka 3.6.0 to ingest TPC-DS data (current scale: tpcds100; target scale: tpcds10000), and then I execute SQL queries, specifically query q77, on the data in Kafka.
*Versions of Components in Use:*
· Apache Flink: 1.17.1
· Kafka: 3.6.0

*Kafka Settings:* (the Kafka cluster has 9 topics, each with 512 partitions and replication factor 3)
· num.network.threads=12
· num.io.threads=10
· socket.send.buffer.bytes=2097152
· socket.request.max.bytes=1073741824

*Flink Job Code:*
Creating the tables with the Kafka connector (only store_sales is shown):

    public static final String CREATE_STORE_SALES = "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
            " ss_sold_date_sk INTEGER,\n" +
            // ... 21 more columns here ...
            " ss_net_profit DECIMAL(7, 2)\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'key.format' = 'avro',\n" +
            " 'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
            " 'properties.group.id' = 'store_sales_group',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n" +
            " 'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
            " 'topic' = 'storeSales100',\n" +
            " 'value.format' = 'avro',\n" +
            " 'value.fields-include' = 'EXCEPT_KEY'\n" +
            " );";

Running Q77 with Flink:

    import java.util.List;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CollectionUtil;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    // Register the Kafka-backed table, then run the q77 query against it
    tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
    Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
    // Collect all result rows on the client and print them
    List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
    for (Row row : res) {
        System.out.println(row);
    }

*Flink Job Configuration:*
I tried several configurations; these are the main ones:
1. 10 slots per TaskManager, parallelism 100;
2. 5 slots per TaskManager, parallelism 50;
3. 15 slots per TaskManager, parallelism 375.
The result is about the same in every case.

*Logs and Errors:*
The logs of my Flink Job do not contain any errors.

*Description of the Issue:*
The Flink Job runs smoothly for approximately 5 minutes, during which data is processed and transformed as expected.
However, after this initial period, the Flink Job seems to enter a state in which no further changes or updates are observed in the processed data. In the logs I see the following message, which is written every 5 minutes:

    2023-12-18 INFO org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=store_group-enumerator-admin-client] Node -1 disconnected

It's worth noting that, despite the lack of errors in the logs, the Flink Job essentially stops making progress and data processing stagnates. This behavior is consistent across all configurations tested, including different numbers of slots per TaskManager and different parallelism. Because the logs contain no errors, they give no insight into why processing stagnates.

The cluster consists of 5 machines, each with:
· 2 x86-64 CPUs, each with 20 cores / 40 threads, 2200 MHz base frequency, 3600 MHz max turbo frequency (40 cores, 80 threads in total per machine)
· 768 GB RAM, of which up to 640 GB is available to Flink
· 2 network cards, 10 Gigabit each
· 10 HDDs, 5.5 TB each

This issue significantly hinders the use of Apache Flink for my data processing needs. I am seeking guidance to understand and resolve the underlying cause of this behavior, and I look forward to your advice. Please let me know if you need additional details.

Kind regards,
Vladimir
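P.S. To put the three configurations into numbers, here is a small back-of-the-envelope sketch (plain Java, no Flink dependency; not part of the job). It rests on assumptions I have not verified on the running job: every source subtask runs at the job parallelism, default slot sharing is used (so the job needs as many slots as its parallelism), and each topic's 512 partitions are dealt out round-robin over the source subtasks, which, as far as I understand, is what the KafkaSource enumerator does, starting from a topic-dependent subtask. Only the per-subtask partition counts are computed, not which subtask gets which partition. The class name is just for illustration.

```java
import java.util.Arrays;
import java.util.Locale;

/**
 * Back-of-the-envelope numbers for the job configurations above.
 * Assumptions (illustrative only): source parallelism == job parallelism,
 * default slot sharing, round-robin assignment of partitions to source subtasks.
 */
final class SlotAndPartitionMath {

    /** How many of numPartitions partitions each of numReaders subtasks reads, dealt round-robin. */
    static int[] partitionsPerReader(int numPartitions, int numReaders) {
        int[] counts = new int[numReaders];
        for (int p = 0; p < numPartitions; p++) {
            counts[p % numReaders]++;
        }
        return counts;
    }

    /** TaskManagers needed to provide `parallelism` slots, rounded up. */
    static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        final int partitionsPerTopic = 512; // from the Kafka settings
        final int machines = 5;             // from the cluster description
        final int threadsPerMachine = 80;   // 2 CPUs x 40 threads
        // {slots per TaskManager, parallelism} for configurations 1, 2 and 3
        int[][] configs = {{10, 100}, {5, 50}, {15, 375}};

        for (int[] c : configs) {
            int slotsPerTm = c[0];
            int parallelism = c[1];
            int tms = taskManagersNeeded(parallelism, slotsPerTm);
            int[] counts = partitionsPerReader(partitionsPerTopic, parallelism);
            int min = Arrays.stream(counts).min().getAsInt();
            int max = Arrays.stream(counts).max().getAsInt();
            System.out.printf(Locale.ROOT,
                    "slots/TM=%d, parallelism=%d: %d TaskManagers, %.1f slots per machine on average"
                            + " (of %d threads), %d-%d partitions per topic per source subtask%n",
                    slotsPerTm, parallelism, tms, (double) tms * slotsPerTm / machines,
                    threadsPerMachine, min, max);
        }
    }
}
```

Under these assumptions, the three configurations need 10, 10 and 25 TaskManagers, i.e. on average 20, 10 and 75 slots per machine, and each source subtask reads 5 to 6, 10 to 11 and 1 to 2 partitions per topic, respectively.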