Hi,
I have the classic word count example:
> file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).collect()
In the job UI, I can only see two stages: 0-collect and 1-map.
What happened to the ShuffledRDD created by reduceByKey? And are the flatMap
and map operations both collapsed into a single stage?
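For illustration, here is a hypothetical, simplified model in plain Scala (no Spark dependency) of how the scheduler appears to split this job, judging from the log below. The narrow transformations (textFile, flatMap, map) are pipelined into one shuffle-map stage (Stage 1, ending at MappedRDD[6]). The ShuffledRDD created by reduceByKey is not missing; it is the starting point of the final result stage (Stage 0, ShuffledRDD[7]), which also runs collect. `StageSketch`, `Step`, `buildStages`, `mapStage`, `reduceStage` and `wordCount` are illustrative names, not Spark APIs, and the sketch skips the map-side combine that reduceByKey performs before the shuffle.

```scala
// Hypothetical, simplified model of how the word count job is cut into stages.
// This is NOT Spark's implementation; all names here are illustrative.
object StageSketch {
  // One step of the lineage; `shuffle = true` marks a wide (shuffle) dependency.
  final case class Step(name: String, shuffle: Boolean)

  // Narrow steps are appended (pipelined) to the current stage;
  // a shuffle step closes the current stage and opens a new one.
  def buildStages(lineage: List[Step]): List[List[String]] =
    lineage.foldLeft(List(List.empty[String])) { (stages, step) =>
      if (step.shuffle) List(step.name) :: stages
      else (stages.head :+ step.name) :: stages.tail
    }.reverse

  // Map stage: flatMap and map run in a single pass over one input split,
  // and the task writes one bucket per reduce partition (hash partitioning).
  def mapStage(split: Seq[String], numReducers: Int): Map[Int, Seq[(String, Int)]] =
    split
      .flatMap(line => line.split(" ").toSeq)
      .map(word => (word, 1))
      .groupBy { case (word, _) => Math.floorMod(word.hashCode, numReducers) }

  // Result stage: one reduce task fetches its bucket from every map task
  // (this is what the ShuffledRDD reads) and sums the counts per word.
  def reduceStage(buckets: Seq[Seq[(String, Int)]]): Map[String, Int] =
    buckets.flatten.groupBy(_._1).map { case (word, pairs) => word -> pairs.map(_._2).sum }

  // Runs every map task, then every reduce task, and merges the results (like collect).
  def wordCount(splits: Seq[Seq[String]], numReducers: Int): Map[String, Int] = {
    val mapOutputs = splits.map(split => mapStage(split, numReducers))
    (0 until numReducers)
      .map(r => reduceStage(mapOutputs.map(_.getOrElse(r, Seq.empty[(String, Int)]))))
      .foldLeft(Map.empty[String, Int])(_ ++ _)
  }

  def main(args: Array[String]): Unit = {
    val lineage = List(
      Step("textFile", shuffle = false),
      Step("flatMap", shuffle = false),
      Step("map", shuffle = false),
      Step("reduceByKey (ShuffledRDD)", shuffle = true),
      Step("collect", shuffle = false)) // the action runs in the final stage
    // prints List(List(textFile, flatMap, map), List(reduceByKey (ShuffledRDD), collect))
    println(buildStages(lineage))
    // two input splits and two reduce partitions, loosely mirroring the log
    println(wordCount(Seq(Seq("a b a"), Seq("b c")), numReducers = 2))
  }
}
```

With two splits and two reducers, each reduce task reads one bucket from each of the two map tasks, which loosely corresponds to the "Getting 2 non-empty blocks out of 2 blocks" lines in the log.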
14/11/25 16:02:35 INFO SparkContext: Starting job: collect at <console>:15
14/11/25 16:02:35 INFO DAGScheduler: Registering RDD 6 (map at <console>:15)
14/11/25 16:02:35 INFO DAGScheduler: Got job 0 (collect at <console>:15) with 2 output partitions (allowLocal=false)
14/11/25 16:02:35 INFO DAGScheduler: Final stage: Stage 0(collect at <console>:15)
14/11/25 16:02:35 INFO DAGScheduler: Parents of final stage: List(Stage 1)
14/11/25 16:02:35 INFO DAGScheduler: Missing parents: List(Stage 1)
14/11/25 16:02:35 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[6] at map at <console>:15), which has no missing parents
14/11/25 16:02:35 INFO MemoryStore: ensureFreeSpace(3464) called with curMem=163705, maxMem=278302556
14/11/25 16:02:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.4 KB, free 265.3 MB)
14/11/25 16:02:35 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[6] at map at <console>:15)
14/11/25 16:02:35 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
14/11/25 16:02:35 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, localhost, PROCESS_LOCAL, 1208 bytes)
14/11/25 16:02:35 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1208 bytes)
14/11/25 16:02:35 INFO Executor: Running task 0.0 in stage 1.0 (TID 0)
14/11/25 16:02:35 INFO Executor: Running task 1.0 in stage 1.0 (TID 1)
14/11/25 16:02:35 INFO HadoopRDD: Input split: file:/Users/ltsai/Downloads/spark-1.1.0-bin-hadoop2.4/README.md:0+2405
14/11/25 16:02:35 INFO HadoopRDD: Input split: file:/Users/ltsai/Downloads/spark-1.1.0-bin-hadoop2.4/README.md:2405+2406
14/11/25 16:02:35 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/11/25 16:02:35 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/11/25 16:02:35 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/11/25 16:02:35 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/11/25 16:02:35 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/11/25 16:02:36 INFO Executor: Finished task 0.0 in stage 1.0 (TID 0). 1869 bytes result sent to driver
14/11/25 16:02:36 INFO Executor: Finished task 1.0 in stage 1.0 (TID 1). 1869 bytes result sent to driver
14/11/25 16:02:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 536 ms on localhost (1/2)
14/11/25 16:02:36 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 529 ms on localhost (2/2)
14/11/25 16:02:36 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/11/25 16:02:36 INFO DAGScheduler: Stage 1 (map at <console>:15) finished in 0.562 s
14/11/25 16:02:36 INFO DAGScheduler: looking for newly runnable stages
14/11/25 16:02:36 INFO DAGScheduler: running: Set()
14/11/25 16:02:36 INFO DAGScheduler: waiting: Set(Stage 0)
14/11/25 16:02:36 INFO DAGScheduler: failed: Set()
14/11/25 16:02:36 INFO DAGScheduler: Missing parents for Stage 0: List()
14/11/25 16:02:36 INFO DAGScheduler: Submitting Stage 0 (ShuffledRDD[7] at reduceByKey at <console>:15), which is now runnable
14/11/25 16:02:36 INFO MemoryStore: ensureFreeSpace(2112) called with curMem=167169, maxMem=278302556
14/11/25 16:02:36 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.1 KB, free 265.2 MB)
14/11/25 16:02:36 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (ShuffledRDD[7] at reduceByKey at <console>:15)
14/11/25 16:02:36 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/11/25 16:02:36 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 2, localhost, PROCESS_LOCAL, 948 bytes)
14/11/25 16:02:36 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 3, localhost, PROCESS_LOCAL, 948 bytes)
14/11/25 16:02:36 INFO Executor: Running task 0.0 in stage 0.0 (TID 2)
14/11/25 16:02:36 INFO Executor: Running task 1.0 in stage 0.0 (TID 3)
14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 5 ms
14/11/25 16:02:36 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 5 ms
14/11/25 16:02:36 INFO Executor: Finished task 0.0 in stage 0.0 (TID 2). 4602 bytes result sent to driver
14/11/25 16:02:36 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 2) in 135 ms on localhost (1/2)
14/11/25 16:02:36 INFO Executor: Finished task 1.0 in stage 0.0 (TID 3). 5051 bytes result sent to driver
14/11/25 16:02:36 INFO DAGScheduler: Stage 0 (collect at <console>:15) finished in 0.168 s
14/11/25 16:02:36 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 3) in 160 ms on localhost (2/2)
14/11/25 16:02:36 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/11/25 16:02:36 INFO SparkContext: Job finished: collect at <console>:15, took 1.046888 s
Thanks!