Hey Rick,

Not sure about this, but a similar situation happened to me: when starting spark-shell, it was starting a new cluster instead of using the existing cluster, and this new cluster was a single-node cluster. That's why jobs were taking forever to complete from spark-shell, while they ran much faster via spark-submit (which reads the conf correctly) or Zeppelin, for that matter.

Thanks,
Kartik
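A minimal sketch of how that theory could be checked from inside the shell, assuming a live SparkContext `sc` on Spark 1.4.x (the fallback strings below are placeholders, not values taken from either setup):

```scala
// Print which master the REPL actually attached to and the executor settings
// it picked up; a "local[*]" master here would confirm the single-node theory.
println("master: " + sc.master)
val execMem = sc.getConf.get("spark.executor.memory", "<unset>")
println("spark.executor.memory: " + execMem)
sc.getConf.getAll.sorted.foreach { case (k, v) => println(k + " = " + v) }
```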
On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz <rah...@gmail.com> wrote:

> I've finally been able to pick this up again, after upgrading to Spark 1.4.1, because my code used the HiveContext, which runs fine in the REPL (be it via Zeppelin or the shell) but won't work with spark-submit.
> With 1.4.1, I have actually managed to get a result with the Spark shell, but after 3847.802237 seconds, and in particular the last stage took 1320.672 seconds.
> This was after I used coalesce to balance the workload initially, since a Hive filter I applied would normally make for a skewed distribution of the data onto the nodes.
> Nonetheless, the same code (even without the coalesce) would run much faster in Zeppelin (around 1200 seconds with 1.4.0), and as a spark-submit job the run time was just a tenth, at 446.657534 seconds for the entire job and notably 38.961 seconds for the final stage.
>
> Again, there is a huge difference in the amount of data that gets shuffled/spilled (which leads to much earlier OOM conditions) when using spark-shell.
> What could be the reason for this different behaviour using very similar configurations and identical data, machines and code (identical DAGs and sources) and identical Spark binaries? Why would code launched from spark-shell generate more shuffled data for the same number of shuffled tuples?
>
> An analysis would be much appreciated.
>
> Best,
>
> Rick
>
> On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz <rah...@gmail.com> wrote:
>
>> Oops, forgot to reply-all on this thread.
>>
>> ---------- Forwarded message ----------
>> From: Rick Moritz <rah...@gmail.com>
>> Date: Wed, Aug 19, 2015 at 2:46 PM
>> Subject: Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell
>> To: Igor Berman <igor.ber...@gmail.com>
>>
>> Those values are not explicitly set, and attempting to read their values results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'.
>> What I mean by the volume per element being larger is illustrated in my original post: for each case the number of elements is identical, but the volume of data required to obtain/manage those elements is many times greater.
>>
>> The only difference used to be that Zeppelin had FAIR scheduling versus FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR scheduling makes no difference. The only other difference in the environment lies in some class-path variables, which should only affect method availability, not actual usage.
>>
>> Another fact to note: the Spark assembly (1.4.0-rc4) was built with provided Hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean package) for 2.6.0 from Hortonworks, while Zeppelin was built with dependencies against 2.6.0 from Maven central.
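A minimal sketch of how those unset properties could be read without hitting the NoSuchElementException, again assuming a live SparkContext `sc`; the defaults noted in the comments are the documented Spark 1.4 fallbacks as I understand them, not values read from either environment:

```scala
// SparkConf.get(key) throws NoSuchElementException for unset keys;
// getOption and the default-taking overloads do not.
val conf = sc.getConf
println(conf.getOption("spark.shuffle.spill.compress"))        // None when unset
println(conf.getBoolean("spark.shuffle.compress", true))       // assumed default: true
println(conf.getBoolean("spark.shuffle.spill.compress", true)) // assumed default: true
println(conf.get("spark.io.compression.codec", "snappy"))      // assumed default: snappy
println(conf.getBoolean("spark.rdd.compress", false))          // assumed default: false
```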
>> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>
>>> So what is your case for the version differences?
>>> What do you mean by "in spark-shell the volume per element is much larger"?
>>> Can you verify that the configuration in the Spark UI (under the Environment tab) is the same?
>>> If you suspect compression, then check the following properties:
>>> spark.shuffle.compress
>>> spark.shuffle.spill.compress
>>> spark.io.compression.codec
>>> spark.rdd.compress
>>>
>>> On 19 August 2015 at 15:03, Rick Moritz <rah...@gmail.com> wrote:
>>>
>>>> The number of partitions and even the sizes look relatively similar, except that in spark-shell the volume per element is much larger, especially in later stages. That's when shuffles start to spill. Zeppelin creates almost no spills at all. The number of elements per partition is the same for both setups, but with very different data volume in/out. It is almost as though compression was used in one case and not in the other, or as though shuffling is somehow less specific and more nodes get data that they ultimately don't process at all. The same shuffling algorithm appears to be at work in each case, if the partitioning of the number of elements is anything to go by.
>>>>
>>>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>>>
>>>>> I would compare the Spark UI metrics for both cases and look for any differences (number of partitions, number of spills, etc.).
>>>>> Why can't you make the REPL consistent with the Zeppelin Spark version? The RC might have issues...
>>>>>
>>>>> On 19 August 2015 at 14:42, Rick Moritz <rah...@gmail.com> wrote:
>>>>>
>>>>>> No, the setup is one driver with 32g of memory and three executors, each with 8g of memory, in both cases. No core number has been specified, so it should default to a single core (though I've seen the YARN-owned JVMs wrapping the executors take up to 3 cores in top). That is, unless, as I suggested, there are different defaults for the two means of job submission that come into play in a non-transparent fashion (i.e. not visible in SparkConf).
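Following that description of the setup, a small runtime check like the one below could confirm that both launch paths really register the same executors. This is only a sketch assuming a live SparkContext `sc`; note that the returned map also includes the driver's own block manager:

```scala
// List the block managers (driver + executors) that actually registered and
// their storage memory; a mismatch between the spark-shell and Zeppelin
// sessions here would point to different resource defaults being applied.
sc.getExecutorMemoryStatus.foreach { case (blockManager, (maxMem, remaining)) =>
  println(blockManager + " -> max " + maxMem / 1024 / 1024 + " MB, free " + remaining / 1024 / 1024 + " MB")
}
println("spark.executor.cores = " + sc.getConf.get("spark.executor.cores", "<default>"))
println("spark.executor.instances = " + sc.getConf.get("spark.executor.instances", "<default>"))
```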
>>>>>> On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>>>>>
>>>>>>> Any differences in the number of cores or the memory settings for the executors?
>>>>>>>
>>>>>>> On 19 August 2015 at 09:49, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Dear list,
>>>>>>>>
>>>>>>>> I am observing a very strange difference in behaviour between a Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from Maven central).
>>>>>>>>
>>>>>>>> The workflow loads data from Hive, applies a number of transformations (including quite a lot of shuffle operations) and then presents an enriched dataset. The code (and resulting DAGs) are identical in each case.
>>>>>>>>
>>>>>>>> The following particularities are noted:
>>>>>>>> Importing the HiveRDD and caching it yields identical results on both platforms.
>>>>>>>> Applying case classes leads to a 2-2.5 MB increase in dataset size per partition (excepting empty partitions).
>>>>>>>>
>>>>>>>> Writing shuffles shows a much more significant result:
>>>>>>>>
>>>>>>>> Zeppelin:
>>>>>>>> Total Time Across All Tasks: 2.6 min
>>>>>>>> Input Size / Records: 2.4 GB / 7314771
>>>>>>>> Shuffle Write: 673.5 MB / 7314771
>>>>>>>>
>>>>>>>> vs.
>>>>>>>>
>>>>>>>> Spark-shell:
>>>>>>>> Total Time Across All Tasks: 28 min
>>>>>>>> Input Size / Records: 3.6 GB / 7314771
>>>>>>>> Shuffle Write: 9.0 GB / 7314771
>>>>>>>>
>>>>>>>> This is one of the early stages, which reads from a cached partition and then feeds into a join stage. The later stages show similar behaviour in producing excessive shuffle spills.
>>>>>>>>
>>>>>>>> Quite often the excessive shuffle volume will lead to massive shuffle spills, which ultimately kill not only performance but the actual executors as well.
>>>>>>>>
>>>>>>>> I have examined the Environment tab in the Spark UI and identified no notable difference besides FAIR (Zeppelin) vs. FIFO (spark-shell) scheduling mode. I fail to see how this would impact shuffle writes in such a drastic way, since it should act at the inter-job level, while this happens at the inter-stage level.
>>>>>>>>
>>>>>>>> I was somewhat suspicious of maybe compression or serialization playing a role, but the SparkConf points to those being set to the default. Also, Zeppelin's interpreter adds no relevant additional default parameters. I performed a diff between rc4 (which was later released) and 1.4.0 and, as expected, there were no differences besides a single class (remarkably, a shuffle-relevant class: /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class) differing in its binary representation due to being compiled with Java 7 instead of Java 6. The decompiled sources of those two are again identical.
>>>>>>>>
>>>>>>>> I may attempt, as a next step, to simply replace that file in the packaged jar, to ascertain that there is indeed no difference between the two versions, but I would consider this to be a major bug if a simple compiler change leads to this kind of issue.
>>>>>>>>
>>>>>>>> I am also open to any other ideas, in particular ways to verify that the same compression/serialization is indeed happening, and ways to determine what exactly is written into these shuffles -- currently I only know that the tuples are bigger (or smaller) than they ought to be. The Zeppelin-obtained results do appear to be consistent at least, thus the suspicion is that there is an issue with the process launched from spark-shell. I will also attempt to build a Spark job and spark-submit it using different Spark binaries to further explore the issue.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>>
>>>>>>>> Rick Moritz
>>>>>>>>
>>>>>>>> PS: I already tried to send this mail yesterday, but it never made it onto the list, as far as I can tell -- I apologize should anyone receive this as a second copy.
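On the last open point (determining what exactly ends up in those shuffles), one possible approach is to measure the serialized size of a small sample with the serializer the job actually uses and compare the two setups. This is a sketch only: the RDD name `enriched` is a placeholder rather than anything from the original job, SparkEnv is a developer-level API, and the RDD's element type must be concrete for the serializer call to compile:

```scala
// Serialize a sample of the (placeholder) cached RDD with the active serializer
// and compare the average bytes per element between spark-shell and Zeppelin.
import org.apache.spark.SparkEnv

val ser = SparkEnv.get.serializer.newInstance()
val sample = enriched.take(1000)                        // `enriched` is a placeholder RDD
val totalBytes = sample.map(row => ser.serialize(row).remaining().toLong).sum
println("avg serialized size: " + totalBytes / math.max(sample.length, 1) + " bytes over " + sample.length + " elements")
```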