Thanks for the clarification; now I understand that only failed tasks will be re-scheduled, and only the input partitions of those tasks will be re-computed.
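Just to double-check my understanding in code, here is a minimal local-mode sketch I put together (my own toy example, not Spark internals; the partition count and names are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object LineageRecoverySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lineage-recovery-sketch").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // 16 partitions, so at most 16 recovery tasks for this RDD.
    val base = sc.parallelize(1 to 1000, numSlices = 16)
    val doubled = base.map(_ * 2).persist(StorageLevel.MEMORY_ONLY)

    doubled.count()  // materializes and caches all 16 partitions

    // If an executor later dies and some cached partitions are lost,
    // nothing is recomputed right away (lazy evaluation). The next action
    // below is what makes the DAGScheduler resubmit tasks, and only for the
    // missing partitions, rebuilding them from `base` via the lineage.
    val sum = doubled.reduce(_ + _)
    println(sum)

    sc.stop()
  }
}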
Another confusing point from the paper is this sentence: "Because of these
properties, D-Streams can parallelize recovery over hundreds of cores and
recover in 1-2 seconds even when checkpointing every 30s." I don't understand
it because if there are, for example, 16 lost tasks, then at most 16 machines
(or one machine with 16 cores) should be enough to participate in recovery.
Why can Spark Streaming use 100 machines to recover from a failure? For the
same reason, I don't understand Figure 7 and Figure 12, because both figures
use >= 20 nodes for failure recovery. Could you help me clarify these points?

Also, please let me know whether the number of recovery nodes can be
configured. It seems to me that the failed tasks are treated as ordinary
missing tasks, and there is no special parameter that controls how many nodes
take part in recovery.

thanks,
dachuan.

On Tue, Mar 4, 2014 at 7:55 PM, Reynold Xin <r...@databricks.com> wrote:

> BlockManager is only responsible for in-memory/on-disk storage. It has
> nothing to do with re-computation.
>
> All the recomputation / retry code are done in the DAGScheduler. Note that
> when a node crashes, due to lazy evaluation, there is no task that needs to
> be re-run. Those tasks are re-run only when their outputs are needed for
> another task/job.
>
>
> On Tue, Mar 4, 2014 at 4:51 PM, dachuan <hdc1...@gmail.com> wrote:
>
> > Hello, developers,
> >
> > I am just curious about the following two things which seems to be
> > contradictory to each other, please help me find out my understanding
> > mistakes:
> >
> > 1) Excerpted from sosp 2013 paper, "Then, when a node fails, the system
> > detects all missing RDD partitions and launches tasks to recompute them
> > from the last checkpoint. Many tasks can be launched at the same time to
> > compute different RDD partitions, allowing the whole cluster to partake in
> > recovery."
> >
> > 2) Excerpted from code, this function is called when there's one dead
> > BlockManager, this function didn't launch tasks to recover lost partitions,
> > instead, it updated many meta-data.
> >
> > private def removeBlockManager(blockManagerId: BlockManagerId) {
> >   val info = blockManagerInfo(blockManagerId)
> >
> >   // Remove the block manager from blockManagerIdByExecutor.
> >   blockManagerIdByExecutor -= blockManagerId.executorId
> >
> >   // Remove it from blockManagerInfo and remove all the blocks.
> >   blockManagerInfo.remove(blockManagerId)
> >   val iterator = info.blocks.keySet.iterator
> >   while (iterator.hasNext) {
> >     val blockId = iterator.next
> >     val locations = blockLocations.get(blockId)
> >     locations -= blockManagerId
> >     if (locations.size == 0) {
> >       blockLocations.remove(locations)
> >     }
> >   }
> > }
> >
> > thanks,
> > dachuan.
> >
> > --
> > Dachuan Huang
> > Cellphone: 614-390-7234
> > 2015 Neil Avenue
> > Ohio State University
> > Columbus, Ohio
> > U.S.A.
> > 43210
>
>

--
Dachuan Huang
Cellphone: 614-390-7234
2015 Neil Avenue
Ohio State University
Columbus, Ohio
U.S.A.
43210
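P.S. On the parallel-recovery question, the windowed case is what I was trying
to picture. Below is a small Spark Streaming sketch (again my own toy example;
the host, port, and checkpoint path are made up). Because reduceByKeyAndWindow
has shuffle dependencies, recomputing even one lost partition of the 30s window
needs map-side partitions from many earlier intervals, and I assume those
upstream tasks are what can be spread over many nodes:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedRecoverySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dstream-recovery-sketch").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/ckpt")  // made-up checkpoint directory

    // made-up socket source
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" "))
      .map(word => (word, 1L))
      // 30-second window sliding every second; this is the wide dependency
      // whose lost partitions pull in data from many parent intervals.
      .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(1))

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}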