I've finally been able to pick this up again after upgrading to Spark 1.4.1, because my code uses the HiveContext, which runs fine in the REPL (be it via Zeppelin or the shell) but wouldn't work with spark-submit before the upgrade. With 1.4.1 I have actually managed to get a result with the Spark shell, but only after 3847.802237 seconds, with the last stage alone taking 1320.672 seconds. This was after I used coalesce to balance the workload up front, since a Hive filter I apply would otherwise lead to a skewed distribution of the data across the nodes. Nonetheless, the same code (even without the coalesce) runs much faster in Zeppelin (around 1200 seconds with 1.4.0), and as a spark-submit job the run time was just a tenth of that: 446.657534 seconds for the entire job and, notably, 38.961 seconds for the final stage.
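For illustration, the rebalancing step looks roughly like this (a minimal sketch; the table, filter and partition count are placeholders rather than my real values):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)  // sc as provided by the shell / Zeppelin

    // Placeholder query: the real one filters a Hive table on a column whose
    // values are distributed very unevenly across the underlying partitions.
    val filtered = hiveContext.sql(
      "SELECT * FROM some_db.some_table WHERE some_col = 'some_value'")

    // Rebalance before the shuffle-heavy transformations: coalesce with
    // shuffle = true on the underlying RDD spreads the rows evenly again,
    // whereas a plain coalesce only merges partitions and keeps the skew.
    val balanced = filtered.rdd.coalesce(200, shuffle = true)  // 200 is a placeholder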
Again, there is a huge difference in the amount of data that gets shuffled/spilled (which leads to much earlier OOM conditions) when using spark-shell. What could be the reason for this different behaviour, given very similar configurations and identical data, machines, code (identical DAGs and sources) and Spark binaries? Why would code launched from spark-shell generate more shuffled data for the same number of shuffled tuples? An analysis would be much appreciated.

Best,
Rick
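PS: For what it's worth, this is roughly how I read out the compression-related settings in both environments (a quick sketch; getOption avoids the NoSuchElementException mentioned below for keys that were never set explicitly):

    // the properties Igor suggested checking
    val compressionKeys = Seq(
      "spark.shuffle.compress",
      "spark.shuffle.spill.compress",
      "spark.io.compression.codec",
      "spark.rdd.compress")

    compressionKeys.foreach { key =>
      // SparkConf.get(key) throws java.util.NoSuchElementException for unset
      // keys, so use getOption instead; None means the built-in default applies
      println(key + " -> " + sc.getConf.getOption(key))
    }

On my setup none of these keys is set in either environment, so the defaults should apply.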
On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz <rah...@gmail.com> wrote:

> oops, forgot to reply-all on this thread.
>
> ---------- Forwarded message ----------
> From: Rick Moritz <rah...@gmail.com>
> Date: Wed, Aug 19, 2015 at 2:46 PM
> Subject: Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell
> To: Igor Berman <igor.ber...@gmail.com>
>
> Those values are not explicitly set, and attempting to read their values results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'. What I mean by the volume per element being larger is illustrated in my original post: for each case the number of elements is identical, but the volume of data required to obtain/manage these elements is many times greater.
>
> The only difference used to be that Zeppelin had FAIR scheduling versus FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR scheduling makes no difference. The only other difference in the environment lies in some class-path variables, which should only affect method availability, not actual usage.
>
> Another fact to note: the Spark assembly (1.4.0-rc4) was built with provided Hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean package) for 2.6.0 from Hortonworks, while Zeppelin was built with dependencies against 2.6.0 from Maven central.
>
> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>
>> So what is your case for version differences? What do you mean by "in spark-shell the volume per element is much larger"? Can you verify that the configuration in the Spark UI (under the Environment tab) is the same? If you suspect compression, then check the following properties:
>> spark.shuffle.compress
>> spark.shuffle.spill.compress
>> spark.io.compression.codec
>> spark.rdd.compress
>>
>> On 19 August 2015 at 15:03, Rick Moritz <rah...@gmail.com> wrote:
>>
>>> Number of partitions and even sizes look relatively similar, except that in spark-shell the volume per element is much larger, especially in later stages. That's when shuffles start to spill. Zeppelin creates almost no spills at all. The number of elements per partition is the same for both setups, but with very different data volumes in/out. Almost as though compression were used in one case and not in the other, or as though shuffling were somehow less specific and more nodes got data that they ultimately don't process at all. The same shuffling algorithm appears to be at work in each case, if the partitioning of the number of elements is anything to go by.
>>>
>>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>>
>>>> I would compare the Spark UI metrics for both cases and look for any differences (number of partitions, number of spills etc.). Why can't you make the REPL consistent with the Zeppelin Spark version? The RC might have issues...
>>>>
>>>> On 19 August 2015 at 14:42, Rick Moritz <rah...@gmail.com> wrote:
>>>>
>>>>> No, the setup is one driver with 32g of memory and three executors with 8g of memory each in both cases. No core count has been specified, so it should default to a single core per executor (though I've seen the YARN-owned JVMs wrapping the executors take up to 3 cores in top). That is, unless, as I suggested, there are different defaults for the two means of job submission that come into play in a non-transparent fashion (i.e. not visible in SparkConf).
>>>>>
>>>>> On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>>>>
>>>>>> Any differences in the number of cores or memory settings for the executors?
>>>>>>
>>>>>> On 19 August 2015 at 09:49, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>
>>>>>>> Dear list,
>>>>>>>
>>>>>>> I am observing a very strange difference in behaviour between a Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from Maven central).
>>>>>>>
>>>>>>> The workflow loads data from Hive, applies a number of transformations (including quite a lot of shuffle operations) and then presents an enriched dataset. The code (and the resulting DAGs) is identical in each case.
>>>>>>>
>>>>>>> The following particularities are noted:
>>>>>>> Importing the HiveRDD and caching it yields identical results on both platforms.
>>>>>>> Applying case classes leads to a 2-2.5 MB increase in dataset size per partition (excepting empty partitions).
>>>>>>>
>>>>>>> Writing shuffles shows a much more significant difference:
>>>>>>>
>>>>>>> Zeppelin:
>>>>>>> Total Time Across All Tasks: 2.6 min
>>>>>>> Input Size / Records: 2.4 GB / 7314771
>>>>>>> Shuffle Write: 673.5 MB / 7314771
>>>>>>>
>>>>>>> vs
>>>>>>>
>>>>>>> Spark-shell:
>>>>>>> Total Time Across All Tasks: 28 min
>>>>>>> Input Size / Records: 3.6 GB / 7314771
>>>>>>> Shuffle Write: 9.0 GB / 7314771
>>>>>>>
>>>>>>> This is one of the early stages, which reads from a cached partition and then feeds into a join stage. The later stages show similar behaviour in producing excessive shuffle spills.
>>>>>>>
>>>>>>> Quite often the excessive shuffle volume will lead to massive shuffle spills which ultimately kill not only performance, but the actual executors as well.
>>>>>>>
>>>>>>> I have examined the Environment tab in the Spark UI and identified no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling mode. I fail to see how this would impact shuffle writes in such a drastic way, since it should apply at the inter-job level, while this happens at the inter-stage level.
>>>>>>>
>>>>>>> I was somewhat suspicious of compression or serialization playing a role, but the SparkConf points to those being set to the defaults. Also, Zeppelin's interpreter adds no relevant additional default parameters.
>>>>>>> I performed a diff between rc4 (which was later released as 1.4.0) and 1.4.0, and as expected there were no differences, besides a single class (remarkably, a shuffle-relevant class: /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class) differing in its binary representation due to being compiled with Java 7 instead of Java 6. The decompiled sources of the two are again identical.
>>>>>>>
>>>>>>> As a next step I may simply replace that file in the packaged jar, to ascertain that there is indeed no difference between the two versions, but I would consider it a major bug if a simple compiler change led to this kind of issue.
>>>>>>>
>>>>>>> I am also open to any other ideas, in particular ways to verify that the same compression/serialization is indeed happening, and ways to determine what exactly is written into these shuffles; currently I only know that the tuples are bigger (or smaller) than they ought to be. The Zeppelin-obtained results do at least appear to be consistent, so the suspicion is that there is an issue with the process launched from spark-shell. I will also attempt to build a Spark job and spark-submit it using different Spark binaries to further explore the issue.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Rick Moritz
>>>>>>>
>>>>>>> PS: I already tried to send this mail yesterday, but it never made it onto the list as far as I can tell; I apologize should anyone receive this as a second copy.
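PPS: For completeness, the resource settings discussed further up correspond roughly to the following invocations (a sketch; the master setting, class and jar names are placeholders, not my actual ones):

    # interactive shell on YARN: one 32g driver, three 8g executors,
    # executor cores left at the YARN default of one
    bin/spark-shell --master yarn-client \
        --driver-memory 32g --executor-memory 8g --num-executors 3

    # equivalent batch submission of the packaged job
    bin/spark-submit --master yarn-client \
        --driver-memory 32g --executor-memory 8g --num-executors 3 \
        --class com.example.MyJob /path/to/my-job.jar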