Hey Rick ,
Not sure on this but similar situation happened with me, when starting
spark-shell it was starting a new cluster instead of using the existing
cluster and this new cluster was a single node cluster , that's why jobs
were taking forever to complete from spark-shell and were running much
faster using submit (which reads conf correctly) or zeppelin for that
matter.

Thanks,
Kartik

On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz <rah...@gmail.com> wrote:

> I've finally been able to pick this up again, after upgrading to Spark
> 1.4.1, because my code used the HiveContext, which runs fine in the REPL
> (be it via Zeppelin or the shell) but won't work with spark-submit.
> With 1.4.1, I hav actually managed to get a result with the Spark shell,
> but after
> 3847,802237 seconds and in particular the last stage took 1320,672 seconds.
> This was after I used coalesce to balance the workload initiall, since a
> Hive filter I applied normally would make for a skewed distribution of the
> data onto the nodes.
> Nonetheless, the same code (even withouth the coalesce) would work much
> faster in Zeppelin (around 1200 seconds with 1.4.0) and as a spark-submit
> job, the run time was just a tenth at
> 446,657534 seconds for the entire job and notably 38,961 seconds for the
> final stage.
>
> Again, there is a huge difference in the amount of data that gets
> shuffled/spilled (which leads to much earlier OOM-conditions), when using
> spark-shell.
> What could be the reason for this different behaviour using very similar
> configurations and identical data, machines and code (identical DAGs and
> sources) and identical spark binaries? Why would code launched from
> spark-shell generate more shuffled data for the same number of shuffled
> tuples?
>
> An analysis would be much appreciated.
>
> Best,
>
> Rick
>
> On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz <rah...@gmail.com> wrote:
>
>> oops, forgot to reply-all on this thread.
>>
>> ---------- Forwarded message ----------
>> From: Rick Moritz <rah...@gmail.com>
>> Date: Wed, Aug 19, 2015 at 2:46 PM
>> Subject: Re: Strange shuffle behaviour difference between Zeppelin and
>> Spark-shell
>> To: Igor Berman <igor.ber...@gmail.com>
>>
>>
>> Those values are not explicitely set, and attempting to read their values
>> results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'.
>> What I mean by the volume per element being larger is illustrated in my
>> original post: for each case the number of elements is identical, but the
>> volume of data required to obtain/manage these elements is many times
>> greater.
>>
>> The only difference used to be that Zeppelin had FAIR scheduling over
>> FIFO scheduling for spark-shell. I just verified that spark-shell with FAIR
>> scheduling makes no difference. The only other difference in the
>> environment lies in some class-path variables which should only affect
>> method availability, not actual usage.
>>
>> Another fact to note: Spark assembly (1.4.0-rc4) was built with provided
>> hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0
>> -Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean
>> package) for 2.6.0 from Hortonworks, while Zeppelin was built with
>> dependencies against 2.6.0 from Maven central.
>>
>> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman <igor.ber...@gmail.com>
>> wrote:
>>
>>> so what your case for version differences?
>>> what do u mean by  "in spark-shell the volume per element is much
>>> larger"
>>> can you verify that configuration in spark ui (under Environment tab is
>>> same).
>>> if you suspect compression than check following properties:
>>> spark.shuffle.compress
>>> spark.shuffle.spill.compress
>>> spark.io.compression.codec
>>> spark.rdd.compress
>>>
>>>
>>>
>>> On 19 August 2015 at 15:03, Rick Moritz <rah...@gmail.com> wrote:
>>>
>>>> Number of partitions and even size look relatively similar - except in
>>>> spark-shell the volume per element is much larger, especially in later
>>>> stages. That's when shuffles start to spill. Zeppelin creates almost no
>>>> spills at all. The number of elements per partition are the same for both
>>>> setups, but with very different data volume in/out. Almost as though
>>>> compression was used in one case, and not in another, or as though
>>>> shuffling is somehow less specific, and more nodes get data that they
>>>> ultimately don't process at all. The same shuffling algorithm appears to be
>>>> at work in each case, if the partitioning of the number of elements is
>>>> anything to go by.
>>>>
>>>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman <igor.ber...@gmail.com>
>>>> wrote:
>>>>
>>>>> i would compare spark ui metrics for both cases and see any
>>>>> differences(number of partitions, number of spills etc)
>>>>> why can't you make repl to be consistent with zepellin spark version?
>>>>>  might be rc has issues...
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 19 August 2015 at 14:42, Rick Moritz <rah...@gmail.com> wrote:
>>>>>
>>>>>> No, the setup is one driver with 32g of memory, and three executors
>>>>>> each with 8g of memory in both cases. No core-number has been specified,
>>>>>> thus it should default to single-core (though I've seen the yarn-owned 
>>>>>> jvms
>>>>>> wrapping the executors take up to 3 cores in top). That is, unless, as I
>>>>>> suggested, there are different defaults for the two means of job 
>>>>>> submission
>>>>>> that come into play in a non-transparent fashion (i.e. not visible in
>>>>>> SparkConf).
>>>>>>
>>>>>> On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman <igor.ber...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> any differences in number of cores, memory settings for executors?
>>>>>>>
>>>>>>>
>>>>>>> On 19 August 2015 at 09:49, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Dear list,
>>>>>>>>
>>>>>>>> I am observing a very strange difference in behaviour between a
>>>>>>>> Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0
>>>>>>>> zeppelin interpreter (compiled with Java 6 and sourced from maven 
>>>>>>>> central).
>>>>>>>>
>>>>>>>> The workflow loads data from Hive, applies a number of
>>>>>>>> transformations (including quite a lot of shuffle operations) and then
>>>>>>>> presents an enriched dataset. The code (an resulting DAGs) are 
>>>>>>>> identical in
>>>>>>>> each case.
>>>>>>>>
>>>>>>>> The following particularities are noted:
>>>>>>>> Importing the HiveRDD and caching it yields identical results on
>>>>>>>> both platforms.
>>>>>>>> Applying case classes, leads to a 2-2.5MB increase in dataset size
>>>>>>>> per partition (excepting empty partitions).
>>>>>>>>
>>>>>>>> Writing shuffles shows this much more significant result:
>>>>>>>>
>>>>>>>> Zeppelin:
>>>>>>>> *Total Time Across All Tasks: * 2,6 min
>>>>>>>> *Input Size / Records: * 2.4 GB / 7314771
>>>>>>>> *Shuffle Write: * 673.5 MB / 7314771
>>>>>>>>
>>>>>>>> vs
>>>>>>>>
>>>>>>>> Spark-shell:
>>>>>>>> *Total Time Across All Tasks: * 28 min
>>>>>>>> *Input Size / Records: * 3.6 GB / 7314771
>>>>>>>> *Shuffle Write: * 9.0 GB / 7314771
>>>>>>>>
>>>>>>>> This is one of the early stages, which reads from a cached
>>>>>>>> partition and then feeds into a join-stage. The latter stages show 
>>>>>>>> similar
>>>>>>>> behaviour in producing excessive shuffle spills.
>>>>>>>>
>>>>>>>> Quite often the excessive shuffle volume will lead to massive
>>>>>>>> shuffle spills which ultimately kill not only performance, but the 
>>>>>>>> actual
>>>>>>>> executors as well.
>>>>>>>>
>>>>>>>> I have examined the Environment tab in the SParkUI and identified
>>>>>>>> no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell)
>>>>>>>> scheduling mode. I fail to see how this would impact shuffle writes in 
>>>>>>>> such
>>>>>>>> a drastic way, since it should be on the inter-job level, while this
>>>>>>>> happens at the inter-stage level.
>>>>>>>>
>>>>>>>> I was somewhat supicious of maybe compression or serialization
>>>>>>>> playing a role, but the SparkConf points to those being set to the 
>>>>>>>> default.
>>>>>>>> Also Zeppelin's interpreter adds no relevant additional default 
>>>>>>>> parameters.
>>>>>>>> I performed a diff between rc4 (which was later released) and 1.4.0
>>>>>>>> and as expected there were no differences, besides a single class
>>>>>>>> (remarkably, a shuffle-relevant class:
>>>>>>>> /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class )
>>>>>>>> differing in its binary representation due to being compiled with Java 
>>>>>>>> 7
>>>>>>>> instead of Java 6. The decompiled sources of those two are again 
>>>>>>>> identical.
>>>>>>>>
>>>>>>>> I may attempt as a next step to simply replace that file in the
>>>>>>>> packaged jar, to ascertain that indeed there is no difference between 
>>>>>>>> the
>>>>>>>> two versions, but would consider this to be a major bg, if a simple
>>>>>>>> compiler change leads to this kind of issue.
>>>>>>>>
>>>>>>>> I a also open for any other ideas, in particular to verify that the
>>>>>>>> same compression/serialization is indeed happening, and regarding ways 
>>>>>>>> to
>>>>>>>> determin what exactly is written into these shuffles -- currently I 
>>>>>>>> only
>>>>>>>> know that the tuples are bigger (or smaller) than they ought to be. The
>>>>>>>> Zeppelin-obtained results do appear to be consistent at least, thus the
>>>>>>>> suspicion is, that there is an issue with the process launched from
>>>>>>>> spark-shell. I will also attempt to build a spark job and spark-submit 
>>>>>>>> it
>>>>>>>> using different spark-binaries to further explore the issue.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>>
>>>>>>>> Rick Moritz
>>>>>>>>
>>>>>>>> PS: I already tried to send this mail yesterday, but it never made
>>>>>>>> it onto the list, as far as I can tell -- I apologize should anyone 
>>>>>>>> receive
>>>>>>>> this as a second copy.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>

Reply via email to