Hi, I am new to Spark (and Scala) and hope someone can help me with an issue I got stuck on while experimenting/learning.
mapWithState from Spark 1.6 seems like a great fit for the task I want to implement, but unfortunately I am getting the error "RDD transformations and actions can only be invoked by the driver, not inside of other transformations" on job restart when a checkpoint already exists. The job starts and works fine if the checkpoint is empty (which kind of defeats the point of having the checkpoint). I can reproduce it with ~65 lines of test code, see below. Is there something I am doing wrong here?

code:
----
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext, _}

object TestJob {

  def stateFunc(id: String, txt: Option[Iterable[String]], state: State[String]): (String, Long) = {
    if (txt.nonEmpty) {
      // append the new batch of strings to the accumulated state
      val aggregatedString = state.getOption().getOrElse("") + txt.get.mkString
      state.update(aggregatedString)
      (id, aggregatedString.length)
    } else {
      // happens when state is timing out? any other cases?
      (id, 0)
    }
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("test")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    ssc.checkpoint(checkpointDirectory)

    val input = Seq("1", "21", "321", "41", "42", "543", "67")
    val inputRdd = ssc.sparkContext.parallelize(input)
    val testStream = new ConstantInputDStream(ssc, inputRdd)

    val streamWithIds = testStream.map(x => (x.substring(0, 1), x))
    val batched = streamWithIds.groupByKey()

    val stateSpec = StateSpec.function(stateFunc _)
      .numPartitions(3)
      .timeout(Minutes(3 * 60 * 24)) // 3 days

    val result = batched.mapWithState(stateSpec)
    result.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDirectory = com.google.common.io.Files.createTempDir()
    checkpointDirectory.deleteOnExit()
    val checkpointDirectoryName = checkpointDirectory.getAbsolutePath

    val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => createContext(checkpointDirectoryName))
    ssc.start()
    ssc.awaitTerminationOrTimeout(7000)
    ssc.stop()

    Thread.sleep(5000)

    val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => createContext(checkpointDirectoryName))
    // terminates here with:
    // Exception in thread "main" org.apache.spark.SparkException: RDD transformations
    // and actions can only be invoked by the driver, not inside of other transformations;
    // for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values
    // transformation and count action cannot be performed inside of the rdd1.map
    // transformation. For more information, see SPARK-5063.
    ssc2.start()
    ssc2.awaitTerminationOrTimeout(7000)
    ssc2.stop()
  }
}
----------

Andrey Yegorov
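
P.S. One thing I wondered about: ConstantInputDStream wraps an RDD, and maybe that RDD reference is what ends up in the checkpoint and trips SPARK-5063 on recovery? To test that theory I sketched the same pipeline with a socket source instead. This is untested, reuses stateFunc from the code above, and "localhost"/9999 are just placeholder values (lines could be fed in with `nc -lk 9999`):

----
  // Sketch only: same pipeline as createContext above, but sourced from a
  // socket instead of an RDD-backed ConstantInputDStream, to check whether
  // the input stream is what breaks checkpoint recovery.
  def createSocketContext(checkpointDirectory: String): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("test")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    ssc.checkpoint(checkpointDirectory)

    // placeholder host/port; feed test lines via `nc -lk 9999`
    val testStream = ssc.socketTextStream("localhost", 9999)

    val streamWithIds = testStream.map(x => (x.substring(0, 1), x))
    val batched = streamWithIds.groupByKey()

    val stateSpec = StateSpec.function(stateFunc _)
      .numPartitions(3)
      .timeout(Minutes(3 * 60 * 24)) // 3 days

    batched.mapWithState(stateSpec).print()
    ssc
  }
----------

If that variant recovers from an existing checkpoint cleanly, I would take it as a sign that the ConstantInputDStream is the problem rather than mapWithState itself; but I may be on the wrong track.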