Here: https://github.com/apache/spark/pull/1948
On Thu, Aug 14, 2014 at 5:45 PM, Debasish Das <debasish.da...@gmail.com> wrote:

Is there a fix that I can test? I have the flows set up for both standalone and YARN runs...

Thanks.
Deb

On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin <r...@databricks.com> wrote:

Yes, I understand it might not work for a custom serializer, but that is a much less common path.

Basically, I want a quick fix for the 1.1 release (which is coming up soon). I would not be comfortable making big changes to the class path late in the release cycle. We can do that for 1.2.

On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.den...@gmail.com> wrote:

That should work, but would you also make these changes to the JavaSerializer? The API of the two is the same so that you can select one or the other (or, in theory, a custom serializer). This also wouldn't address the problem of shipping custom *serializers* (not Kryo registrators) in user jars.

On 14 August 2014 19:23, Reynold Xin <r...@databricks.com> wrote:

Graham,

SparkEnv only creates a KryoSerializer, but as I understand it, that serializer doesn't actually initialize the registrator, since that only happens when newKryo() is called as a KryoSerializerInstance is initialized.

Basically, I'm thinking of a quick fix for 1.1:

1. Add a classLoader field to KryoSerializer, and initialize each new KryoSerializerInstance with that class loader.

2. Set that classLoader to the executor's class loader when the Executor is initialized.

Then all deserialization calls should be using the executor's class loader.
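(A rough, hypothetical illustration of the mechanism proposed above, using plain Kryo; KryoSerializerSketch, registratorClassName, and the constructor shape are invented names, not the actual Spark source. Assumes spark-core and kryo on the classpath.)

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical sketch: a serializer that is handed an explicit class
    // loader and uses it both to resolve classes during deserialization and
    // to find the user's registrator.
    class KryoSerializerSketch(registratorClassName: Option[String],
                               classLoader: ClassLoader) {
      def newKryo(): Kryo = {
        val kryo = new Kryo()
        // Class names read off the wire are resolved through this loader...
        kryo.setClassLoader(classLoader)
        // ...and so is the registrator lookup, so a registrator that lives in
        // the application jar is found no matter which thread calls newKryo().
        for (name <- registratorClassName) {
          Class.forName(name, true, classLoader)
            .newInstance().asInstanceOf[KryoRegistrator]
            .registerClasses(kryo)
        }
        kryo
      }
    }

The point is that class resolution and the registrator lookup go through the same loader, so it no longer matters that the calling thread's own class loader lacks the application jar.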
On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.den...@gmail.com> wrote:

Hi Reynold,

That would solve this specific issue, but you'd need to be careful never to create a serialiser instance before the first task is received. Currently, in Executor.TaskRunner.run, a closure serialiser instance is created before any application jars are downloaded, but that could be moved. To me, this seems a little fragile.

However, there is a related issue: you can't ship a custom serialiser in an application jar, because the serialiser is instantiated when the SparkEnv object is created, which is before any tasks are received by the executor. The above approach wouldn't help with this problem. Additionally, the YARN scheduler already adds the application jar to the Executor classpath, so taking that approach would make things a bit more uniform.

Cheers,
Graham

On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:

Graham,

Thanks for working on this. This is an important bug to fix.

I don't have the whole context, and obviously I haven't spent nearly as much time on this as you have, but I'm wondering: what if we always pass the executor's ClassLoader to the Kryo serializer? Would that solve this problem?

On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <graham.den...@gmail.com> wrote:

Hi Deb,

The only alternative serialiser is the JavaSerialiser (the default). Theoretically Spark supports custom serialisers, but due to a related issue, custom serialisers currently can't live in application jars and must be available to all executors at launch. My PR fixes this issue as well, allowing custom serialisers to be shipped in application jars.

Graham

On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com> wrote:

Sorry, I just saw Graham's email after sending my previous email about this bug...

I have been seeing this same issue on our ALS runs last week, but I thought it was due to my hacky way of running the mllib 1.1 snapshot on core 1.0...

What's the status of this PR? Will this fix be back-ported to 1.0.1, as we are running a 1.0.1 stable standalone cluster?

Until the PR merges, does it make sense to not use Kryo? What are the other recommended efficient serializers?

Thanks.
Deb

On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:

I now have a complete pull request for this issue that I'd like to get reviewed and committed. The PR is available here: https://github.com/apache/spark/pull/1890 and includes a test case for the issue I described. I've also submitted a related PR (https://github.com/apache/spark/pull/1827) that stops exceptions raised while attempting to run the custom Kryo registrator from being swallowed.

Thanks,
Graham

On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:

I've submitted a work-in-progress pull request for this issue that I'd like feedback on. See https://github.com/apache/spark/pull/1890. I've also submitted a pull request for the related issue that exceptions hit when trying to use a custom Kryo registrator are being swallowed: https://github.com/apache/spark/pull/1827

The approach in my pull request is to have the Worker processes download the application jars and add them to the Executor class path at launch time. There are a couple of things that still need to be done before this can be merged:

1. At the moment, the first time a task runs in the executor, the application jars are downloaded again. My solution here would be to make the executor not download any jars that already exist (see the sketch after this list). Previously, the driver and executor kept track of the timestamps of jar files and would re-download 'updated' jars; however, this never made sense, as the previous version of the updated jar may already have been loaded into the executor, so the updated jar might have no effect. As my current pull request removes the timestamp for jars, simply checking whether the jar exists will let us avoid downloading the jars again.

2. Tests. :-)
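(The existence check in item 1 might look roughly like the following. This is a hypothetical sketch, not the PR's actual code, and fetchJarIfMissing is an invented helper.)

    import java.io.File
    import java.net.URI
    import java.nio.file.{Files, StandardCopyOption}

    // Hypothetical sketch: skip the fetch when the jar is already present in
    // the executor's work directory. With jar timestamps removed, existence
    // alone decides whether to download.
    def fetchJarIfMissing(jarUrl: String, workDir: File): File = {
      val uri = new URI(jarUrl)
      val localJar = new File(workDir, new File(uri.getPath).getName)
      if (!localJar.exists()) {
        val in = uri.toURL.openStream()
        try Files.copy(in, localJar.toPath, StandardCopyOption.REPLACE_EXISTING)
        finally in.close()
      }
      localJar
    }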
A side benefit of my pull request is that you will be able to use custom serialisers that are distributed in a user jar. Currently, the serialiser instance is created in the Executor process before the first task is received, and therefore before any user jars are downloaded. As this PR adds user jars to the Executor process at launch time, this won't be an issue.

On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:

See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the full stack trace, but it's in the BlockManager/BlockManagerWorker, where it's trying to fulfil a "getBlock" request for another node. The objects that would be in the block haven't yet been serialised, and that then causes the deserialisation to happen on that thread. See MemoryStore.scala:102.

On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:

I don't think it was a conscious design decision to not include the application classes in the connection manager serializer. We should fix that. Where is it deserializing data in that thread?

Option 4 might make sense in the long run, but it adds a lot of complexity to the code base (a whole separate code path, a task queue, blocking/non-blocking logic within task threads) that can be error-prone, so I think it is best to stay away from that right now.

On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:

Hi Spark devs,

I've posted an issue on JIRA (https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using Kryo serialisation with a custom Kryo registrator to register custom classes with Kryo. This is an insidious issue that non-deterministically causes Kryo to have different ID number => class name maps on different nodes, which then causes weird exceptions (ClassCastException, ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation time. I've created a reliable reproduction for the issue here: https://github.com/GrahamDennis/spark-kryo-serialisation

I'm happy to try to put a pull request together to address this, but the right way to solve it isn't obvious to me, and I'd like to get feedback and ideas on how to address it.

The root cause of the problem is a "Failed to run spark.kryo.registrator" error which non-deterministically occurs in some executor processes during operation. My custom Kryo registrator is in the application jar, and it is accessible on the worker nodes. This is demonstrated by the fact that most of the time the custom Kryo registrator runs successfully.
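(For readers following along, the setup under discussion is wired up roughly as below; the record and registrator classes are illustrative stand-ins, not the classes from the reproduction repo.)

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    // An application class and a registrator of the kind that triggers the
    // bug; both live in the application jar shipped to the executors.
    case class MyRecord(id: Int, payload: String)

    class MyKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        // Registration order determines the numeric ID Kryo assigns to each
        // class, so every node must run the same registrations.
        kryo.register(classOf[MyRecord])
      }
    }

    object KryoSetupExample {
      def sparkConf(): SparkConf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "MyKryoRegistrator")
    }

When the registrator silently fails on one executor, that executor's ID number => class name map disagrees with the nodes where registration succeeded, which is exactly the mismatch described above.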
What's happening is that Kryo serialisation/deserialisation mostly happens on an "Executor task launch worker" thread, whose class loader is set to contain the application jar. This happens in `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can tell, only these threads have access to the application jar (which contains the custom Kryo registrator). However, the ConnectionManager threads sometimes need to serialise/deserialise objects to satisfy "getBlock" requests when the objects haven't previously been serialised. As the ConnectionManager threads don't have the application jar available from their class loader, the lookup of the custom Kryo registrator fails. Spark then swallows this exception, which results in a different ID number => class mapping for this Kryo instance, and this then causes deserialisation errors later on a different node.

A related issue to the one reported in SPARK-2878 is that Spark probably shouldn't swallow the ClassNotFound exception for custom Kryo registrators. The user has explicitly specified this class, and if it deterministically can't be found, then it may cause problems at serialisation/deserialisation time. If it only sometimes can't be found (as in this case), then it leads to a data-corruption issue later on. Either way, we're better off dying from the ClassNotFound exception earlier than from the weirder errors later on.

I have some ideas for potential solutions to this issue, but I'm keen for experienced eyes to critique these approaches:

1. The simplest approach would be to just make the application jar available to the connection manager threads, but I'm guessing it's a design decision to isolate the application jar to just the executor task runner threads. Also, I don't know whether there are any other threads that might be interacting with Kryo serialisation/deserialisation.

2. Before looking up the custom Kryo registrator, change the thread's class loader to include the application jar, then restore the class loader after the Kryo registrator has been run (see the sketch after this list). I don't know whether this would have any other side effects.

3. Always serialise/deserialise on the existing TaskRunner threads, rather than delaying serialisation until later, when it can be done only if needed. This approach would probably have negative performance consequences.

4. Create a new dedicated thread pool for lazy serialisation/deserialisation that has the application jar on the class path. Serialisation/deserialisation would be the only thing these threads do, and this would minimise conflicts/interactions between the application jar and other jars.
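(Idea 2 is essentially the standard context-class-loader swap. A minimal sketch, assuming some appJarLoader that includes the application jar:)

    // Hypothetical sketch of idea 2: temporarily swap the thread's context
    // class loader while the registrator runs, then always restore it.
    def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
      val thread = Thread.currentThread()
      val previous = thread.getContextClassLoader
      thread.setContextClassLoader(loader)
      try body
      finally thread.setContextClassLoader(previous)
    }

    // Usage at registrator-lookup time, e.g. inside newKryo():
    //   withContextClassLoader(appJarLoader) {
    //     runCustomRegistrator(kryo)  // invented helper for the lookup + call
    //   }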
#4 sounds like the best approach to me, but I think it would require considerable knowledge of Spark internals, which is beyond me at present (a rough sketch of the thread-pool shape follows below). Does anyone have better (and ideally simpler) ideas?

Cheers,

Graham
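(For concreteness, the dedicated pool in idea 4 could be shaped like the sketch below; the pool size and thread naming are arbitrary choices, and appJarLoader is again an assumed loader that contains the application jar.)

    import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

    // Hypothetical sketch of idea 4: a pool whose threads carry the
    // application-jar class loader and do nothing but lazy
    // (de)serialisation, so registrator lookups on these threads succeed.
    def serialisationPool(appJarLoader: ClassLoader): ExecutorService =
      Executors.newFixedThreadPool(4, new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, "lazy-serialisation")
          t.setContextClassLoader(appJarLoader)
          t.setDaemon(true)
          t
        }
      })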