Ok, that might be possible. To confirm, you could explicitly specify the
serializer in both cases (by setting spark.serializer, I guess). Then you
can be sure the same serializer is used in both, and take the analysis
from there.
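For example, something along these lines (just a sketch - Kryo here is
only an example, and the default value shown is my assumption, so please
double-check it against the docs for your version):

  // pin the serializer at launch time, identically in both environments:
  //   spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
  //   (in Zeppelin, set the same key in the interpreter settings / spark-defaults)
  // then verify inside the REPL/notebook what actually took effect;
  // JavaSerializer should be the fallback when the key is unset
  val active = sc.getConf.get(
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  println(s"active serializer: $active")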

Best,
Kartik

On Mon, Sep 28, 2015 at 11:38 AM, Rick Moritz <rah...@gmail.com> wrote:

> Hi Kartik,
>
> Thanks for the input!
>
> Sadly, that's not it - I'm using YARN - the configuration looks identical,
> and the nodes/memory/cores are deployed identically and exactly as
> specified.
>
> My current hunch is that for some reason different serializers are used
> in each case, but I can find no documentation on why that could be, and
> the configuration isn't indicative of it either.
> Nonetheless, the symptom of different shuffle volumes for the same number
> of shuffled tuples could well point to that as the source of my issue.
> In fact, a colleague pointed out that his (Cloudera) installation was
> defaulting to Kryo for the spark-shell, which had an impact on some jobs.
> I couldn't find the document he was referring to as the source of this
> information, but the behaviour sounds plausible at least.
>
> Best,
>
> Rick
>
>
> On Mon, Sep 28, 2015 at 8:24 PM, Kartik Mathur <kar...@bluedata.com>
> wrote:
>
>> Hey Rick,
>> Not sure on this, but a similar situation happened to me: when starting
>> spark-shell, it was starting a new cluster instead of using the existing
>> one, and this new cluster was a single-node cluster. That's why jobs
>> were taking forever to complete from spark-shell, while they ran much
>> faster via spark-submit (which read the conf correctly) or Zeppelin, for
>> that matter.
>>
>> Thanks,
>> Kartik
>>
>> On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz <rah...@gmail.com> wrote:
>>
>>> I've finally been able to pick this up again, after upgrading to Spark
>>> 1.4.1, because my code uses the HiveContext, which ran fine in the REPL
>>> (be it via Zeppelin or the shell) but wouldn't work with spark-submit.
>>> With 1.4.1, I have actually managed to get a result with the Spark
>>> shell, but only after 3847,802237 seconds, with the last stage alone
>>> taking 1320,672 seconds.
>>> This was after I used coalesce to balance the workload initially, since
>>> a Hive filter I applied would normally make for a skewed distribution of
>>> the data onto the nodes.
>>> Nonetheless, the same code (even without the coalesce) would run much
>>> faster in Zeppelin (around 1200 seconds with 1.4.0), and as a
>>> spark-submit job the run time was just a tenth of that, at 446,657534
>>> seconds for the entire job and notably 38,961 seconds for the final
>>> stage.
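>>> (For reference, the coalesce I mention was along these lines - just a
>>> sketch with a placeholder table, filter and partition count, since the
>>> exact query doesn't matter here:)
>>>
>>>   // the Hive filter leaves the data skewed across partitions, so merge
>>>   // partitions down to a fixed count before the shuffle-heavy stages
>>>   val filtered = sqlContext.sql(
>>>     "SELECT * FROM some_table WHERE some_col = 'some_value'")
>>>   val balanced = filtered.coalesce(96)
>>>   balanced.cache()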
>>>
>>> Again, there is a huge difference in the amount of data that gets
>>> shuffled/spilled (which leads to much earlier OOM conditions) when using
>>> spark-shell.
>>> What could be the reason for this different behaviour, given very
>>> similar configurations and identical data, machines and code (identical
>>> DAGs and sources) and identical Spark binaries? Why would code launched
>>> from spark-shell generate more shuffled data for the same number of
>>> shuffled tuples?
>>>
>>> An analysis would be much appreciated.
>>>
>>> Best,
>>>
>>> Rick
>>>
>>> On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz <rah...@gmail.com> wrote:
>>>
>>>> oops, forgot to reply-all on this thread.
>>>>
>>>> ---------- Forwarded message ----------
>>>> From: Rick Moritz <rah...@gmail.com>
>>>> Date: Wed, Aug 19, 2015 at 2:46 PM
>>>> Subject: Re: Strange shuffle behaviour difference between Zeppelin and
>>>> Spark-shell
>>>> To: Igor Berman <igor.ber...@gmail.com>
>>>>
>>>>
>>>> Those values are not explicitly set, and attempting to read them
>>>> results in 'java.util.NoSuchElementException:
>>>> spark.shuffle.spill.compress'.
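>>>> (That exception just means the keys are unset, so the built-in
>>>> defaults apply. A small sketch for reading the effective values without
>>>> triggering it - the defaults shown are what I believe 1.4 uses, so
>>>> treat them as an assumption:)
>>>>
>>>>   // read each key with an explicit fallback instead of a bare get()
>>>>   Seq("spark.shuffle.compress" -> "true",
>>>>       "spark.shuffle.spill.compress" -> "true",
>>>>       "spark.io.compression.codec" -> "snappy",
>>>>       "spark.rdd.compress" -> "false").foreach { case (key, default) =>
>>>>     println(key + " = " + sc.getConf.get(key, default))
>>>>   }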
>>>> What I mean by the volume per element being larger is illustrated in my
>>>> original post: in each case the number of elements is identical, but the
>>>> volume of data required to obtain/manage those elements is many times
>>>> greater.
>>>>
>>>> The only difference used to be that Zeppelin used FAIR scheduling,
>>>> versus FIFO for spark-shell. I have just verified that spark-shell with
>>>> FAIR scheduling makes no difference. The only other difference in the
>>>> environment lies in some class-path variables, which should only affect
>>>> method availability, not actual usage.
>>>>
>>>> Another fact to note: the Spark assembly (1.4.0-rc4) was built with
>>>> provided Hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6
>>>> -Dhadoop.version=2.6.0 -Phadoop-provided -Phive -Phive-thriftserver
>>>> -Psparkr -DskipTests clean package) for 2.6.0 from Hortonworks, while
>>>> Zeppelin was built with dependencies against 2.6.0 from Maven Central.
>>>>
>>>> On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman <igor.ber...@gmail.com>
>>>> wrote:
>>>>
>>>>> So what is your reason for the version differences?
>>>>> What do you mean by "in spark-shell the volume per element is much
>>>>> larger"?
>>>>> Can you verify that the configuration in the Spark UI (under the
>>>>> Environment tab) is the same?
>>>>> If you suspect compression, then check the following properties:
>>>>> spark.shuffle.compress
>>>>> spark.shuffle.spill.compress
>>>>> spark.io.compression.codec
>>>>> spark.rdd.compress
>>>>>
>>>>>
>>>>>
>>>>> On 19 August 2015 at 15:03, Rick Moritz <rah...@gmail.com> wrote:
>>>>>
>>>>>> The number of partitions and even their sizes look relatively
>>>>>> similar - except that in spark-shell the volume per element is much
>>>>>> larger, especially in later stages. That's when shuffles start to
>>>>>> spill; Zeppelin creates almost no spills at all. The number of
>>>>>> elements per partition is the same for both setups, but with very
>>>>>> different data volumes in/out. It is almost as though compression
>>>>>> were used in one case and not in the other, or as though the
>>>>>> shuffling were somehow less specific, so that more nodes get data
>>>>>> they ultimately don't process at all. The same shuffling algorithm
>>>>>> appears to be at work in each case, if the partitioning of the number
>>>>>> of elements is anything to go by.
>>>>>>
>>>>>> On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman <igor.ber...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I would compare the Spark UI metrics for both cases and look for any
>>>>>>> differences (number of partitions, number of spills, etc.).
>>>>>>> Why can't you make the REPL consistent with the Zeppelin Spark
>>>>>>> version? The RC might have issues...
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 19 August 2015 at 14:42, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>>
>>>>>>>> No, the setup is one driver with 32g of memory and three executors
>>>>>>>> with 8g of memory each, in both cases. No core count has been
>>>>>>>> specified, so it should default to a single core (though I've seen
>>>>>>>> the YARN-owned JVMs wrapping the executors take up to 3 cores in
>>>>>>>> top). That is, unless, as I suggested, there are different defaults
>>>>>>>> for the two means of job submission that come into play in a
>>>>>>>> non-transparent fashion (i.e. not visible in SparkConf).
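>>>>>>>> (For completeness, the effective configuration can be dumped from
>>>>>>>> both launch modes and diffed - a minimal sketch, nothing
>>>>>>>> setup-specific assumed:)
>>>>>>>>
>>>>>>>>   // print every key the driver actually ended up with, sorted so
>>>>>>>>   // two dumps can be diffed line by line
>>>>>>>>   sc.getConf.getAll.sortBy(_._1).foreach { case (k, v) =>
>>>>>>>>     println(k + "=" + v)
>>>>>>>>   }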
>>>>>>>>
>>>>>>>> On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman <igor.ber...@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Any differences in the number of cores or memory settings for the
>>>>>>>>> executors?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 19 August 2015 at 09:49, Rick Moritz <rah...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Dear list,
>>>>>>>>>>
>>>>>>>>>> I am observing a very strange difference in behaviour between a
>>>>>>>>>> Spark 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark
>>>>>>>>>> 1.4.0 Zeppelin interpreter (compiled with Java 6 and sourced from
>>>>>>>>>> Maven Central).
>>>>>>>>>>
>>>>>>>>>> The workflow loads data from Hive, applies a number of
>>>>>>>>>> transformations (including quite a lot of shuffle operations) and
>>>>>>>>>> then presents an enriched dataset. The code (and resulting DAGs)
>>>>>>>>>> is identical in each case.
>>>>>>>>>>
>>>>>>>>>> The following particularities are noted:
>>>>>>>>>> Importing the HiveRDD and caching it yields identical results on
>>>>>>>>>> both platforms.
>>>>>>>>>> Applying case classes leads to a 2-2.5 MB increase in dataset size
>>>>>>>>>> per partition (excepting empty partitions).
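>>>>>>>>>> (By "applying case classes" I mean the usual mapping from Row to
>>>>>>>>>> a case class - roughly the following, with a made-up table name
>>>>>>>>>> and schema, since the real ones don't matter here:)
>>>>>>>>>>
>>>>>>>>>>   // placeholder schema, for illustration only
>>>>>>>>>>   case class Event(id: String, ts: Long, value: Double)
>>>>>>>>>>
>>>>>>>>>>   val hiveDF = sqlContext.table("some_table")   // cached earlier
>>>>>>>>>>   val events = hiveDF.map(r =>
>>>>>>>>>>     Event(r.getString(0), r.getLong(1), r.getDouble(2)))
>>>>>>>>>>   events.cache()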
>>>>>>>>>>
>>>>>>>>>> Writing shuffles shows this much more significant result:
>>>>>>>>>>
>>>>>>>>>> Zeppelin:
>>>>>>>>>> *Total Time Across All Tasks:* 2,6 min
>>>>>>>>>> *Input Size / Records:* 2.4 GB / 7314771
>>>>>>>>>> *Shuffle Write:* 673.5 MB / 7314771
>>>>>>>>>>
>>>>>>>>>> vs
>>>>>>>>>>
>>>>>>>>>> Spark-shell:
>>>>>>>>>> *Total Time Across All Tasks:* 28 min
>>>>>>>>>> *Input Size / Records:* 3.6 GB / 7314771
>>>>>>>>>> *Shuffle Write:* 9.0 GB / 7314771
>>>>>>>>>>
>>>>>>>>>> This is one of the early stages, which reads from a cached
>>>>>>>>>> partition and then feeds into a join stage. The later stages show
>>>>>>>>>> similar behaviour in producing excessive shuffle spills.
>>>>>>>>>>
>>>>>>>>>> Quite often the excessive shuffle volume will lead to massive
>>>>>>>>>> shuffle spills, which ultimately kill not only performance but the
>>>>>>>>>> actual executors as well.
>>>>>>>>>>
>>>>>>>>>> I have examined the Environment tab in the Spark UI and identified
>>>>>>>>>> no notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell)
>>>>>>>>>> scheduling mode. I fail to see how this would impact shuffle writes
>>>>>>>>>> in such a drastic way, since it should act at the inter-job level,
>>>>>>>>>> while this happens at the inter-stage level.
>>>>>>>>>>
>>>>>>>>>> I was somewhat suspicious of compression or serialization playing
>>>>>>>>>> a role, but the SparkConf points to those being left at their
>>>>>>>>>> defaults. Also, Zeppelin's interpreter adds no relevant additional
>>>>>>>>>> default parameters.
>>>>>>>>>> I performed a diff between rc4 (which was later released as
>>>>>>>>>> 1.4.0) and 1.4.0, and as expected there were no differences,
>>>>>>>>>> besides a single class (remarkably, a shuffle-relevant one:
>>>>>>>>>> /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class)
>>>>>>>>>> differing in its binary representation due to being compiled with
>>>>>>>>>> Java 7 instead of Java 6. The decompiled sources of the two are
>>>>>>>>>> again identical.
>>>>>>>>>>
>>>>>>>>>> As a next step I may attempt to simply replace that file in the
>>>>>>>>>> packaged jar, to ascertain that there is indeed no difference
>>>>>>>>>> between the two versions, but I would consider it a major bug if a
>>>>>>>>>> simple compiler change led to this kind of issue.
>>>>>>>>>>
>>>>>>>>>> I am also open to any other ideas, in particular on how to verify
>>>>>>>>>> that the same compression/serialization is indeed happening, and on
>>>>>>>>>> ways to determine what exactly is written into these shuffles --
>>>>>>>>>> currently I only know that the tuples are bigger (or smaller) than
>>>>>>>>>> they ought to be. The Zeppelin-obtained results do at least appear
>>>>>>>>>> to be consistent, so the suspicion is that there is an issue with
>>>>>>>>>> the process launched from spark-shell. I will also attempt to build
>>>>>>>>>> a Spark job and spark-submit it using different Spark binaries to
>>>>>>>>>> further explore the issue.
>>>>>>>>>>
>>>>>>>>>> Best Regards,
>>>>>>>>>>
>>>>>>>>>> Rick Moritz
>>>>>>>>>>
>>>>>>>>>> PS: I already tried to send this mail yesterday, but it never made
>>>>>>>>>> it onto the list, as far as I can tell -- I apologize should anyone
>>>>>>>>>> receive this as a second copy.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
