import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import scala.collection.mutable.Queue

/**
 * Small Spark Streaming driver used to exercise checkpoint recovery.
 *
 * The streaming context is restored from the "checkpointfolder" directory when a
 * checkpoint is present there, and is built by [[createContext3]] otherwise. The
 * job is started and a background task stops it after two minutes.
 *
 * An explicit `main` is used instead of `extends App`: the Spark documentation
 * notes that subclasses of `scala.App` may not work correctly.
 */
object CheckPointTest {

  /**
   * Builds a new streaming context with a 10 second batch interval and
   * checkpointing enabled in "checkpointfolder".
   *
   * Two file streams are defined:
   *  - "referencedata": each non-blank line is keyed by its first character and
   *    kept as per-key state; when a batch brings new lines for a key, the first
   *    of them replaces the stored value, otherwise the previous value is kept.
   *  - "streamdata": each line is paired with the count 1.
   *
   * The two streams are left-outer-joined, but only the raw "streamdata" stream
   * is currently printed (printing the join is left commented out).
   *
   * @return the configured streaming context; it has not been started
   */
  def createContext3(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckPointTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("checkpointfolder")

    val reference =
      ssc
        .textFileStream("referencedata")
        // A blank line has no first character; substring(0, 1) would throw on it.
        .filter(_.nonEmpty)
        .map(line => (line.substring(0, 1), line))
        .updateStateByKey((x: Seq[String], y: Option[String]) => x.headOption.orElse(y))

    val stream =
      ssc
        .textFileStream("streamdata")
        .map(line => (line, 1))

    val output =
      stream
        .leftOuterJoin(reference)

    stream.print()
//    output.print()
    ssc
  }

  /**
   * Entry point: obtains the streaming context, schedules a stop after two
   * minutes, starts the job and blocks until it has terminated.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    import scala.concurrent._
    import scala.concurrent.ExecutionContext.Implicits.global

    // NOTE: when "createContext" is used instead, the application hangs on stop.
    val ssc = StreamingContext.getOrCreate("checkpointfolder", () => createContext3())

    // Run this application for 2 minutes, then stop it from a background task.
    // `blocking` tells the global pool that this task parks a thread in sleep.
    val stopFunc = Future {
      blocking {
        Thread.sleep(120000)
        ssc.stop(stopSparkContext = true)
      }
    }
    // Surface a failed stop; otherwise awaitTermination below would hang silently.
    stopFunc.failed.foreach { e =>
      System.err.println(s"Failed to stop the streaming context: $e")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}