Hello Arvid Heise,

Thanks for replying! Based on your suggestion, I put together the following
snippet for the config:

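  import org.apache.flink.configuration.{Configuration, TaskManagerOptions}

  val config = new Configuration()

  // `parallelism` is the job parallelism, assumed to be defined elsewhere.
  // Network memory = segment size * 4 * slots per TaskManager * parallelism.
  private val newMemorySize = config
    .get(TaskManagerOptions.MEMORY_SEGMENT_SIZE)
    .multiply(4)
    .multiply(config.get(TaskManagerOptions.NUM_TASK_SLOTS).toDouble)
    .multiply(parallelism.toDouble)
  // Pin min and max to the same value so the network memory size is fixed.
  config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, newMemorySize)
  config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, newMemorySize)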
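
To make the numbers concrete, here is a minimal sketch of the arithmetic the
snippet above performs. It uses plain Long values rather than Flink's
MemorySize. The object and method names are made up for illustration, the slot
count and parallelism are arbitrary example values, and 32 KiB is assumed as
the segment size because that is the documented default for
taskmanager.memory.segment-size.

```scala
object NetworkMemoryEstimate {
  // Assumed segment size: Flink's documented default of 32 KiB.
  val DefaultSegmentSizeBytes: Long = 32L * 1024L

  // Same arithmetic as the snippet: segment size * 4 * slots * parallelism.
  def networkMemoryBytes(segmentSizeBytes: Long, numTaskSlots: Int, parallelism: Int): Long =
    segmentSizeBytes * 4L * numTaskSlots.toLong * parallelism.toLong

  def main(args: Array[String]): Unit = {
    // Hypothetical values, chosen only to illustrate the calculation.
    val bytes = networkMemoryBytes(DefaultSegmentSizeBytes, numTaskSlots = 4, parallelism = 8)
    println(s"$bytes bytes = ${bytes / (1024L * 1024L)} MiB")
  }
}
```

With these example values the formula gives 4194304 bytes, i.e. 4 MiB. This
only mirrors the formula as written and does not model anything specific to
process functions.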

It seems to work as intended, even with high parallelism, as long as the job
contains no process functions. As soon as a process function is involved, the
value the formula yields is too low, regardless of parallelism. How would one
go about accounting for that?


