Hello, I am using Spark 1.5.1 (via SBT) with Scala 2.10.6, and I am facing an issue with the SparkContext.
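In case it matters, the relevant part of my build.sbt looks roughly like this (standard Maven coordinates for Spark core):

```scala
// build.sbt (relevant part): Spark core compiled for Scala 2.10
scalaVersion := "2.10.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"
```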
Basically, I have an object that needs to do several things:

- call an external service One (a web API)
- call an external service Two (another API)
- read data from HDFS and produce an RDD (Spark)
- parallelize the data obtained from the first two calls
- join these different RDDs and do further work with them

Now, I am trying to do this asynchronously, but it doesn't seem to work. My guess is that Spark doesn't see the calls to .parallelize, because they are made inside different tasks (Futures), so this code runs earlier/later than expected, or perhaps with an unset context (can that happen?). I have tried different approaches, one of them being a call to SparkEnv.set inside the flatMap and map callbacks (i.e. inside the Futures). However, all I get is "Cannot call methods on a stopped SparkContext". It just doesn't work; maybe I misunderstood what SparkEnv.set does, so I removed it again.

This is the code I have written so far:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import play.api.libs.ws.WSResponse

object Fetcher {
  def fetch(name: String, master: String /* , ... */) = {
    // Kick off both external calls right away; they run asynchronously.
    val externalCallOne: Future[WSResponse] = externalService1()
    val externalCallTwo: Future[String] = externalService2()
    // val sparkEnv = SparkEnv.get

    val config = new SparkConf()
      .setAppName(name)
      .set("spark.master", master)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = new SparkContext(config)
    // val sparkEnv = SparkEnv.get

    val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
      // SparkEnv.set(sparkEnv)
      externalCallTwo map { dataTwo =>
        println("in map") // prints, so it gets here
        // ...
        val rddOne = sparkContext.parallelize(dataOne)
        val rddTwo = sparkContext.parallelize(dataTwo)
        // do stuff here (foreach/println), and:
        val joinedData = rddOne leftOuterJoin (rddTwo)
      }
    }

    eventuallyJoinedData onSuccess { case success => /* ... */ }
    eventuallyJoinedData onFailure { case error => println(error.getMessage) }

    // sparkContext.stop
  }
}
```
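Incidentally, my current suspicion is that fetch returns (and the driver either moves on or, when that line is uncommented, calls sparkContext.stop) before the Futures have completed, so the context is torn down underneath them. If that is right, I assume I would need to block on the final Future before stopping the context, along these lines (just a sketch of the idea; the 10-minute timeout is an arbitrary value of mine):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

// Block the driver until the asynchronous pipeline has finished,
// so the SparkContext cannot be stopped out from under the Futures.
Await.result(eventuallyJoinedData, 10.minutes)

// Only stop the context once all the work is actually done.
sparkContext.stop()
```

But blocking like this makes the end of the pipeline synchronous anyway, which is part of why I am asking about the overall architecture below.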
As you can see, I have also tried commenting out the line that stops the context, but then I get another issue:

```
13:09:14.929 [ForkJoinPool-1-worker-5] INFO org.apache.spark.SparkContext - Starting job: count at Fetcher.scala:38
13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
13:09:14.936 [Spark Context Cleaner] ERROR org.apache.spark.ContextCleaner - Error in cleaning thread
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) ~[na:1.8.0_65]
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157) ~[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) [spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154) [spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67) [spark-core_2.10-1.5.1.jar:1.5.1]
13:09:14.940 [db-async-netty-thread-1] DEBUG io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils - uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[na:1.8.0_65]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[na:1.8.0_65]
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312) ~[na:1.8.0_65]
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:65) ~[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) ~[spark-core_2.10-1.5.1.jar:1.5.1]
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) [spark-core_2.10-1.5.1.jar:1.5.1]
13:09:14.949 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping org.spark-project.jetty.server.Server@787cbcef
13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping SelectChannelConnector@0.0.0.0:4040
13:09:14.959 [SparkListenerBus] DEBUG o.s.j.u.component.AbstractLifeCycle - stopping org.spark-project.jetty.server.nio.SelectChannelConnector$ConnectorSelectorManager@797cc465
```

As you can see, Spark starts the count job on the RDD, but then everything fails (possibly because the SparkContext is null at that point?). How do I address this issue? What needs to be done? Do I need to switch to a synchronous architecture? (If so, the version I have in mind is sketched in the P.S. below.)

Thanks in advance.

Kind regards,
Marco
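P.S. For completeness, this is roughly the synchronous fallback I am considering. It reuses the externalService1/externalService2 calls from the code above (their definitions are not shown here) and blocks on them up front, so every Spark call happens on the driver thread while the context is guaranteed to be alive. The Await timeouts are arbitrary placeholder values:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}

object SyncFetcher {
  def fetch(name: String, master: String): Unit = {
    val config = new SparkConf()
      .setAppName(name)
      .set("spark.master", master)
    val sparkContext = new SparkContext(config)

    // Resolve both external calls before touching Spark.
    val dataOne = Await.result(externalService1(), 5.minutes)
    val dataTwo = Await.result(externalService2(), 5.minutes)

    // Now parallelize and join with a context that is still alive.
    val rddOne = sparkContext.parallelize(Seq(dataOne))
    val rddTwo = sparkContext.parallelize(Seq(dataTwo))
    // ... joins and the rest of the processing, as in the async version ...

    sparkContext.stop()
  }
}
```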