ask() is a method on every Actor. It comes from the akka library, which spark uses for a lot of the communication between various components.
There is some documentation on ask() here (go to the section on "Send messages"): http://doc.akka.io/docs/akka/2.2.3/scala/actors.html though if you are totally new to it, you might want to work through a simple akka tutorial first, before diving into the docs. On Fri, Nov 7, 2014 at 4:11 AM, rapelly kartheek <kartheek.m...@gmail.com> wrote: > Hi, > > I am trying to understand how the > /spark/*/Storage/BlockManagerMaster.askDriverWithReply() works. > > def getPeers(blockManagerId: BlockManagerId, numPeers: Int): > Seq[BlockManagerId] = { > > val result = > askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers)) > > if (result.length != numPeers) { > > throw new SparkException( > > "Error getting peers, only got " + result.size + " instead of " + numPeers) > > } > > result > } > > Here, getPeers calls askDriverWithReply(). > > private def askDriverWithReply[T](message: Any): T = { > > // TODO: Consider removing multiple attempts > > if (driverActor == null) { > > throw new SparkException("Error sending message to BlockManager as > driverActor is null " + > > "[message = " + message + "]") > > } > > var attempts = 0 > > var lastException: Exception = null > > while (attempts < AKKA_RETRY_ATTEMPTS) { > > attempts += 1 > > try { > > val future = driverActor.ask(message)(timeout) > > val result = Await.result(future, timeout) > > if (result == null) { > > throw new SparkException("BlockManagerMaster returned null") > > } > > return result.asInstanceOf[T] > > } catch { > > case ie: InterruptedException => throw ie > > case e: Exception => > > lastException = e > > logWarning("Error sending message to BlockManagerMaster in " + attempts + " > attempts", e) > > } > > Thread.sleep(AKKA_RETRY_INTERVAL_MS) > > } > > throw new SparkException("Error sending message to BlockManagerMaster > [message = " + message + "]", lastException) > } > > Here, getPeers method calls askDriverWithReply() with message "GetPeers()". > The Driver returns the BlockManagerId's. > > val future = driverActor.ask(message)(timeout) > > val result = Await.result(future, timeout) > Here, we obtain "result". But, I couldn't find definition of ask() that > processes message GetPeers(). Can someone please tell me how/where the > 'result' is being constructed?? > > Thank you!! > Karthik >