It looks like the replication happens in BlockManagerWorker.syncPutBlock(). The call sits inside the condition of an if check, which may be why it is easy to miss.
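
To illustrate the pattern, here is a minimal sketch with made-up stand-ins (ReplicationSketch, Peer, attempted and this simplified syncPutBlock are all hypothetical, not Spark's real classes or signatures). It only shows that a call placed in an if condition still runs and does its work before the result is negated and tested:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for illustration only; not Spark's actual API.
object ReplicationSketch {
  final case class Peer(host: String, port: Int)

  // Records every peer we tried to send to, so the side effect is observable.
  val attempted: ListBuffer[Peer] = ListBuffer.empty[Peer]

  // Simplified stand-in for syncPutBlock: does the "send", then reports success.
  def syncPutBlock(blockId: String, peer: Peer): Boolean = {
    attempted += peer   // the actual work happens here
    peer.port > 0       // assumption: pretend non-positive ports are unreachable
  }

  def replicate(blockId: String, peers: Seq[Peer]): Unit = {
    for (peer <- peers) {
      // The call is evaluated as part of the condition; only its
      // Boolean result decides whether the error branch is taken.
      if (!syncPutBlock(blockId, peer)) {
        println("Failed to call syncPutBlock to " + peer)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    replicate("block_0", Seq(Peer("host1", 7077), Peer("host2", 0)))
    println("Peers attempted: " + attempted.size)
  }
}
```

In this sketch both peers end up in attempted, even though the if body only runs for the one whose send "failed".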

On Fri, Sep 5, 2014 at 2:19 AM, rapelly kartheek <kartheek.m...@gmail.com> wrote:
> Hi,
>
> var cachedPeers: Seq[BlockManagerId] = null
> private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
>   val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
>   if (cachedPeers == null) {
>     cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
>   }
>   for (peer: BlockManagerId <- cachedPeers) {
>     val start = System.nanoTime
>     data.rewind()
>     logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
>       + data.limit() + " Bytes. To node: " + peer)
>     if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
>       new ConnectionManagerId(peer.host, peer.port))) {
>       logError("Failed to call syncPutBlock to " + peer)
>     }
>     logDebug("Replicated BlockId " + blockId + " once used " +
>       (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
>       data.limit() + " bytes.")
>   }
>
> I get the flow of this code. But I don't find any method being called for
> actually writing the data into the set of peers chosen for replication.
>
> Where exactly is the replication happening?
>
> Thank you!!
> -Karthik