@rxin With the fixes, I could run it fine on top of branch-1.0. On master, running on YARN, I am getting another KryoException:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 247 in stage 52.0 failed 4 times, most recent failure: Lost task 247.3 in stage 52.0 (TID 10010, tblpmidn05adv-hdp.tdc.vzwcorp.com): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException
Serialization trace:
shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
    com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

OutLinkBlock does not extend Serializable... Also, I did not see this failure before. Was the fix tested on YARN? @dbtsai, did your assembly run fine on YARN, or are you still seeing these exceptions?

Thanks.
Deb

On Thu, Aug 14, 2014 at 5:48 PM, Reynold Xin <r...@databricks.com> wrote:

Here: https://github.com/apache/spark/pull/1948

On Thu, Aug 14, 2014 at 5:45 PM, Debasish Das <debasish.da...@gmail.com> wrote:

Is there a fix that I can test? I have the flows set up for both standalone and YARN runs...

Thanks.
Deb

On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin <r...@databricks.com> wrote:

Yes, I understand it might not work for a custom serializer, but that is a much less common path.

Basically I want a quick fix for the 1.1 release (which is coming up soon). I would not be comfortable making big changes to the class path late in the release cycle. We can do that for 1.2.

On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.den...@gmail.com> wrote:

That should work, but would you also make these changes to the JavaSerializer? The API of the two is the same so that you can select one or the other (or, in theory, a custom serializer). This also wouldn't address the problem of shipping custom *serializers* (not Kryo registrators) in user jars.

On 14 August 2014 19:23, Reynold Xin <r...@databricks.com> wrote:

Graham,

SparkEnv only creates a KryoSerializer, but as I understand it, that serializer doesn't actually initialize the registrator, since that only happens when newKryo() is called as a KryoSerializerInstance is initialized.

Basically I'm thinking a quick fix for 1.2:

1. Add a classLoader field to KryoSerializer; initialize new KryoSerializerInstances with that class loader.

2. Set that classLoader to the executor's class loader when the Executor is initialized.

Then all deserialization calls should use the executor's class loader.
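A minimal sketch of the two steps above, with hypothetical names and structure (this is not the actual code in PR 1948):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    class KryoSerializerSketch(registratorClassName: Option[String]) {
      // Step 1: a class loader field, defaulting to the loader that loaded
      // Spark itself, with a setter the executor can call.
      @volatile private var defaultClassLoader: ClassLoader = getClass.getClassLoader

      def setDefaultClassLoader(loader: ClassLoader): Unit = {
        defaultClassLoader = loader
      }

      // Each Kryo instance resolves the registrator (and every deserialized
      // class) through the configured loader, so the outcome no longer
      // depends on which thread happens to trigger (de)serialization.
      def newKryo(): Kryo = {
        val kryo = new Kryo()
        kryo.setClassLoader(defaultClassLoader)
        registratorClassName.foreach { name =>
          val registrator = Class.forName(name, true, defaultClassLoader)
            .newInstance().asInstanceOf[KryoRegistrator]
          registrator.registerClasses(kryo)
        }
        kryo
      }
    }

    // Step 2, at executor startup, once the loader containing the downloaded
    // application jars exists:
    //   kryoSerializer.setDefaultClassLoader(executorClassLoader)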
On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com> wrote:

Hi Reynold,

That would solve this specific issue, but you'd need to be careful never to create a serialiser instance before the first task is received. Currently, in Executor.TaskRunner.run, a closure serialiser instance is created before any application jars are downloaded, though that could be moved. To me, this seems a little fragile.

However, there is a related issue: you can't ship a custom serialiser in an application jar, because the serialiser is instantiated when the SparkEnv object is created, which is before any tasks are received by the executor. The above approach wouldn't help with that problem.

Additionally, the YARN scheduler currently uses this approach of adding the application jar to the Executor classpath, so it would make things a bit more uniform.

Cheers,
Graham

On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:

Graham,

Thanks for working on this. This is an important bug to fix.

I don't have the whole context, and obviously I haven't spent nearly as much time on this as you have, but I'm wondering: what if we always pass the executor's ClassLoader to the Kryo serializer? Would that solve this problem?

On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <graham.den...@gmail.com> wrote:

Hi Deb,

The only alternative serialiser is the JavaSerialiser (the default). Theoretically Spark supports custom serialisers, but due to a related issue, custom serialisers currently can't live in application jars and must be available to all executors at launch. My PR fixes this issue as well, allowing custom serialisers to be shipped in application jars.

Graham

On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com> wrote:

Sorry, I just saw Graham's email after sending my previous email about this bug...

I have been seeing this same issue on our ALS runs last week, but I thought it was due to my hacky way of running the mllib 1.1 snapshot on core 1.0...

What's the status of this PR? Will the fix be back-ported to 1.0.1, as we are running a stable 1.0.1 standalone cluster?

Until the PR merges, does it make sense to not use Kryo? What are the other recommended efficient serializers?

Thanks.
Deb
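For reference, the serializer is chosen via the spark.serializer setting, so sidestepping Kryo until the fix lands is a configuration change. A sketch (the app name and registrator class are hypothetical):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("ALSExample") // hypothetical
      // The Kryo path affected by the registrator bug:
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.example.MyRegistrator") // hypothetical
    // Dropping both .set(...) lines falls back to the default JavaSerializer:
    // slower and bulkier on the wire, but it has no registrator to run, so it
    // avoids this particular failure mode.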
On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:

I now have a complete pull request for this issue that I'd like to get reviewed and committed. The PR is available here: https://github.com/apache/spark/pull/1890 and includes a test case for the issue I described. I've also submitted a related PR (https://github.com/apache/spark/pull/1827) that stops exceptions raised while attempting to run the custom Kryo registrator from being swallowed.

Thanks,
Graham

On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:

I've submitted a work-in-progress pull request for this issue that I'd like feedback on. See https://github.com/apache/spark/pull/1890. I've also submitted a pull request for the related issue that exceptions hit when trying to use a custom Kryo registrator are being swallowed: https://github.com/apache/spark/pull/1827

The approach in my pull request is to have the Worker processes download the application jars and add them to the Executor class path at launch time. There are a couple of things that still need to be done before this can be merged:

1. At the moment, the first time a task runs in the executor, the application jars are downloaded again. My solution here would be to make the executor not download any jars that already exist (see the sketch after this message). Previously, the driver and executor kept track of jar file timestamps and would re-download 'updated' jars, but this never made sense: the previous version of an updated jar may already have been loaded into the executor, in which case the updated jar would have no effect. As my current pull request removes the timestamps for jars, just checking whether a jar exists will let us avoid downloading it again.

2. Tests. :-)

A side benefit of my pull request is that you will be able to use custom serialisers distributed in a user jar. Currently, the serialiser instance is created in the Executor process before the first task is received, and therefore before any user jars are downloaded. As this PR adds user jars to the Executor process at launch time, this won't be an issue.
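A sketch of the existence check described in item 1 above — a hypothetical helper, not code from the PR:

    import java.io.File

    // With per-jar timestamps removed, presence of the file in the executor's
    // work directory is enough to know it was already fetched and is on the
    // class path.
    def fetchJarIfMissing(jarName: String, workDir: File)(fetch: File => Unit): File = {
      val localJar = new File(workDir, jarName)
      if (!localJar.exists()) {
        fetch(localJar) // first use only: download from the driver's file server
      }
      localJar
    }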
On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:

See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the full stack trace, but it's in the BlockManager/BlockManagerWorker, where it's trying to fulfil a "getBlock" request for another node. The objects that would be in the block haven't yet been serialised, and that then causes the deserialisation to happen on that thread. See MemoryStore.scala:102.

On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:

I don't think it was a conscious design decision to not include the application classes in the connection manager serializer. We should fix that. Where is it deserializing data in that thread?

Option 4 might make sense in the long run, but it adds a lot of complexity to the code base (a whole separate code path, a task queue, blocking/non-blocking logic within task threads) that can be error prone, so I think it is best to stay away from that right now.

On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:

Hi Spark devs,

I've posted an issue on JIRA (https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using Kryo serialisation with a custom Kryo registrator to register custom classes with Kryo. This is an insidious issue that non-deterministically causes Kryo to have different ID number => class name maps on different nodes, which then causes weird exceptions (ClassCastException, ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation time. I've created a reliable reproduction for the issue here: https://github.com/GrahamDennis/spark-kryo-serialisation

I'm happy to try to put a pull request together to address this, but the right way to solve it isn't obvious to me, and I'd like to get feedback and ideas.

The root cause of the problem is a "Failed to run spark.kryo.registrator" error which non-deterministically occurs in some executor processes during operation. My custom Kryo registrator is in the application jar, and it is accessible on the worker nodes: most of the time, the custom Kryo registrator runs successfully.

What's happening is that Kryo serialisation/deserialisation mostly happens on an "Executor task launch worker" thread, whose class loader is set (in org.apache.spark.executor.Executor.TaskRunner.run) to include the application jar. From what I can tell, only these threads have access to the application jar (which contains the custom Kryo registrator). However, the ConnectionManager threads sometimes need to serialise/deserialise objects to satisfy "getBlock" requests when the objects haven't previously been serialised. As the ConnectionManager threads don't have the application jar available from their class loader, the lookup of the custom Kryo registrator fails. Spark then swallows this exception, which results in a different ID number => class mapping for this Kryo instance, and that then causes deserialisation errors later on a different node.

A related issue to the one reported in SPARK-2878 is that Spark probably shouldn't swallow the ClassNotFound exception for custom Kryo registrators. The user has explicitly specified this class, and if it deterministically can't be found, that will cause problems at serialisation/deserialisation time. If it only sometimes can't be found (as in this case), it leads to a data corruption issue later on. Either way, we're better off dying from the ClassNotFound exception early than from the weirder errors later on.
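For context, a registrator of the kind under discussion (the classes are hypothetical, assumed to ship in the application jar). Kryo assigns numeric IDs in registration order, which is why an executor whose registrator silently fails to run ends up with a different ID => class table than its peers:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical application classes shipped in the application jar.
    case class VectorRecord(id: Long, values: Array[Double])
    case class RatingBlock(userIds: Array[Int], ratings: Array[Double])

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        // Registration order fixes the numeric class IDs written into the
        // serialised stream. If this method never runs on one executor, its
        // ID => class table diverges, and data serialised there surfaces
        // elsewhere as the exceptions listed above.
        kryo.register(classOf[VectorRecord])
        kryo.register(classOf[RatingBlock])
      }
    }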
I have some ideas on potential solutions to this issue, but I'm keen for experienced eyes to critique these approaches:

1. The simplest fix would be to make the application jar available to the connection manager threads, but I'm guessing it's a design decision to isolate the application jar to just the executor task runner threads. Also, I don't know whether any other threads might interact with Kryo serialisation/deserialisation.

2. Before looking up the custom Kryo registrator, change the thread's class loader to include the application jar, then restore the class loader after the registrator has run (see the sketch after this message). I don't know if this would have any other side effects.

3. Always serialise/deserialise on the existing TaskRunner threads, rather than delaying serialisation until later, when it can be done only if needed. This approach would probably have negative performance consequences.

4. Create a new dedicated thread pool for lazy serialisation/deserialisation that has the application jar on its class path. Serialisation/deserialisation would be the only thing these threads do, which would minimise conflicts and interactions between the application jar and other jars.

#4 sounds like the best approach to me, but I think it would require considerable knowledge of Spark internals, which is beyond me at present. Does anyone have any better (and ideally simpler) ideas?

Cheers,

Graham
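A minimal sketch of the class-loader swap in option 2 above, assuming a loader containing the application jar (here called appJarClassLoader) is reachable from the calling code:

    // Temporarily widen the thread's context class loader so the registrator
    // lookup can see the application jar, then restore the original loader.
    def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
      val previous = Thread.currentThread().getContextClassLoader
      Thread.currentThread().setContextClassLoader(loader)
      try body
      finally Thread.currentThread().setContextClassLoader(previous)
    }

    // Usage when a Kryo instance is built off a task thread:
    //   withContextClassLoader(appJarClassLoader) {
    //     runCustomKryoRegistrator(kryo) // hypothetical helper
    //   }

The try/finally restore matters here because connection manager threads are shared; leaking the widened loader would expose application classes to unrelated work on the same thread.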