Hey Andrea! Sorry for the bad user experience. Regarding the network buffers: you should be able to run the job after increasing the number of network buffers; just account for the extra memory when specifying the heap size. You currently allocate 32768 * 16384 bytes = 512 MB for them. If you have a very long pipeline and high parallelism, you should increase this accordingly. How much memory do you have on your machines?
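The arithmetic above can be sketched as a quick sizing check. This is only a back-of-the-envelope sketch: the cluster figures (25 nodes, 16 cores, i.e. one slot per core) come from the quoted setup below, and the `slots^2 * #TaskManagers * 4` rule of thumb is the one suggested in the Flink configuration docs for all-to-all shuffles, so treat the result as a starting point rather than an exact requirement.

```scala
// Back-of-the-envelope check of the network buffer memory discussed above,
// plus the "slots^2 * #TaskManagers * 4" rule of thumb from the Flink
// configuration docs. Cluster figures are taken from the thread below.
object NetworkBufferSizing extends App {
  val numberOfBuffers = 32768      // taskmanager.network.numberOfBuffers
  val bufferSizeBytes = 16384L     // taskmanager.network.bufferSizeInBytes

  // Memory reserved for network buffers per TaskManager.
  val bufferMemoryMb = numberOfBuffers * bufferSizeBytes / (1024 * 1024)
  println(s"Network buffer memory per TaskManager: $bufferMemoryMb MB")  // 512 MB

  // Rule of thumb for all-to-all shuffles: slots^2 * #TaskManagers * 4.
  val slotsPerTaskManager = 16     // one slot per core
  val taskManagers = 25
  val recommendedBuffers =
    slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4
  println(s"Rule-of-thumb numberOfBuffers: $recommendedBuffers")         // 25600
}
```

Note that this rule of thumb is a lower bound for simple jobs; a long pipeline with several shuffles at parallelism 400 can easily need more, which would match the "not enough numberOfBuffers" error reported at the higher parallelism.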
Regarding the IllegalStateException: I suspect that this is **not** the root cause of the failure. The null ExecutionState can only happen if the producer task (from which data is requested) fails during the request. The error message is confusing and I opened a JIRA issue to fix it: https://issues.apache.org/jira/browse/FLINK-4131. Can you please check your complete logs to see what the root cause might be, e.g. why did the producer fail?

On Wed, Jun 29, 2016 at 12:19 PM, ANDREA SPINA <74...@studenti.unimore.it> wrote:

> Hi everyone,
>
> I am running some Flink experiments with the Peel benchmark
> (http://peel-framework.org/) and I am struggling with exceptions. The
> environment is a 25-node cluster with 16 cores per node. The dataset is
> ~80 GiB and is located on HDFS 2.7.1. The Flink version is 1.0.3.
>
> At the beginning I tried with 400 as the degree of parallelism, but a
> "not enough numberOfBuffers" error was raised, so I changed the
> parallelism to 200. The Flink configuration follows:
>
> jobmanager.rpc.address = ${runtime.hostname}
> akka.log.lifecycle.events = ON
> akka.ask.timeout = 300s
> jobmanager.rpc.port = 6002
> jobmanager.heap.mb = 1024
> jobmanager.web.port = 6004
> taskmanager.heap.mb = 28672
> taskmanager.memory.fraction = 0.7
> taskmanager.network.numberOfBuffers = 32768
> taskmanager.network.bufferSizeInBytes = 16384
> taskmanager.tmp.dirs = "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
> taskmanager.debug.memory.startLogThread = true
>
> With a parallelism of 200, the following exception is raised on a node
> of the cluster:
>
> 2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task
>   - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
>   -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
>   switched to FAILED with exception.
> java.lang.IllegalStateException: Received unexpected partition state null
> for partition request. This is a bug.
>     at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
>
> The reduce code is:
>
> 43   val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
>
> The map code is:
>
> 68   def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
> 69     dimensionDS.map {
> 70       dimension =>
> 71         val values = DenseVector(Array.fill(dimension)(0.0))
> 72         values
> 73     }
> 74   }
>
> I can't figure out a solution. Thank you for your help.
>
> Andrea
>
> --
> Andrea Spina
> N.Tessera: 74598
> MAT: 89369
> Ingegneria Informatica [LM] (D.M. 270)