Hi,
I am studying the RDD Caching function and write a small program to verify it.
I run the program in a Spark1.3.0 environment and on Yarn cluster. But I meet a
weird exception. It isn't always generated in the log. Only sometimes I can see
this exception. And it does not affect the output of my program. Could anyone
explain why this happens and how to eliminate it?
My program and the exception is listed in the following. Thanks very much for
the help!
*****Program*****
object TestSparkCaching01 {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired","true")
conf.registerKryoClasses(Array(classOf[MyClass1],classOf[Array[MyClass1]]))
val inFile = "hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt"
val sc = new SparkContext(conf)
val rdd = sc.textFile(inFile)
rdd.cache()
rdd.map("Cache String: "+_).foreach(println )
sc.stop()
}
}
*****Exception*****
15/04/21 09:58:25 WARN channel.DefaultChannelPipeline: An exception was thrown
by an exception handler.
java.util.concurrent.RejectedExecutionException: Worker has already been
shutdown
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
at
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
at
org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
at
org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
at
org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
at
org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
at
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
at
org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
at
org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211)
at
akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223)
at
akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222)
at scala.util.Success.foreach(Try.scala:205)
at
scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
at
scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/04/21 09:58:25 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Remoting shut down.