Here: https://github.com/apache/spark/pull/1948



On Thu, Aug 14, 2014 at 5:45 PM, Debasish Das <debasish.da...@gmail.com>
wrote:

> Is there a fix that I can test ? I have the flows setup for both
> standalone and YARN runs...
>
> Thanks.
> Deb
>
>
>
> On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin <r...@databricks.com> wrote:
>
>> Yes, I understand it might not work for custom serializer, but that is a
>> much less common path.
>>
>> Basically I want a quick fix for 1.1 release (which is coming up soon). I
>> would not be comfortable making big changes to class path late into the
>> release cycle. We can do that for 1.2.
>>
>>
>>
>>
>>
>> On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.den...@gmail.com>
>> wrote:
>>
>>> That should work, but would you also make these changes to the
>>> JavaSerializer?  The API of these is the same so that you can select one or
>>> the other (or in theory a custom serializer)?  This also wouldn't address
>>> the problem of shipping custom *serializers* (not kryo registrators) in
>>> user jars.
>>>
>>> On 14 August 2014 19:23, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> Graham,
>>>>
>>>> SparkEnv only creates a KryoSerializer, but as I understand that
>>>> serializer doesn't actually initializes the registrator since that is only
>>>> called when newKryo() is called when KryoSerializerInstance is initialized.
>>>>
>>>> Basically I'm thinking a quick fix for 1.2:
>>>>
>>>> 1. Add a classLoader field to KryoSerializer; initialize new
>>>> KryoSerializerInstance with that class loader
>>>>
>>>>  2. Set that classLoader to the executor's class loader when Executor
>>>> is initialized.
>>>>
>>>> Then all deser calls should be using the executor's class loader.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <
>>>> graham.den...@gmail.com> wrote:
>>>>
>>>>> Hi Reynold,
>>>>>
>>>>> That would solve this specific issue, but you'd need to be careful
>>>>> that you never created a serialiser instance before the first task is
>>>>> received.  Currently in Executor.TaskRunner.run a closure serialiser
>>>>> instance is created before any application jars are downloaded, but that
>>>>> could be moved.  To me, this seems a little fragile.
>>>>>
>>>>> However there is a related issue where you can't ship a custom
>>>>> serialiser in an application jar because the serialiser is instantiated
>>>>> when the SparkEnv object is created, which is before any tasks are 
>>>>> received
>>>>> by the executor.  The above approach wouldn't help with this problem.
>>>>>  Additionally, the YARN scheduler currently uses this approach of adding
>>>>> the application jar to the Executor classpath, so it would make things a
>>>>> bit more uniform.
>>>>>
>>>>> Cheers,
>>>>> Graham
>>>>>
>>>>>
>>>>> On 14 August 2014 17:37, Reynold Xin <r...@databricks.com> wrote:
>>>>>
>>>>>> Graham,
>>>>>>
>>>>>> Thanks for working on this. This is an important bug to fix.
>>>>>>
>>>>>>  I don't have the whole context and obviously I haven't spent nearly
>>>>>> as much time on this as you have, but I'm wondering what if we always 
>>>>>> pass
>>>>>> the executor's ClassLoader to the Kryo serializer? Will that solve this
>>>>>> problem?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <
>>>>>> graham.den...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Deb,
>>>>>>>
>>>>>>> The only alternative serialiser is the JavaSerialiser (the default).
>>>>>>>  Theoretically Spark supports custom serialisers, but due to a related
>>>>>>> issue, custom serialisers currently can't live in application jars and 
>>>>>>> must
>>>>>>> be available to all executors at launch.  My PR fixes this issue as 
>>>>>>> well,
>>>>>>> allowing custom serialisers to be shipped in application jars.
>>>>>>>
>>>>>>> Graham
>>>>>>>
>>>>>>>
>>>>>>> On 14 August 2014 16:56, Debasish Das <debasish.da...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Sorry I just saw Graham's email after sending my previous email
>>>>>>>> about this bug...
>>>>>>>>
>>>>>>>> I have been seeing this same issue on our ALS runs last week but I
>>>>>>>> thought it was due my hacky way to run mllib 1.1 snapshot on core 
>>>>>>>> 1.0...
>>>>>>>>
>>>>>>>> What's the status of this PR ? Will this fix be back-ported to
>>>>>>>> 1.0.1 as we are running 1.0.1 stable standalone cluster ?
>>>>>>>>
>>>>>>>> Till the PR merges does it make sense to not use Kryo ? What are
>>>>>>>> the other recommended efficient serializers ?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>> Deb
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <
>>>>>>>> graham.den...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I now have a complete pull request for this issue that I'd like to
>>>>>>>>> get
>>>>>>>>> reviewed and committed.  The PR is available here:
>>>>>>>>> https://github.com/apache/spark/pull/1890 and includes a testcase
>>>>>>>>> for the
>>>>>>>>> issue I described.  I've also submitted a related PR (
>>>>>>>>> https://github.com/apache/spark/pull/1827) that causes exceptions
>>>>>>>>> raised
>>>>>>>>> while attempting to run the custom kryo registrator not to be
>>>>>>>>> swallowed.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Graham
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> > I've submitted a work-in-progress pull request for this issue
>>>>>>>>> that I'd
>>>>>>>>> > like feedback on.  See https://github.com/apache/spark/pull/1890
>>>>>>>>> . I've
>>>>>>>>> > also submitted a pull request for the related issue that the
>>>>>>>>> exceptions hit
>>>>>>>>> > when trying to use a custom kryo registrator are being swallowed:
>>>>>>>>> > https://github.com/apache/spark/pull/1827
>>>>>>>>> >
>>>>>>>>> > The approach in my pull request is to get the Worker processes
>>>>>>>>> to download
>>>>>>>>> > the application jars and add them to the Executor class path at
>>>>>>>>> launch
>>>>>>>>> > time. There are a couple of things that still need to be done
>>>>>>>>> before this
>>>>>>>>> > can be merged:
>>>>>>>>> > 1. At the moment, the first time a task runs in the executor, the
>>>>>>>>> > application jars are downloaded again.  My solution here would
>>>>>>>>> be to make
>>>>>>>>> > the executor not download any jars that already exist.
>>>>>>>>>  Previously, the
>>>>>>>>> > driver & executor kept track of the timestamp of jar files and
>>>>>>>>> would
>>>>>>>>> > redownload 'updated' jars, however this never made sense as the
>>>>>>>>> previous
>>>>>>>>> > version of the updated jar may have already been loaded into the
>>>>>>>>> executor,
>>>>>>>>> > so the updated jar may have no effect.  As my current pull
>>>>>>>>> request removes
>>>>>>>>> > the timestamp for jars, just checking whether the jar exists
>>>>>>>>> will allow us
>>>>>>>>> > to avoid downloading the jars again.
>>>>>>>>> > 2. Tests. :-)
>>>>>>>>> >
>>>>>>>>> > A side-benefit of my pull request is that you will be able to
>>>>>>>>> use custom
>>>>>>>>> > serialisers that are distributed in a user jar.  Currently, the
>>>>>>>>> serialiser
>>>>>>>>> > instance is created in the Executor process before the first
>>>>>>>>> task is
>>>>>>>>> > received and therefore before any user jars are downloaded.  As
>>>>>>>>> this PR
>>>>>>>>> > adds user jars to the Executor process at launch time, this
>>>>>>>>> won't be an
>>>>>>>>> > issue.
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> >
>>>>>>>>> >> See my comment on
>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-2878 for
>>>>>>>>> >> the full stacktrace, but it's in the
>>>>>>>>> BlockManager/BlockManagerWorker where
>>>>>>>>> >> it's trying to fulfil a "getBlock" request for another node.
>>>>>>>>>  The objects
>>>>>>>>> >> that would be in the block haven't yet been serialised, and
>>>>>>>>> that then
>>>>>>>>> >> causes the deserialisation to happen on that thread.  See
>>>>>>>>> >> MemoryStore.scala:102.
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com>
>>>>>>>>> wrote:
>>>>>>>>> >>
>>>>>>>>> >>> I don't think it was a conscious design decision to not
>>>>>>>>> include the
>>>>>>>>> >>> application classes in the connection manager serializer. We
>>>>>>>>> should fix
>>>>>>>>> >>> that. Where is it deserializing data in that thread?
>>>>>>>>> >>>
>>>>>>>>> >>>  4 might make sense in the long run, but it adds a lot of
>>>>>>>>> complexity to
>>>>>>>>> >>> the code base (whole separate code base, task queue,
>>>>>>>>> blocking/non-blocking
>>>>>>>>> >>> logic within task threads) that can be error prone, so I think
>>>>>>>>> it is best
>>>>>>>>> >>> to stay away from that right now.
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <
>>>>>>>>> graham.den...@gmail.com>
>>>>>>>>> >>> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>>> Hi Spark devs,
>>>>>>>>> >>>>
>>>>>>>>> >>>> I’ve posted an issue on JIRA (
>>>>>>>>> >>>> https://issues.apache.org/jira/browse/SPARK-2878) which
>>>>>>>>> occurs when
>>>>>>>>> >>>> using
>>>>>>>>> >>>> Kryo serialisation with a custom Kryo registrator to register
>>>>>>>>> custom
>>>>>>>>> >>>> classes with Kryo.  This is an insidious issue that
>>>>>>>>> >>>> non-deterministically
>>>>>>>>> >>>> causes Kryo to have different ID number => class name maps on
>>>>>>>>> different
>>>>>>>>> >>>> nodes, which then causes weird exceptions (ClassCastException,
>>>>>>>>> >>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
>>>>>>>>> >>>> deserialisation
>>>>>>>>> >>>> time.  I’ve created a reliable reproduction for the issue
>>>>>>>>> here:
>>>>>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>>>>>> >>>>
>>>>>>>>> >>>> I’m happy to try and put a pull request together to try and
>>>>>>>>> address
>>>>>>>>> >>>> this,
>>>>>>>>> >>>> but it’s not obvious to me the right way to solve this and
>>>>>>>>> I’d like to
>>>>>>>>> >>>> get
>>>>>>>>> >>>> feedback / ideas on how to address this.
>>>>>>>>> >>>>
>>>>>>>>> >>>> The root cause of the problem is a "Failed to run
>>>>>>>>> >>>> spark.kryo.registrator”
>>>>>>>>> >>>> error which non-deterministically occurs in some executor
>>>>>>>>> processes
>>>>>>>>> >>>> during
>>>>>>>>> >>>> operation.  My custom Kryo registrator is in the application
>>>>>>>>> jar, and
>>>>>>>>> >>>> it is
>>>>>>>>> >>>> accessible on the worker nodes.  This is demonstrated by the
>>>>>>>>> fact that
>>>>>>>>> >>>> most
>>>>>>>>> >>>> of the time the custom kryo registrator is successfully run.
>>>>>>>>> >>>>
>>>>>>>>> >>>> What’s happening is that Kryo serialisation/deserialisation
>>>>>>>>> is happening
>>>>>>>>> >>>> most of the time on an “Executor task launch worker” thread,
>>>>>>>>> which has
>>>>>>>>> >>>> the
>>>>>>>>> >>>> thread's class loader set to contain the application jar.
>>>>>>>>>  This happens
>>>>>>>>> >>>> in
>>>>>>>>> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from
>>>>>>>>> what I can
>>>>>>>>> >>>> tell, it is only these threads that have access to the
>>>>>>>>> application jar
>>>>>>>>> >>>> (that contains the custom Kryo registrator).  However, the
>>>>>>>>> >>>> ConnectionManager threads sometimes need to
>>>>>>>>> serialise/deserialise
>>>>>>>>> >>>> objects
>>>>>>>>> >>>> to satisfy “getBlock” requests when the objects haven’t
>>>>>>>>> previously been
>>>>>>>>> >>>> serialised.  As the ConnectionManager threads don’t have the
>>>>>>>>> application
>>>>>>>>> >>>> jar available from their class loader, when it tries to look
>>>>>>>>> up the
>>>>>>>>> >>>> custom
>>>>>>>>> >>>> Kryo registrator, this fails.  Spark then swallows this
>>>>>>>>> exception, which
>>>>>>>>> >>>> results in a different ID number —> class mapping for this
>>>>>>>>> kryo
>>>>>>>>> >>>> instance,
>>>>>>>>> >>>> and this then causes deserialisation errors later on a
>>>>>>>>> different node.
>>>>>>>>> >>>>
>>>>>>>>> >>>> A related issue to the issue reported in SPARK-2878 is that
>>>>>>>>> Spark
>>>>>>>>> >>>> probably
>>>>>>>>> >>>> shouldn’t swallow the ClassNotFound exception for custom Kryo
>>>>>>>>> >>>> registrators.
>>>>>>>>> >>>>  The user has explicitly specified this class, and if it
>>>>>>>>> >>>> deterministically
>>>>>>>>> >>>> can’t be found, then it may cause problems at serialisation /
>>>>>>>>> >>>> deserialisation time.  If only sometimes it can’t be found
>>>>>>>>> (as in this
>>>>>>>>> >>>> case), then it leads to a data corruption issue later on.
>>>>>>>>>  Either way,
>>>>>>>>> >>>> we’re better off dying due to the ClassNotFound exception
>>>>>>>>> earlier, than
>>>>>>>>> >>>> the
>>>>>>>>> >>>> weirder errors later on.
>>>>>>>>> >>>>
>>>>>>>>> >>>> I have some ideas on potential solutions to this issue, but
>>>>>>>>> I’m keen for
>>>>>>>>> >>>> experienced eyes to critique these approaches:
>>>>>>>>> >>>>
>>>>>>>>> >>>> 1. The simplest approach to fixing this would be to just make
>>>>>>>>> the
>>>>>>>>> >>>> application jar available to the connection manager threads,
>>>>>>>>> but I’m
>>>>>>>>> >>>> guessing it’s a design decision to isolate the application
>>>>>>>>> jar to just
>>>>>>>>> >>>> the
>>>>>>>>> >>>> executor task runner threads.  Also, I don’t know if there
>>>>>>>>> are any other
>>>>>>>>> >>>> threads that might be interacting with kryo serialisation /
>>>>>>>>> >>>> deserialisation.
>>>>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the
>>>>>>>>> thread’s
>>>>>>>>> >>>> class
>>>>>>>>> >>>> loader to include the application jar, then restore the class
>>>>>>>>> loader
>>>>>>>>> >>>> after
>>>>>>>>> >>>> the kryo registrator has been run.  I don’t know if this
>>>>>>>>> would have any
>>>>>>>>> >>>> other side-effects.
>>>>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner
>>>>>>>>> threads,
>>>>>>>>> >>>> rather than delaying serialisation until later, when it can
>>>>>>>>> be done
>>>>>>>>> >>>> only if
>>>>>>>>> >>>> needed.  This approach would probably have negative
>>>>>>>>> performance
>>>>>>>>> >>>> consequences.
>>>>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation /
>>>>>>>>> >>>> deserialisation that has the application jar on the class
>>>>>>>>> path.
>>>>>>>>> >>>>  Serialisation / deserialisation would be the only thing
>>>>>>>>> these threads
>>>>>>>>> >>>> do,
>>>>>>>>> >>>> and this would minimise conflicts / interactions between the
>>>>>>>>> application
>>>>>>>>> >>>> jar and other jars.
>>>>>>>>> >>>>
>>>>>>>>> >>>> #4 sounds like the best approach to me, but I think would
>>>>>>>>> require
>>>>>>>>> >>>> considerable knowledge of Spark internals, which is beyond me
>>>>>>>>> at
>>>>>>>>> >>>> present.
>>>>>>>>> >>>>  Does anyone have any better (and ideally simpler) ideas?
>>>>>>>>> >>>>
>>>>>>>>> >>>> Cheers,
>>>>>>>>> >>>>
>>>>>>>>> >>>> Graham
>>>>>>>>> >>>>
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to