Hi Schneider,

The error message suggests that your task managers are not configured with enough network memory. You need to increase the network memory configuration, using the configuration keys named in the error message. See [1] for more details.
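To make the numbers in the error message concrete, here is a rough back-of-the-envelope sketch in Python. The constants are copied from the log quoted below. Reading "required 89" as the parallelism of 88 (from "(21/88)") plus one is an assumption, and the 256 MiB target is a purely hypothetical figure chosen for illustration, not a recommendation.

```python
# Values copied from the error message quoted in this thread.
TOTAL_BUFFERS = 2048        # "total number of network buffers is currently set to 2048"
BUFFER_SIZE_BYTES = 32768   # "of 32768 bytes each"
REQUIRED_BUFFERS = 89       # "required 89"
AVAILABLE_BUFFERS = 67      # "but only 67 available"
PARALLELISM = 88            # from "(21/88)" in the log line

BYTES_PER_MIB = 1024 * 1024


def network_memory_mib(num_buffers: int, buffer_size_bytes: int) -> float:
    """Return the network memory (in MiB) implied by a buffer count."""
    return num_buffers * buffer_size_bytes / BYTES_PER_MIB


def buffers_for_memory(memory_mib: int, buffer_size_bytes: int) -> int:
    """Return how many buffers of the given size fit into memory_mib MiB."""
    return memory_mib * BYTES_PER_MIB // buffer_size_bytes


current_mib = network_memory_mib(TOTAL_BUFFERS, BUFFER_SIZE_BYTES)
shortfall = REQUIRED_BUFFERS - AVAILABLE_BUFFERS

# Assumption: each of the 88 source subtasks asks for (parallelism + 1) buffers.
assumed_per_task = PARALLELISM + 1

# Hypothetical target size, used only to show how the buffer count scales.
hypothetical_target_mib = 256
target_buffers = buffers_for_memory(hypothetical_target_mib, BUFFER_SIZE_BYTES)

print(f"current network memory: {current_mib} MiB")
print(f"buffers missing for this task: {shortfall}")
print(f"assumed buffers per task: {assumed_per_task}")
print(f"buffers at {hypothetical_target_mib} MiB: {target_buffers}")
```

Under these assumptions, the pool is simply too small for the number of tasks asking for buffers. Raising 'taskmanager.memory.network.min' and 'taskmanager.memory.network.max' (or 'taskmanager.memory.network.fraction') enlarges the pool proportionally.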
Thank you~

Xintong Song

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_trouble.html#ioexception-insufficient-number-of-network-buffers

On Fri, Oct 30, 2020 at 2:53 PM Schneider, Thilo <t.schneid...@fraport.de> wrote:

> Dear list,
>
> when trying to compute a simple last_value aggregate, flink fails with an
> IOException. The query is defined as follows:
>
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> t_env.execute_sql("""
>     CREATE TABLE key_change_test (
>         id INT, val1 STRING, val2 STRING, t AS proctime()
>     ) WITH (
>         'connector' = 'kafka',
>         'format' = 'csv',
>         'topic' = 'flink_test',
>         'properties.bootstrap.servers' = 'localhost:9192',
>         'properties.group.id' = 'foo'
>     )
> """)
>
> tt = t_env.sql_query("SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) AS val2 FROM key_change_test GROUP BY id")
>
> t_env.execute_sql("CREATE TABLE debug (id INT, val1 VARCHAR, val2 VARCHAR) WITH ('connector' = 'print')")
> tt.execute_insert("debug")
>
> Looking at the logs I get the following error message:
>
> […]
> 2020-10-30 07:45:46,474 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, key_change_test]], fields=[id, val1, val2]) (21/88) (02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.
> java.io.IOException: Insufficient number of network buffers: required 89, but only 67 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
> […]
>
> What is happening there? For me it seems that flink is requesting an awful
> lot of resources for a simple query (the kafka topic has only one partition
> and is used for manual injection only, so no big traffic there).
>
> Can you help me with any way around that problem?
>
> Thanks in advance
>
> Thilo