Hi,

I am trying to understand how the
/spark/*/Storage/BlockManagerMaster.askDriverWithReply() works.

def getPeers(blockManagerId: BlockManagerId, numPeers: Int):
Seq[BlockManagerId] = {

val result =
askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))

if (result.length != numPeers) {

throw new SparkException(

"Error getting peers, only got " + result.size + " instead of " + numPeers)

}

result
 }

Here, getPeers calls askDriverWithReply().

 private def askDriverWithReply[T](message: Any): T = {

// TODO: Consider removing multiple attempts

if (driverActor == null) {

throw new SparkException("Error sending message to BlockManager as
driverActor is null " +

"[message = " + message + "]")

}

var attempts = 0

var lastException: Exception = null

while (attempts < AKKA_RETRY_ATTEMPTS) {

attempts += 1

try {

val future = driverActor.ask(message)(timeout)

val result = Await.result(future, timeout)

if (result == null) {

throw new SparkException("BlockManagerMaster returned null")

}

 return result.asInstanceOf[T]

} catch {

case ie: InterruptedException => throw ie

case e: Exception =>

lastException = e

logWarning("Error sending message to BlockManagerMaster in " + attempts + "
attempts", e)

}

Thread.sleep(AKKA_RETRY_INTERVAL_MS)

}

throw new SparkException("Error sending message to BlockManagerMaster
[message = " + message + "]", lastException)
 }

Here, getPeers method calls askDriverWithReply() with message "GetPeers()".
The Driver returns the BlockManagerId's.

val future = driverActor.ask(message)(timeout)

val result = Await.result(future, timeout)
Here, we obtain "result". But, I couldn't find definition of ask() that
processes message GetPeers(). Can someone please tell me how/where the
'result' is being constructed??

Thank you!!
Karthik

Reply via email to