Hi everyone, I am running some Flink experiments with the Peel benchmark http://peel-framework.org/ and I am struggling with exceptions. The environment is a 25-node cluster with 16 cores per node. The dataset is ~80 GiB and resides on HDFS 2.7.1. The Flink version is 1.0.3.
At the beginning I tried a degree of parallelism of 400, but an "insufficient number of network buffers" exception was raised, so I lowered the parallelism to 200. The Flink configuration follows:

jobmanager.rpc.address = ${runtime.hostname}
akka.log.lifecycle.events = ON
akka.ask.timeout = 300s
jobmanager.rpc.port = 6002
jobmanager.heap.mb = 1024
jobmanager.web.port = 6004
taskmanager.heap.mb = 28672
taskmanager.memory.fraction = 0.7
taskmanager.network.numberOfBuffers = 32768
taskmanager.network.bufferSizeInBytes = 16384
taskmanager.tmp.dirs = "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
taskmanager.debug.memory.startLogThread = true

With a parallelism of 200, the following exception is raised on one node of the cluster:

2016-06-29 11:31:55,673 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43)) -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1) switched to FAILED with exception.
java.lang.IllegalStateException: Received unexpected partition state null for partition request. This is a bug.
        at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)

The reduce code (sGradientDescentL2.scala:43) is:

43  val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)

The map code (sGradientDescentL2.scala:68-74) is:

68  def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
69    dimensionDS.map {
70      dimension =>
71        val values = DenseVector(Array.fill(dimension)(0.0))
72        values
73    }
74  }

I can't figure out a solution; thank you for your help.

Andrea

--
Andrea Spina
N. Tessera: 74598
MAT: 89369
Ingegneria Informatica [LM] (D.M. 270)
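As a side note on the network-buffer sizing that forced the parallelism down: the Flink documentation suggests a rough rule of thumb for taskmanager.network.numberOfBuffers of slots-per-TaskManager squared, times the number of TaskManagers, times 4. A minimal sketch of that calculation, assuming one TaskManager per node with one slot per core (the cluster sizes come from the setup above; the rule itself is only an approximation from the docs, not a guarantee):

```scala
// Rough sizing rule from the Flink docs (an approximation):
//   numberOfBuffers ≈ slotsPerTaskManager^2 * numberOfTaskManagers * 4
object BufferSizing {
  def main(args: Array[String]): Unit = {
    val slotsPerTaskManager  = 16 // 16 cores per node, assuming one slot per core
    val numberOfTaskManagers = 25 // 25-node cluster, assuming one TM per node

    val recommendedBuffers =
      slotsPerTaskManager * slotsPerTaskManager * numberOfTaskManagers * 4
    println(recommendedBuffers) // 25600
  }
}
```

With these values the configured 32768 buffers would be above the suggested 25600, so the earlier failure at parallelism 400 may point at something else (e.g. more slots per TaskManager than assumed here).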