Hi Don,

I went to a presentation given by Professor Ion Stoica. He mentioned that Python is generally a little slower because of its type system. I do not remember all of his comments; I think the context was Spark SQL and DataFrames.

I wonder whether the Python issue is similar to the boxing/unboxing issue in Java?
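For what it's worth, one thing that might be worth trying on your side: if the transform can stay in the DataFrame API, the rows never have to round-trip through the Python worker processes, which I believe is what the PythonRunner times in your log are measuring. This is only a rough sketch assuming the Spark 1.5-era DataFrame API; the paths and the sort column are made up:

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)  # assumes an existing SparkContext named sc

    # Read, sort, and write entirely through the DataFrame API so rows stay
    # in the JVM instead of being pickled back and forth to Python workers.
    df = sqlContext.read.json("hdfs:///path/to/input")    # made-up path
    sorted_df = df.sort("timestamp")                       # made-up column name

    # Note: coalesce(1) still funnels all output through a single task, so
    # writing will be slow for large outputs in any language.
    sorted_df.coalesce(1).write.json("hdfs:///path/to/output")  # made-up path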
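And on my own empty-files problem further down the thread, here is what I am planning to try next: coalesce the sampled RDD down to a handful of partitions before writing, so saveAsTextFile() only creates that many part files. The partition count below is just a guess:

    # sample() keeps the parent RDD's partitioning, so filtering and sampling
    # ~228,000 input partitions leaves most of them empty, and saveAsTextFile()
    # writes one part file per partition.
    sample = (tweetStrings.filter(removeEmptyLines)
                          .sample(withReplacement=False, fraction=0.01, seed=345678)
                          .coalesce(12))   # guess: a few partitions per worker

    sample.saveAsTextFile(saveDataURL)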
Andy

From: Don Drake <dondr...@gmail.com>
Date: Monday, November 23, 2015 at 7:10 PM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: Xiao Li <gatorsm...@gmail.com>, Sabarish Sasidharan <sabarish.sasidha...@manthan.com>, "user @spark" <user@spark.apache.org>
Subject: Re: newbie : why are thousands of empty files being created on HDFS?

> I'm seeing similar slowness in saveAsTextFile(), but only in Python.
>
> I'm sorting data in a DataFrame, then transforming it to get an RDD, and then
> calling coalesce(1).saveAsTextFile().
>
> I converted the Python to Scala and the run times were similar, except for the
> saveAsTextFile() stage. The Scala version was much faster.
>
> When looking at the executor logs during that stage, I see the following when
> running the Scala code:
>
> 15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Getting 600 non-empty blocks out of 600 blocks
> 15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Started 184 remote fetches in 64 ms
> 15/11/23 20:51:30 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data of 146.0 MB to disk (0 time so far)
> 15/11/23 20:51:35 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data of 146.0 MB to disk (1 time so far)
> 15/11/23 20:51:40 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data of 146.0 MB to disk (2 times so far)
> 15/11/23 20:51:45 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data of 146.0 MB to disk (3 times so far)
> 15/11/23 20:51:50 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data of 146.0 MB to disk (4 times so far)
> 15/11/23 20:51:54 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data of 146.0 MB to disk (5 times so far)
> 15/11/23 20:51:59 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data of 146.0 MB to disk (6 times so far)
> 15/11/23 20:52:04 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data of 146.0 MB to disk (7 times so far)
> 15/11/23 20:52:09 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data of 146.0 MB to disk (8 times so far)
>
> When running the Python version during the saveAsTextFile() stage, I see:
>
> 15/11/23 21:04:03 INFO python.PythonRunner: Times: total = 16190, boot = 5, init = 144, finish = 16041
> 15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
> 15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote fetches in 82 ms
> 15/11/23 21:04:15 INFO python.PythonRunner: Times: total = 12180, boot = -415, init = 447, finish = 12148
> 15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
> 15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote fetches in 129 ms
> 15/11/23 21:04:27 INFO python.PythonRunner: Times: total = 11450, boot = -372, init = 398, finish = 11424
> 15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
> 15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote fetches in 70 ms
> 15/11/23 21:04:42 INFO python.PythonRunner: Times: total = 14480, boot = -378, init = 403, finish = 14455
> 15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
> 15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote fetches in 62 ms
> 15/11/23 21:04:54 INFO python.PythonRunner: Times: total = 11868, boot = -366, init = 381, finish = 11853
> 15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
> 15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote fetches in 59 ms
> 15/11/23 21:05:10 INFO python.PythonRunner: Times: total = 15375, boot = -392, init = 403, finish = 15364
> 15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
> 15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote fetches in 48 ms
>
> The Python version is approximately 10 times slower than the Scala version.
> Any ideas why?
>
> -Don
>
> On Mon, Nov 23, 2015 at 4:31 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
>> Hi Xiao and Sabarish,
>>
>> Using the Stages tab in the UI, it turns out you can see how many partitions
>> there are. If I do nothing I get 228155 partitions. (This confirms what
>> Sabarish said.) I tried coalesce(3), but RDD.count() fails. I thought that,
>> given I have 3 workers and 1/3 of the data would easily fit into memory,
>> this would be a good choice.
>>
>> If I use coalesce(30), count() works. However, it still seems slow. It took
>> 2.42 min to read 4720 records. My total data set size is 34M.
>>
>> Any suggestions on how to choose the number of partitions?
>>
>> ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')
>>
>> The data was originally collected using Spark Streaming. I noticed that the
>> number of default partitions == the number of files created on HDFS. I bet
>> each file is one Spark Streaming mini-batch. I suspect that if I concatenate
>> these into a small number of files things will run much faster, and that I
>> would not need to call coalesce(); coalesce() may be taking a lot of the
>> time. Any suggestions on how to choose the number of files?
>>
>> Kind regards,
>>
>> Andy
>>
>> From: Xiao Li <gatorsm...@gmail.com>
>> Date: Monday, November 23, 2015 at 12:21 PM
>> To: Andrew Davidson <a...@santacruzintegration.com>
>> Cc: Sabarish Sasidharan <sabarish.sasidha...@manthan.com>, "user @spark" <user@spark.apache.org>
>> Subject: Re: newbie : why are thousands of empty files being created on HDFS?
>>
>>> In your case, maybe you can try to call the function coalesce?
>>>
>>> Good luck,
>>>
>>> Xiao Li
>>>
>>> 2015-11-23 12:15 GMT-08:00 Andy Davidson <a...@santacruzintegration.com>:
>>>
>>> Hi Sabarish,
>>>
>>> I am but a simple padawan :-) I do not understand your answer. Why would
>>> Spark be creating so many empty partitions? My real problem is that my
>>> application is very slow. I happened to notice thousands of empty files
>>> being created and thought this might be a hint as to why my app is slow.
>>>
>>> My program calls sample(0.01).filter(not null).saveAsTextFile(). It takes
>>> about 35 min to scan 500,000 JSON strings and write 5000 of them to disk.
>>> The total data written is 38M.
>>>
>>> The data is read from HDFS. My understanding is that Spark cannot know in
>>> advance how HDFS partitioned the data. Spark knows I have a master and 3
>>> slave machines, and it knows how many workers/executors are assigned to my
>>> job. I would expect Spark to be smart enough not to create more partitions
>>> than I have worker machines.
>>>
>>> Also, given that I am not using any key/value operations like join() or
>>> doing multiple scans, I would assume my app would not benefit from
>>> partitioning.
>>>
>>> Kind regards,
>>>
>>> Andy
>>>
>>> From: Sabarish Sasidharan <sabarish.sasidha...@manthan.com>
>>> Date: Saturday, November 21, 2015 at 7:20 PM
>>> To: Andrew Davidson <a...@santacruzintegration.com>
>>> Cc: "user @spark" <user@spark.apache.org>
>>> Subject: Re: newbie : why are thousands of empty files being created on HDFS?
>>>
>>> Those are empty partitions. I don't see the number of partitions specified
>>> in the code. That implies the default parallelism config is being used and
>>> is set to a very high number, the sum of empty + non-empty files.
>>>
>>> Regards,
>>> Sab
>>>
>>> On 21-Nov-2015 11:59 pm, "Andy Davidson" <a...@santacruzintegration.com> wrote:
>>>
>>> I started working on a very simple ETL pipeline for a POC. It reads in a
>>> data set of tweets stored as JSON strings in HDFS, randomly selects 1% of
>>> the observations, and writes them to HDFS. It seems to run very slowly;
>>> e.g., writing 4720 observations takes 1:06:46.577795. I also noticed that
>>> RDD saveAsTextFile() is creating thousands of empty files.
>>>
>>> I assume creating all these empty files must be slowing down the system.
>>> Any idea why this is happening? Do I have to write a script to periodically
>>> remove the empty files?
>>>
>>> Kind regards,
>>>
>>> Andy
>>>
>>> import datetime
>>>
>>> tweetStrings = sc.textFile(inputDataURL)
>>>
>>> emptyLineCount = sc.accumulator(0)
>>>
>>> def removeEmptyLines(line):
>>>     if line:
>>>         return True
>>>     else:
>>>         emptyLineCount.add(1)
>>>         return False
>>>
>>> sample = (tweetStrings.filter(removeEmptyLines)
>>>           .sample(withReplacement=False, fraction=0.01, seed=345678))
>>>
>>> startTime = datetime.datetime.now()
>>> sample.saveAsTextFile(saveDataURL)
>>> endTime = datetime.datetime.now()
>>> print("elapsed time:%s" % (endTime - startTime))
>>>
>>> elapsed time:1:06:46.577795
>>>
>>> Total number of empty files:
>>> $ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>>> 223515
>>>
>>> Total number of files with data:
>>> $ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>>> 4642
>>>
>>> I randomly picked a part file. Its size is 9251.
>
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake
> 800-733-2143