Dear list,

When trying to compute a simple LAST_VALUE aggregate, Flink fails with an
IOException. The query is defined as follows:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)


t_env.execute_sql("""
CREATE TABLE key_change_test (
  id INT,  val1 STRING,  val2 STRING,  t AS proctime()
) WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = 'localhost:9192',
  'properties.group.id' = 'foo'
)
""")

tt = t_env.sql_query("""
SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) AS val2
FROM key_change_test
GROUP BY id
""")

t_env.execute_sql("""
CREATE TABLE debug (id INT, val1 VARCHAR, val2 VARCHAR) WITH (
  'connector' = 'print'
)
""")
tt.execute_insert("debug")


Looking at the logs, I see the following error message:
[…]
2020-10-30 07:45:46,474 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, key_change_test]], fields=[id, val1, val2]) (21/88) (02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.
java.io.IOException: Insufficient number of network buffers: required 89, but only 67 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
        at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869) [flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635) [flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
[…]

What is happening here? It seems to me that Flink is requesting an awful lot of
resources for such a simple query (the Kafka topic has only one partition and is
used for manual injection only, so there is no heavy traffic).
Is there a way around this problem?
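
For reference, here is a minimal sketch of the arithmetic behind the error
message. The buffer counts and sizes are copied from the log. Reading the
"(21/88)" subtask index as a parallelism of 88 is my interpretation, and the
rule that a task needs one buffer per downstream subtask plus one is an
assumption on my part, not something the log states.

```python
# Figures copied from the IOException in the log.
total_buffers = 2048
buffer_size_bytes = 32768
required = 89
available = 67

# Total network memory currently configured for the TaskManager, in MiB.
network_memory_mib = total_buffers * buffer_size_bytes / (1024 * 1024)

# The failing task is reported as subtask 21 of 88, which suggests parallelism 88.
parallelism = 88

# Assumption (not stated in the log): one buffer per downstream subtask plus one.
assumed_required = parallelism + 1

# Number of buffers missing for this particular task.
shortfall = required - available

print(f"network memory: {network_memory_mib:.0f} MiB")
print(f"assumed requirement at parallelism {parallelism}: {assumed_required}")
print(f"shortfall for this task: {shortfall} buffers")
```

If that assumption holds, the buffer demand would scale with the job's
parallelism rather than with the traffic on the topic.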

Thanks in advance
Thilo

