Dear list,

when trying to compute a simple LAST_VALUE aggregate, Flink fails with an IOException. The query is defined as follows:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

t_env.execute_sql("""
    CREATE TABLE key_change_test (
        id INT,
        val1 STRING,
        val2 STRING,
        t AS proctime()
    ) WITH (
        'connector' = 'kafka',
        'format' = 'csv',
        'topic' = 'flink_test',
        'properties.bootstrap.servers' = 'localhost:9192',
        'properties.group.id' = 'foo'
    )
""")

tt = t_env.sql_query("SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) AS val2 FROM key_change_test GROUP BY id")

t_env.execute_sql("CREATE TABLE debug (id INT, val1 VARCHAR, val2 VARCHAR) WITH ('connector' = 'print')")
tt.execute_insert("debug")

Looking at the logs I get the following error message:

[…]
2020-10-30 07:45:46,474 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, key_change_test]], fields=[id, val1, val2]) (21/88) (02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.
java.io.IOException: Insufficient number of network buffers: required 89, but only 67 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
[…]

What is happening there? To me it seems that Flink is requesting an awful lot of resources for a simple query (the Kafka topic has only one partition and is used for manual injection only, so there is no heavy traffic). Is there any way around this problem?

Thanks in advance,
Thilo
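
As a side note on the numbers in the error message: below is a minimal sketch of where "required 89" may come from. It assumes that each result partition requests one buffer per subpartition plus one, and that the GROUP BY shuffle creates one subpartition per downstream subtask (88 here, read from "(21/88)" in the log). The function name required_buffers is made up for illustration and is not a Flink API.

```python
# Rough arithmetic behind "required 89, but only 67 available".
# Assumption (not confirmed by the log itself): a result partition asks for
# (number of subpartitions + 1) buffers, and a hash shuffle such as GROUP BY id
# creates one subpartition per downstream subtask.

def required_buffers(downstream_parallelism: int) -> int:
    """Buffers one upstream subtask needs for its output (assumed formula)."""
    return downstream_parallelism + 1

parallelism = 88        # taken from "(21/88)" in the log line
total_buffers = 2048    # taken from the error message
buffer_size = 32768     # bytes per buffer, taken from the error message

per_subtask = required_buffers(parallelism)
pool_mib = total_buffers * buffer_size / (1024 * 1024)

# Upper bound on how many source subtasks could set up their output on one
# TaskManager if nothing else drew from the pool (input gates also need buffers).
max_subtasks = total_buffers // per_subtask

print(per_subtask, pool_mib, max_subtasks)
```

If that assumption holds, the requirement is driven by the job parallelism rather than by the Kafka traffic, so lowering the parallelism (for example via the parallelism.default option) or raising the taskmanager.memory.network.* limits named in the error message would be the two knobs to try.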