Hi Schneider,

The error message suggests that your task managers are not configured with
enough network memory. You would need to increase the network memory
configuration. See this doc [1] for more details.

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_trouble.html#ioexception-insufficient-number-of-network-buffers

On Fri, Oct 30, 2020 at 2:53 PM Schneider, Thilo <t.schneid...@fraport.de>
wrote:

> Dear list,
>
>
>
> when trying to compute a simple last_value aggregate, flink fails with an
> IOException. The query is defined as follows:
>
>
>
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> env_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
>
>
> t_env.execute_sql("""
> CREATE TABLE key_change_test (
>   id INT,  val1 STRING,  val2 STRING,  t AS proctime()
> ) WITH (
>   'connector' = 'kafka',
>   'format' = 'csv',
>   'topic' = 'flink_test',
>   'properties.bootstrap.servers' = 'localhost:9192',
>   'properties.group.id' = 'foo'
> )
> """)
>
> tt = t_env.sql_query("SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) 
> AS val2 FROM key_change_test GROUP BY id")
>
> t_env.execute_sql("CREATE TABLE debug (id INT, val1 VARCHAR, val2 VARCHAR) 
> WITH ('connector' = 'print')")
> tt.execute_insert("debug")
>
>
>
>
>
> Looking at the logs I get the following error message:
>
> […]
>
> 2020-10-30 07:45:46,474 WARN
> org.apache.flink.runtime.taskmanager.Task                    [] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> key_change_test]], fields=[id, val1, val2]) (21/88)
> (02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.
>
> java.io.IOException: Insufficient number of network buffers: required 89,
> but only 67 available. The total number of network buffers is currently set
> to 2048 of 32768 bytes each. You can increase this number by setting the
> configuration keys 'taskmanager.memory.network.fraction',
> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
>
>         at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at
> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at
> org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
>
> […]
>
>
>
> What is happening there? For me it seems that flink is requesting an awful
> lot of resources for a simple query (the kafka topic has only one partition
> and is used for manual injection only, so no big traffic there).
>
> Can you help me with any way around that problem?
>
>
>
> Thanks in advance
>
> Thilo
>
> Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main,
> Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main:
> HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender
> des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen;
> Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller,
> Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang
>

Reply via email to