This could be similar to the issue reported in
https://issues.apache.org/jira/browse/SPARK-4300
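
Reading the quoted trace, the callback in NettyTransport.gracefulClose calls
disconnect on a channel, and Netty then tries to hand a task to an I/O worker
that has already been shut down; the "Remoting shut down" line follows
immediately. That looks like a shutdown race rather than a caching problem,
which would fit the fact that the output is unaffected. As a rough analogy
only (this is plain java.util.concurrent, not Netty or Spark code, and the
object and method names are made up for illustration), the same exception
class appears whenever work is submitted to an executor after shutdown:

```scala
import java.util.concurrent.{Executors, RejectedExecutionException}

// Hypothetical illustration; not taken from Spark, Akka, or Netty.
object RejectedAfterShutdown {
  // Returns true if a task submitted after shutdown() is rejected.
  def submitAfterShutdown(): Boolean = {
    val pool = Executors.newSingleThreadExecutor()
    pool.shutdown() // the "worker" is gone from this point on
    try {
      // A late task, comparable to the late disconnect in the trace.
      pool.execute(new Runnable { def run(): Unit = println("never runs") })
      false
    } catch {
      case _: RejectedExecutionException => true
    }
  }

  def main(args: Array[String]): Unit =
    println(s"rejected after shutdown: ${submitAfterShutdown()}")
}
```

If that reading is right, the warning is noise from teardown after the job
has already finished, so it should not change the results.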

Thanks
Best Regards

On Tue, Apr 21, 2015 at 8:09 AM, donhoff_h <[email protected]> wrote:

> Hi,
>
> I am studying RDD caching and wrote a small program to verify it. I ran
> the program on Spark 1.3.0 on a YARN cluster, but I hit a strange
> exception. It appears in the log only intermittently, and it does not
> affect the output of my program. Could anyone explain why this happens
> and how to eliminate it?
>
> My program and the exception are listed below. Thanks very much for the
> help!
>
> *****Program*****
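> import org.apache.spark.{SparkConf, SparkContext}
>
> // MyClass1 is defined elsewhere; its definition was not included in this post.
> object TestSparkCaching01 {
>  def main(args: Array[String]): Unit = {
>    val conf = new SparkConf()
>    // Use Kryo and fail on any class that has not been registered.
>    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>    conf.set("spark.kryo.registrationRequired", "true")
>    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[Array[MyClass1]]))
>    val inFile = "hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt"
>    val sc = new SparkContext(conf)
>    val rdd = sc.textFile(inFile)
>    // Mark the RDD for caching; it is materialized by the action below.
>    rdd.cache()
>    rdd.map("Cache String: " + _).foreach(println)
>    sc.stop()
>  }
> }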
>
> *****Exception*****
> 15/04/21 09:58:25 WARN channel.DefaultChannelPipeline: An exception was thrown by an exception handler.
> java.util.concurrent.RejectedExecutionException: Worker has already been shutdown
>         at org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
>         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
>         at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
>         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
>         at org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
>         at org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
>         at org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
>         at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
>         at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
>         at org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
>         at org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211)
>         at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223)
>         at akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222)
>         at scala.util.Success.foreach(Try.scala:205)
>         at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
>         at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>         at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>         at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 15/04/21 09:58:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
>
