Thanks for the clarification; I now understand that only failed tasks
will be re-scheduled, and that only the input partitions of those tasks
will be re-computed.
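
To make sure I have this right, here is a minimal sketch of my mental
model (the input path, app name, and partition counts below are made up
for illustration):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.storage.StorageLevel

  val sc = new SparkContext(new SparkConf().setAppName("recovery-sketch"))

  // Hypothetical input; assume it splits into 16 partitions.
  val base   = sc.textFile("hdfs:///some/input", 16)
  val mapped = base.map(_.length).persist(StorageLevel.MEMORY_ONLY)

  mapped.count()  // materializes and caches all 16 partitions

  // Suppose an executor caching 4 of those partitions now dies. The
  // next action re-runs tasks only for the 4 missing partitions and
  // reuses the 12 still cached on healthy nodes.
  mapped.count()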

Another confusing point from the paper is:

"Because of these properties, D-Streams can parallelize recovery over
hundreds of cores and recover in 1-2 seconds even when checkpointing every
30s"

I don't understand this sentence: if there are, for example, 16 lost
tasks, then at most 16 machines are needed to participate in recovery,
or one machine with 16 cores. Why can Spark Streaming use 100 machines
to recover from a failure?

For the same reason, I don't understand Figures 7 and 12, since both
figures use >= 20 nodes for failure recovery.

Could you help me clarify these points?

Also, please let me know whether the number of recovery nodes can be
configured; it seems to me that failed tasks are treated as ordinary
missing tasks, and there is no special parameter for the number of
recovery nodes.
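
For what it's worth, the only related settings I can find are the
ordinary task-retry knobs, nothing recovery-specific (assuming I am
reading the configuration docs correctly):

  import org.apache.spark.SparkConf

  // Ordinary scheduling/retry knobs; no "number of recovery nodes" here.
  val conf = new SparkConf()
    .setAppName("config-sketch")
    .set("spark.task.maxFailures", "4")  // retries per task before the job fails
    .set("spark.speculation", "true")    // speculatively re-launch slow tasks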

thanks,
dachuan.


On Tue, Mar 4, 2014 at 7:55 PM, Reynold Xin <r...@databricks.com> wrote:

> BlockManager is only responsible for in-memory/on-disk storage. It has
> nothing to do with re-computation.
>
> All the recomputation / retry code is done in the DAGScheduler. Note that
> when a node crashes, due to lazy evaluation, there is no task that needs to
> be re-run. Those tasks are re-run only when their outputs are needed for
> another task/job.
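
(If I understand this correctly, then in a made-up example like the one
below, the map alone submits no tasks, and after a crash nothing is
recomputed until an action asks for the output. Is that right?)

  val data    = sc.parallelize(1 to 1000, 8)  // sc as in my sketch above
  val doubled = data.map(_ * 2)               // lazy: no tasks submitted yet
  doubled.collect()                           // action: tasks run here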
>
>
> On Tue, Mar 4, 2014 at 4:51 PM, dachuan <hdc1...@gmail.com> wrote:
>
> > Hello, developers,
> >
> > I am just curious about the following two things, which seem to
> > contradict each other; please help me find the mistakes in my
> > understanding:
> >
> > 1) Excerpted from the SOSP 2013 paper: "Then, when a node fails, the
> > system detects all missing RDD partitions and launches tasks to recompute
> > them from the last checkpoint. Many tasks can be launched at the same time
> > to compute different RDD partitions, allowing the whole cluster to partake
> > in recovery."
> >
> > 2) Excerpted from the code: the function below is called when there is a
> > dead BlockManager. It does not launch tasks to recover the lost
> > partitions; instead, it only updates metadata.
> >
> >   private def removeBlockManager(blockManagerId: BlockManagerId) {
> >     val info = blockManagerInfo(blockManagerId)
> >
> >     // Remove the block manager from blockManagerIdByExecutor.
> >     blockManagerIdByExecutor -= blockManagerId.executorId
> >
> >     // Remove it from blockManagerInfo and remove all the blocks.
> >     blockManagerInfo.remove(blockManagerId)
> >     val iterator = info.blocks.keySet.iterator
> >     while (iterator.hasNext) {
> >       val blockId = iterator.next()
> >       val locations = blockLocations.get(blockId)
> >       locations -= blockManagerId
> >       if (locations.size == 0) {
> >         // No executor holds this block anymore; drop its entry.
> >         blockLocations.remove(blockId)
> >       }
> >     }
> >   }
> >
> > thanks,
> > dachuan.
> >
> > --
> > Dachuan Huang
> > Cellphone: 614-390-7234
> > 2015 Neil Avenue
> > Ohio State University
> > Columbus, Ohio
> > U.S.A.
> > 43210
> >
>



-- 
Dachuan Huang
Cellphone: 614-390-7234
2015 Neil Avenue
Ohio State University
Columbus, Ohio
U.S.A.
43210
