Hello Arvid Heise,

Thanks for replying! Based on your suggestion, I put together the following snippet for the config:

val config = new Configuration()

private val newMemorySize = config.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE)
  .multiply(4)
  .multiply(config.get(TaskManagerOptions.NUM_TASK_SLOTS).toDouble)
  .multiply(parallelism.toDouble)

config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, newMemorySize)
config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, newMemorySize)

It seems to work as intended even with high parallelism, as long as process functions are not involved. As soon as that is no longer the case, the value resulting from the formula is too low, regardless of parallelism.

How would one go about accounting for that?
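
To make the numbers concrete, here is the same formula as a standalone sketch with no Flink dependency. It uses plain Longs instead of MemorySize and assumes the values I believe are the defaults (32 KiB for the memory segment size, 1 task slot per TaskManager). The object and method names are only illustrative and are not part of my job.

```scala
// Standalone sketch of the formula from the snippet above.
// Assumptions (not read from a real Configuration): 32 KiB segments, 1 slot.
object NetworkMemoryEstimate {
  val AssumedSegmentSizeBytes: Long = 32L * 1024
  val AssumedTaskSlots: Int = 1

  // segment size * 4 * task slots * parallelism, in bytes
  def estimateBytes(segmentSizeBytes: Long, slots: Int, parallelism: Int): Long =
    segmentSizeBytes * 4 * slots * parallelism

  def main(args: Array[String]): Unit = {
    for (p <- Seq(1, 8, 64)) {
      val bytes = estimateBytes(AssumedSegmentSizeBytes, AssumedTaskSlots, p)
      println(s"parallelism=$p -> $bytes bytes (${bytes / 1024} KiB)")
    }
  }
}
```

Under those assumptions the result is 128 KiB at parallelism 1 and 8 MiB at parallelism 64, so the formula yields fairly small values unless the slot count or parallelism is large.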