It could be a similar issue as https://issues.apache.org/jira/browse/SPARK-4300
Thanks Best Regards On Tue, Apr 21, 2015 at 8:09 AM, donhoff_h <[email protected]> wrote: > Hi, > > I am studying the RDD Caching function and write a small program to verify > it. I run the program in a Spark1.3.0 environment and on Yarn cluster. But > I meet a weird exception. It isn't always generated in the log. Only > sometimes I can see this exception. And it does not affect the output of my > program. Could anyone explain why this happens and how to eliminate it? > > My program and the exception is listed in the following. Thanks very much > for the help! > > *****Program***** > object TestSparkCaching01 { > def main(args: Array[String]) { > val conf = new SparkConf() > conf.set("spark.serializer", > "org.apache.spark.serializer.KryoSerializer") > conf.set("spark.kryo.registrationRequired","true") > > conf.registerKryoClasses(Array(classOf[MyClass1],classOf[Array[MyClass1]])) > val inFile = "hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt" > val sc = new SparkContext(conf) > val rdd = sc.textFile(inFile) > rdd.cache() > rdd.map("Cache String: "+_).foreach(println ) > sc.stop() > } > } > > *****Exception***** > 15/04/21 09:58:25 WARN channel.DefaultChannelPipeline: An exception was > thrown by an exception handler. > java.util.concurrent.RejectedExecutionException: Worker has already been > shutdown > at > org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72) > at > org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) > at > org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56) > at > org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36) > at > org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34) > at > org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496) > at > org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46) > at > org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54) > at > org.jboss.netty.channel.Channels.disconnect(Channels.java:781) > at > org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211) > at > akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223) > at > akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222) > at scala.util.Success.foreach(Try.scala:205) > at > scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204) > at > scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204) > at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) > at > akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 15/04/21 09:58:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: > Remoting shut down. >
