Hi, I am trying to understand how /spark/*/Storage/BlockManagerMaster.askDriverWithReply() works.

    def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
      val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
      if (result.length != numPeers) {
        throw new SparkException(
          "Error getting peers, only got " + result.size + " instead of " + numPeers)
      }
      result
    }

Here, getPeers calls askDriverWithReply():

    private def askDriverWithReply[T](message: Any): T = {
      // TODO: Consider removing multiple attempts
      if (driverActor == null) {
        throw new SparkException("Error sending message to BlockManager as driverActor is null " +
          "[message = " + message + "]")
      }
      var attempts = 0
      var lastException: Exception = null
      while (attempts < AKKA_RETRY_ATTEMPTS) {
        attempts += 1
        try {
          val future = driverActor.ask(message)(timeout)
          val result = Await.result(future, timeout)
          if (result == null) {
            throw new SparkException("BlockManagerMaster returned null")
          }
          return result.asInstanceOf[T]
        } catch {
          case ie: InterruptedException => throw ie
          case e: Exception =>
            lastException = e
            logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
        }
        Thread.sleep(AKKA_RETRY_INTERVAL_MS)
      }

      throw new SparkException(
        "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
    }

The getPeers method calls askDriverWithReply() with the message "GetPeers()", and the driver returns the BlockManagerIds.

    val future = driverActor.ask(message)(timeout)
    val result = Await.result(future, timeout)

This is where "result" is obtained. However, I couldn't find the definition of ask() that processes the GetPeers() message.

Can someone please tell me how and where 'result' is being constructed?

Thank you!
Karthik