I am not sure about the original explanation of shuffle write.
In the word count example, the shuffle is needed, as Spark has to group by the
word (reduceByKey is more accurate here). Imagine that you have 2 mappers
reading the data; each mapper will generate (word, count) tuple output in
segments. Spark always writes that output to a local file (in fact, one file
with different segments representing different partitions).
As you can imagine, the output of these segments will be small, as it only
contains (word, count of word) tuples. After each mapper generates this
segmented file for the different partitions, each reducer will fetch the
partition belonging to itself.
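To make the picture concrete, here is a minimal word count sketch (the input
path is just a placeholder); the reduceByKey step is where the shuffle
boundary sits:

    // Minimal word count sketch; the input path is a placeholder.
    val words = sc.textFile("hdfs:///path/to/input")  // one partition per HDFS block
      .flatMap(line => line.split("\\s+"))            // narrow: lines -> words
      .map(word => (word, 1))                         // narrow: emit (word, 1) tuples
    val counts = words.reduceByKey(_ + _)             // shuffle boundary: mappers write the
                                                      // segmented files, reducers fetch them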
In your job summary, since your source is a text file, your data corresponds
to 2 HDFS blocks, or 2 x 256 MB. There are 2 tasks concurrently reading these
2 partitions, with about 2.5M lines of data being processed in each partition.
Each partition's shuffle write is 2.7 KB of data, which is the size of the
segment file generated, corresponding to all the unique words and their counts
for this partition. So the size is reasonable, at least to me.
The interesting number is the 273 shuffle write records. I am not 100% sure of
its meaning. Does it mean that this partition has 273 unique words from these
2.5M lines of data? That seems kind of low, but I don't have another
explanation of its meaning.
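One rough sanity check on the numbers: 2.7 KB is about 2,765 bytes, and
2,765 / 273 records is roughly 10 bytes per record, which is plausible for a
short serialized word plus its count, so the unique-word reading seems
consistent.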
If your final output shows hundreds of unique words, then that confirms it.
The 2000 bytes sent to the driver is the final output, aggregated on the
reducers' end and merged back to the driver.
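Continuing the sketch above (and assuming the job ends with a collect), only
the aggregated pairs travel back:

    // Sketch: only the final aggregated (word, count) pairs reach the driver.
    val result = counts.collect()  // Array[(String, Int)], one entry per unique word
    println(result.length)         // a few hundred unique words => only ~2 KB of result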
Yong

Date: Thu, 1 Oct 2015 13:33:59 -0700
Subject: Re: Problem understanding spark word count execution
From: kar...@bluedata.com
To: nicolae.maras...@adswizz.com
CC: user@spark.apache.org

Hi Nicolae,
Thanks for the reply. To further clarify things -
sc.textFile is reading from HDFS. Now shouldn't the file be read in a way such
that EACH executor works only on the local copy of the file part available? In
this case it is a ~4.64 GB file and the block size is 256 MB, so approx. 19
partitions will be created, and each task will run on 1 partition (which is
what I am seeing in the stage logs). Also, I assume it will read the file in a
way that each executor gets exactly the same amount of data, so there
shouldn't be any shuffling in reading, at least.
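For what it is worth, here is how I would verify the partition count (a
sketch; the path is a placeholder):

    // Sketch: check that partitions line up with HDFS blocks; path is a placeholder.
    val lines = sc.textFile("hdfs:///path/to/input")
    println(lines.partitions.length)  // expect ~19 for a 4.64 GB file with 256 MB blocks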
During stage 0 (sc.textFile -> flatMap -> Map), this is the output I am seeing
for every task:
Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Shuffle Write Size / Records
0 | 44 | 0 | SUCCESS | NODE_LOCAL | 1 / 10.35.244.10 | 2015/09/29 13:57:24 | 14 s | 0.2 s | 256.0 MB (hadoop) / 2595161 | 2.7 KB / 273
1 | 45 | 0 | SUCCESS | NODE_LOCAL | 2 / 10.35.244.11 | 2015/09/29 13:57:24 | 13 s | 0.2 s | 256.0 MB (hadoop) / 2595176 | 2.7 KB / 273

I have the following questions -
1) What exactly is the 2.7 KB of shuffle write?
2) Is this 2.7 KB of shuffle write local to that executor?
3) In the executor's log I am seeing 2000 bytes of results sent to the driver;
if instead this number were much, much greater than 2000 bytes, such that it
did not fit in the executor's memory, would the shuffle write increase?
4) For word count, 256 MB is a substantial amount of text; how come the result
for this stage is only 2000 bytes? It should send every word with its
respective count; for a 256 MB input this result should be much bigger.
I hope I am clear this time.
Hope to get a reply,
Thanks,
Kartik


On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu
<nicolae.maras...@adswizz.com> wrote:

Hi,

So you say "sc.textFile -> flatMap -> Map".

My understanding is like this:
First, the number of partitions is determined, say p of them; you can give a
hint on this.
Then the nodes which will load the p partitions are chosen, that is n nodes
(where n <= p).

Relatively at the same time or not, the n nodes start opening different
sections of the file - the physical equivalent of the partitions: for
instance, in HDFS they would do an open and a seek, I guess, and just read
from the stream there, converting to whatever the InputFormat dictates.

The shuffle can only be the part where a node opens an HDFS file, for
instance, but does not have a local replica of the blocks which it needs to
read (those pertaining to its assigned partitions). So it needs to pick them
up from remote nodes which do have replicas of that data.

After blocks are read into memory, flatMap and map are local computations
generating new RDDs, and in the end the result is sent to the driver (whatever
the terminating computation does on the RDD, like the result of reduce, or the
side effects of rdd.foreach, etc.).
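If it helps, the RDD lineage shows where the stage boundary actually falls (a
sketch mirroring your pipeline; the path is a placeholder):

    // Sketch: print the lineage to see where the shuffle boundary is.
    val counts = sc.textFile("hdfs:///path/to/input")
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
    println(counts.toDebugString)  // the ShuffledRDD marks the stage boundary;
                                   // everything above it runs as narrow, local steps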



Maybe you can share more of your context if it is still unclear.
I just made assumptions to give clarity on a similar thing.

Nicu



From: Kartik Mathur <kar...@bluedata.com>
Sent: Thursday, October 1, 2015 10:25 PM
To: Nicolae Marasoiu
Cc: user
Subject: Re: Problem understanding spark word count execution

Thanks Nicolae,
So in my case all executors are sending results back to the driver, and
"shuffle is just sending out the textFile to distribute the partitions" -
could you please elaborate on this? What exactly is in this file?

On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu
<nicolae.maras...@adswizz.com> wrote:

Hi,



2 - the end results are sent back to the driver; the shuffles are
transmissions of intermediate results between nodes, at the "->" steps, which
are all intermediate transformations.

More precisely, since flatMap and map are narrow dependencies, meaning they
can usually happen on the local node, I bet the shuffle is just sending out
the textFile to a few nodes to distribute the partitions.

From: Kartik Mathur <kar...@bluedata.com>
Sent: Thursday, October 1, 2015 12:42 AM
To: user
Subject: Problem understanding spark word count execution

Hi All,



I tried running Spark word count and I have a couple of questions -

I am analyzing stage 0, i.e.
sc.textFile -> flatMap -> Map (word count example)

1) In the stage logs under the Application UI details, for every task I am
seeing a shuffle write of 2.7 KB.
Question - how can I know where this task wrote? Like how many bytes went to
which executor?

2) In the executor's log, when I look at the same task, it says 2000 bytes of
result were sent to the driver. My question is, if the results were directly
sent to the driver, what is this shuffle write?

Thanks,
Kartik