Hi, I have a question about the `removeRdd` method in `BlockManagerMasterActor.scala`, specifically about how it asks the slave actors to remove an RDD.
In this piece of code:

```scala
Future.sequence(blockManagerInfo.values.map { bm =>
  bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
}.toSeq)
```

it asks every entry in `blockManagerInfo` to remove the RDD. Shouldn't we filter `blockManagerInfo` so that we only send the request to the `BlockManagerInfo` entries that actually contain blocks of that RDD?

I made the following change to check whether this makes sense:

```diff
E:\projects\amplab\spark>git diff
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
old mode 100644
new mode 100755
index a999d76..fccc5a9
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -128,9 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
     // Find all blocks for the given RDD, remove the block from both blockLocations and
     // the blockManagerInfo that is tracking the blocks.
     val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+    val bmInfos = new mutable.HashSet[BlockManagerMasterActor.BlockManagerInfo]
     blocks.foreach { blockId =>
       val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
-      bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
+      bms.foreach{ bm =>
+        blockManagerInfo.get(bm).foreach{ bmInfo =>
+          bmInfos += bmInfo
+          bmInfo.removeBlock(blockId)
+        }
+      }
       blockLocations.remove(blockId)
     }
 
@@ -138,7 +144,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
     // The dispatcher is used as an implicit argument into the Future sequence construction.
     import context.dispatcher
     val removeMsg = RemoveRdd(rddId)
-    Future.sequence(blockManagerInfo.values.map { bm =>
+    Future.sequence(bmInfos.map { bm =>
       bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
     }.toSeq)
   }
```

Thanks,
Qiuzhuang