Hi,

I am new to Spark (and Scala) and hope someone can help me with an issue
I got stuck on while experimenting and learning.

mapWithState from Spark 1.6 seems like a great fit for the task I want to
implement, but unfortunately I am getting the error "RDD transformations
and actions can only be invoked by the driver, not inside of other
transformations" on job restart when a checkpoint already exists. The job
starts and works fine when the checkpoint directory is empty, which rather
defeats the point of having the checkpoint.

I can reproduce it with ~65 lines of test code, included below.
Is there something I am doing wrong?
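
For context, here is roughly what I expect the state function to see and
emit on each batch after the groupByKey (a sketch using the actual test
input; the emitted lengths are illustrative):

  // batch input after groupByKey (key = first character of each string):
  //   ("1", Iterable("1")), ("2", Iterable("21")), ("3", Iterable("321")),
  //   ("4", Iterable("41", "42")), ("5", Iterable("543")), ("6", Iterable("67"))
  // e.g. stateFunc("4", Some(Iterable("41", "42")), state) appends "4142" to
  // the stored string for key "4" and emits ("4", <new accumulated length>)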

code:
----

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming._

object TestJob {
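  // state function for mapWithState: accumulates all strings seen for a key
  // into one string kept in state, and emits (key, accumulated length)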
  def stateFunc(id: String,
                txt: Option[Iterable[String]],
                state: State[String]) : (String, Long) = {
    if (txt.nonEmpty) {
      // append this batch's values to the accumulated string in state
      // (txt.get.mkString -- appending txt directly would stringify the Option)
      val aggregatedString = state.getOption().getOrElse("") + txt.get.mkString
      state.update(aggregatedString)
      (id, aggregatedString.length)
    } else { // happens when the state is timing out? any other cases?
      (id, 0)
    }
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("test")

    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    ssc.checkpoint(checkpointDirectory)

    val input = Seq("1", "21", "321", "41", "42", "543", "67")
    val inputRdd = ssc.sparkContext.parallelize(input)
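    // ConstantInputDStream re-emits this same RDD on every batch interval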
    val testStream = new ConstantInputDStream(ssc, inputRdd)

    val streamWithIds = testStream.map(x => (x.substring(0,1), x))
    val batched = streamWithIds.groupByKey()

    val stateSpec = StateSpec.function(stateFunc _)
      .numPartitions(3)
      .timeout(Minutes(3 * 24 * 60)) // 3 days

    val result = batched.mapWithState(stateSpec)
    result.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDirectory = com.google.common.io.Files.createTempDir()
    checkpointDirectory.deleteOnExit()
    val checkpointDirectoryName = checkpointDirectory.getAbsolutePath

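    // first run: the checkpoint directory is empty, so the factory function
    // runs createContext() -- this works fine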
    val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => {
        createContext(checkpointDirectoryName)
      })

    ssc.start()
    ssc.awaitTerminationOrTimeout(7000)
    ssc.stop()
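    // brief pause so checkpoint writes can finish before restarting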
    Thread.sleep(5000)

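    // second run: a checkpoint now exists, so getOrCreate recovers the
    // context from it instead of calling the factory function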
    val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => {
        createContext(checkpointDirectoryName)
      })

    // the recovered context terminates here with:
    //   Exception in thread "main" org.apache.spark.SparkException: RDD
    //   transformations and actions can only be invoked by the driver, not
    //   inside of other transformations; for example, rdd1.map(x =>
    //   rdd2.values.count() * x) is invalid because the values transformation
    //   and count action cannot be performed inside of the rdd1.map
    //   transformation. For more information, see SPARK-5063.
    ssc2.start()
    ssc2.awaitTerminationOrTimeout(7000)
    ssc2.stop()
  }
}

----------
Andrey Yegorov
