OK, that might well be possible. To confirm, you can explicitly specify the serializer in both cases (by setting spark.serializer, I guess). Then you can be sure the same serializer is used in both runs, and maybe do an analysis from there.
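For example, something along these lines pins the serializer for a submitted job; for spark-shell and Zeppelin the same key can be passed via --conf spark.serializer=... or set in spark-defaults.conf. This is just a sketch -- Kryo is only an example choice, and the app name is arbitrary:

import org.apache.spark.{SparkConf, SparkContext}

// Force the same serializer in both environments, then print what actually
// took effect, so the two runs can be compared on equal footing.
val conf = new SparkConf()
  .setAppName("shuffle-comparison")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

println("effective serializer: " + sc.getConf.get("spark.serializer"))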
Best,
Kartik

On Mon, Sep 28, 2015 at 11:38 AM, Rick Moritz <rah...@gmail.com> wrote:

Hi Kartik,

Thanks for the input!

Sadly, that's not it - I'm using YARN - the configuration looks identical, and the nodes/memory/cores are deployed identically and exactly as specified.

My current hunch is that for some reason different serializers are used in each case, but I can find no documentation on why that could be the case, and the configuration isn't indicative of that either. Nonetheless, the symptom of a different shuffle volume for the same number of shuffled tuples could well point to that as the source of my issue. In fact, a colleague pointed out that HIS (Cloudera) installation was defaulting to Kryo for the spark-shell, which had an impact on some jobs. I couldn't find the document he was referring to as the source of this information, but the behaviour sounds plausible at least.

Best,

Rick

On Mon, Sep 28, 2015 at 8:24 PM, Kartik Mathur <kar...@bluedata.com> wrote:

Hey Rick,

Not sure about this, but a similar situation happened to me: when starting spark-shell, it was starting a new cluster instead of using the existing cluster, and this new cluster was a single-node cluster. That's why jobs were taking forever to complete from spark-shell and were running much faster using spark-submit (which reads the conf correctly), or Zeppelin for that matter.

Thanks,
Kartik

On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz <rah...@gmail.com> wrote:

I've finally been able to pick this up again, after upgrading to Spark 1.4.1, because my code uses the HiveContext, which runs fine in the REPL (be it via Zeppelin or the shell) but won't work with spark-submit.

With 1.4.1, I have actually managed to get a result with the Spark shell, but only after 3847.802237 seconds, and in particular the last stage took 1320.672 seconds. This was after I used coalesce to balance the workload initially, since a Hive filter I apply normally makes for a skewed distribution of the data onto the nodes. Nonetheless, the same code (even without the coalesce) runs much faster in Zeppelin (around 1200 seconds with 1.4.0), and as a spark-submit job the run time was just a tenth of that, at 446.657534 seconds for the entire job and notably 38.961 seconds for the final stage.

Again, there is a huge difference in the amount of data that gets shuffled/spilled (which leads to much earlier OOM conditions) when using spark-shell. What could be the reason for this different behaviour, given very similar configurations and identical data, machines and code (identical DAGs and sources) and identical Spark binaries? Why would code launched from spark-shell generate more shuffled data for the same number of shuffled tuples?

An analysis would be much appreciated.

Best,

Rick

On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz <rah...@gmail.com> wrote:

Oops, forgot to reply-all on this thread.

---------- Forwarded message ----------
From: Rick Moritz <rah...@gmail.com>
Date: Wed, Aug 19, 2015 at 2:46 PM
Subject: Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell
To: Igor Berman <igor.ber...@gmail.com>

Those values are not explicitly set, and attempting to read their values results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'.
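For reference, this is roughly how I checked them without tripping over the unset keys -- just a quick sketch against the running sc:

// None of these keys are set explicitly, so sc.getConf.get(key) throws
// NoSuchElementException; getOption shows whether a value was set at all.
val keys = Seq(
  "spark.serializer",
  "spark.shuffle.compress",
  "spark.shuffle.spill.compress",
  "spark.io.compression.codec",
  "spark.rdd.compress")
keys.foreach { k =>
  println(k + " = " + sc.getConf.getOption(k).getOrElse("(not set, built-in default applies)"))
}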
What I mean by the volume per element being larger is illustrated in my original post: in each case the number of elements is identical, but the volume of data required to obtain/manage these elements is many times greater.

The only difference used to be that Zeppelin had FAIR scheduling versus FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR scheduling makes no difference. The only other difference in the environment lies in some class-path variables, which should only affect method availability, not actual usage.

Another fact to note: the Spark assembly (1.4.0-rc4) was built with provided Hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean package) for 2.6.0 from Hortonworks, while Zeppelin was built with dependencies against 2.6.0 from Maven central.

On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman <igor.ber...@gmail.com> wrote:

So what is the reason for the version differences?

What do you mean by "in spark-shell the volume per element is much larger"?

Can you verify that the configuration in the Spark UI (under the Environment tab) is the same? If you suspect compression, then check the following properties:
spark.shuffle.compress
spark.shuffle.spill.compress
spark.io.compression.codec
spark.rdd.compress

On 19 August 2015 at 15:03, Rick Moritz <rah...@gmail.com> wrote:

Number of partitions and even size look relatively similar - except in spark-shell the volume per element is much larger, especially in later stages. That's when shuffles start to spill. Zeppelin creates almost no spills at all. The number of elements per partition is the same for both setups, but with very different data volumes in/out. It is almost as though compression were used in one case and not in the other, or as though shuffling is somehow less specific and more nodes get data that they ultimately don't process at all. The same shuffling algorithm appears to be at work in each case, if the partitioning of the number of elements is anything to go by.

On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman <igor.ber...@gmail.com> wrote:

I would compare the Spark UI metrics for both cases and look for any differences (number of partitions, number of spills etc.). Why can't you make the REPL consistent with the Zeppelin Spark version? Maybe the RC has issues...

On 19 August 2015 at 14:42, Rick Moritz <rah...@gmail.com> wrote:

No, the setup is one driver with 32g of memory and three executors with 8g of memory each, in both cases. No core count has been specified, so it should default to a single core (though I've seen the YARN-owned JVMs wrapping the executors take up to 3 cores in top). That is, unless, as I suggested, there are different defaults for the two means of job submission that come into play in a non-transparent fashion (i.e. not visible in SparkConf).

On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman <igor.ber...@gmail.com> wrote:

Any differences in the number of cores or memory settings for the executors?
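Something like the following, run in both environments, would show what each run actually got -- just a sketch, assuming the usual sc from the shell/notebook:

// List each executor's block manager with its max and remaining storage memory,
// plus the relevant conf keys (unset keys fall back to a placeholder here).
sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
  println(executor + ": max=" + maxMem / (1024 * 1024) + " MB, free=" + remaining / (1024 * 1024) + " MB")
}
println("defaultParallelism = " + sc.defaultParallelism)
Seq("spark.executor.memory", "spark.executor.cores", "spark.executor.instances").foreach { k =>
  println(k + " = " + sc.getConf.getOption(k).getOrElse("(not set)"))
}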
On 19 August 2015 at 09:49, Rick Moritz <rah...@gmail.com> wrote:

Dear list,

I am observing a very strange difference in behaviour between a Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from Maven central).

The workflow loads data from Hive, applies a number of transformations (including quite a lot of shuffle operations) and then presents an enriched dataset. The code (and the resulting DAGs) is identical in each case.

The following particularities are noted:
Importing the HiveRDD and caching it yields identical results on both platforms.
Applying case classes leads to a 2-2.5 MB increase in dataset size per partition (excepting empty partitions).

Writing shuffles shows a much more significant difference:

Zeppelin:
Total Time Across All Tasks: 2.6 min
Input Size / Records: 2.4 GB / 7314771
Shuffle Write: 673.5 MB / 7314771

vs.

Spark-shell:
Total Time Across All Tasks: 28 min
Input Size / Records: 3.6 GB / 7314771
Shuffle Write: 9.0 GB / 7314771

This is one of the early stages, which reads from a cached partition and then feeds into a join stage. The later stages show similar behaviour in producing excessive shuffle spills. Quite often the excessive shuffle volume will lead to massive shuffle spills which ultimately kill not only performance, but the actual executors as well.

I have examined the Environment tab in the Spark UI and identified no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling mode. I fail to see how this would impact shuffle writes in such a drastic way, since it should act at the inter-job level, while this happens at the inter-stage level.

I was somewhat suspicious of compression or serialization playing a role, but the SparkConf points to those being set to the defaults. Also, Zeppelin's interpreter adds no relevant additional default parameters. I performed a diff between rc4 (which was later released) and 1.4.0 and, as expected, there were no differences besides a single class (remarkably, a shuffle-relevant class: /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class) differing in its binary representation, due to being compiled with Java 7 instead of Java 6. The decompiled sources of the two are again identical.

I may attempt, as a next step, to simply replace that file in the packaged jar, to ascertain that there is indeed no difference between the two versions, but I would consider it a major bug if a simple compiler change leads to this kind of issue.
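In the meantime, one rough way to compare what actually gets serialized is to sample a few records and measure their serialized size in each environment -- just a sketch, with enrichedRdd standing in for the RDD that feeds the large shuffle:

import org.apache.spark.SparkEnv

// Serialize a small sample of records with whatever serializer the current
// SparkEnv is using, and compare the per-record byte counts between the
// spark-shell and Zeppelin runs. enrichedRdd is a placeholder name.
val ser = SparkEnv.get.serializer.newInstance()
val sampleSizes = enrichedRdd.take(100).map(record => ser.serialize(record).remaining())
println("average serialized record size: " + sampleSizes.sum.toDouble / sampleSizes.length + " bytes")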
I am also open to any other ideas, in particular on how to verify that the same compression/serialization is indeed happening, and regarding ways to determine what exactly is written into these shuffles -- currently I only know that the tuples are bigger (or smaller) than they ought to be. The Zeppelin-obtained results do appear to be consistent at least, thus the suspicion is that there is an issue with the process launched from spark-shell. I will also attempt to build a Spark job and spark-submit it using different Spark binaries, to further explore the issue.

Best Regards,

Rick Moritz

PS: I already tried to send this mail yesterday, but it never made it onto the list, as far as I can tell -- I apologize should anyone receive this as a second copy.