Hi Shengzhe,

Even if we did make Configuration threadsafe, it'd take quite some time for
that to trickle down to a Hadoop release that we could actually rely on
Spark users having installed.  I agree we should consider whether making
Configuration threadsafe is something Hadoop should do, but in the short
term I think Spark needs to handle the common scenario where Configuration
only supports single-threaded access.

Thanks!
Andrew


On Tue, Jul 15, 2014 at 2:43 PM, yao <yaosheng...@gmail.com> wrote:

> Good catch, Andrew. In addition to your proposed solution, would it be
> possible to fix the Configuration class and make it thread-safe? I think
> the fix should be trivial, just use a ConcurrentHashMap, but I am not sure
> whether we can push this change upstream (would the Hadoop folks accept
> it? It seems they never expected a Configuration object to be accessed by
> multiple threads).
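>
> To illustrate the idea (this is a simplified stand-in, not Hadoop's real
> Configuration internals, which keep much more state than one map), the
> change would basically swap the unsynchronized map for a concurrent one:
>
>     import java.util.concurrent.ConcurrentHashMap
>
>     // Simplified stand-in for the idea, not Hadoop's actual Configuration class.
>     class ThreadSafeProps {
>       private val props = new ConcurrentHashMap[String, String]()
>
>       // ConcurrentHashMap.put is safe to call from many threads at once,
>       // unlike java.util.HashMap, which can corrupt its internal buckets.
>       def set(key: String, value: String): Unit = props.put(key, value)
>
>       def get(key: String): Option[String] = Option(props.get(key))
>     }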
>
> -Shengzhe
>
>
> On Mon, Jul 14, 2014 at 10:22 PM, Andrew Ash <and...@andrewash.com> wrote:
>
> > Hi Spark devs,
> >
> > We discovered a very interesting bug at work last week in Spark 0.9.1:
> > the way Spark uses the Hadoop Configuration object is prone to thread
> > safety issues.  I believe it still applies in Spark 1.0.1 as well.  Let
> > me explain:
> >
> >
> > *Observations*
> >
> >    - Was running a relatively simple job (read from Avro files, do a map,
> >    do another map, write back to Avro files)
> >    - 412 of 413 tasks completed, but the last task was hung in RUNNING
> >    state
> >    - The 412 successful tasks completed in median time 3.4s
> >    - The last hung task didn't finish even in 20 hours
> >    - The executor with the hung task was responsible for 100% of one core
> >    of CPU usage
> >    - Jstack of the executor attached (relevant thread pasted below)
> >
> >
> > *Diagnosis*
> >
> > After doing some code spelunking, we determined the issue was concurrent
> > use of a single Configuration object by the tasks on an executor.  In
> > Hadoop each task runs in its own JVM, but in Spark multiple tasks can run
> > in the same JVM, so the single-threaded access assumptions of the
> > Configuration object no longer hold in Spark.
> >
> > The specific issue is that the AvroRecordReader actually _modifies_ the
> > JobConf it's given when it's instantiated!  It adds a key for the RPC
> > protocol engine in the process of connecting to the Hadoop FileSystem.
> > When many tasks start at the same time (like at the start of a job), many
> > tasks are adding this configuration item to the one Configuration object
> > at once.  Internally Configuration uses a java.util.HashMap, which isn't
> > threadsafe.  The post below is an excellent explanation of what happens
> > when multiple threads insert into a HashMap at the same time:
> >
> > http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html
> >
> > The gist is that a thread ends up following a cycle of linked-list nodes
> > indefinitely.  That exactly matches both our observation of the 100% CPU
> > core and the final location in the stack trace.
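> >
> > A small sketch of the failure mode (the key names below are made up for
> > illustration, and whether a particular run actually hangs depends on
> > thread timing):
> >
> >     import org.apache.hadoop.conf.Configuration
> >
> >     object ConfRaceDemo {
> >       def main(args: Array[String]): Unit = {
> >         // One shared Configuration, as on a Spark executor
> >         val conf = new Configuration()
> >
> >         // Several "tasks" writing to it at once, like AvroRecordReader
> >         // setting the RPC engine key while connecting to the FileSystem
> >         val threads = (1 to 8).map { i =>
> >           new Thread(new Runnable {
> >             def run(): Unit = {
> >               var j = 0
> >               while (j < 10000) {
> >                 // bottoms out in an unsynchronized HashMap.put, per the
> >                 // stack trace below
> >                 conf.set("demo.key." + i + "." + j, "value")
> >                 j += 1
> >               }
> >             }
> >           })
> >         }
> >         threads.foreach(_.start())
> >         // on an unlucky interleaving, one writer never returns
> >         threads.foreach(_.join())
> >       }
> >     }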
> >
> > So it seems the way Spark shares a Configuration object between task
> > threads in an executor is incorrect.  We need some way to prevent
> > concurrent access to a single Configuration object.
> >
> >
> > *Proposed fix*
> >
> > We can clone the JobConf object in HadoopRDD.getJobConf() so each task
> > gets its own JobConf object (and thus Configuration object).  The
> > optimization of broadcasting the Configuration object across the cluster
> > can remain, but on the receiving side I think it needs to be cloned for
> > each task to allow for concurrent access.  I'm not sure of the performance
> > implications, but the comments suggest that a Configuration object is
> > ~10KB, so I would expect cloning it to be relatively speedy.
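> >
> > Roughly what I have in mind (the object and method names below are just
> > for illustration, not the exact HadoopRDD code):
> >
> >     import org.apache.hadoop.conf.Configuration
> >     import org.apache.hadoop.mapred.JobConf
> >
> >     object TaskConfs {
> >       // Sketch: give each task its own copy of the broadcast configuration
> >       // instead of handing every task thread the same shared instance.
> >       def jobConfForTask(broadcastConf: Configuration): JobConf = {
> >         // JobConf's copy constructor copies the entries, so concurrent
> >         // tasks no longer share the same underlying HashMap.
> >         new JobConf(broadcastConf)
> >       }
> >     }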
> >
> > Has this been observed before?  Does my suggested fix make sense?  I'd be
> > happy to file a Jira ticket and continue the discussion there about the
> > right way to fix this.
> >
> >
> > Thanks!
> > Andrew
> >
> >
> > P.S.  For others seeing this issue, our temporary workaround is to enable
> > spark.speculation, which retries failed (or hung) tasks on other
> > machines.
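> >
> > For example, when constructing the SparkContext (the app name is just a
> > placeholder):
> >
> >     import org.apache.spark.{SparkConf, SparkContext}
> >
> >     // Enable speculative execution so a copy of a straggling or hung task
> >     // is relaunched on another executor.
> >     val conf = new SparkConf()
> >       .setAppName("avro-job")
> >       .set("spark.speculation", "true")
> >     val sc = new SparkContext(conf)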
> >
> >
> >
> > "Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000
> > nid=0x54b1 runnable [0x00007f92d74f1000]
> >    java.lang.Thread.State: RUNNABLE
> >     at java.util.HashMap.transfer(HashMap.java:601)
> >     at java.util.HashMap.resize(HashMap.java:581)
> >     at java.util.HashMap.addEntry(HashMap.java:879)
> >     at java.util.HashMap.put(HashMap.java:505)
> >     at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
> >     at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
> >     at
> > org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
> >     at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
> >     at
> >
> org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
> >     at
> >
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
> >     at
> >
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
> >     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
> >     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
> >     at
> >
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
> >     at
> > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
> >     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
> >     at
> > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
> >     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
> >     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
> >     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
> >     at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
> >     at
> > org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
> >     at
> >
> org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
> >     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
> >     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >     at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> >     at org.apache.spark.scheduler.Task.run(Task.scala:53)
> >     at
> >
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
> >     at
> >
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> >     at
> >
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:415)
> >     at
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> >     at
> >
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> >     at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> >     at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >     at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> >
>
