Thanks everyone, that was the problem. The "create new streaming
context" function was supposed to set up the stream processing pipeline
as well as the checkpoint directory, and I had missed the checkpoint
setup entirely. With that done, everything works as expected.
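
Boiled down, the piece I was missing looks like this (a stripped-down
sketch only; sparkConf, batchSecs and buildPipeline are just
placeholders for the real code further down):

// both the checkpoint directory and the DAG setup must happen inside
// the function that is handed to StreamingContext.getOrCreate
def creatingFunc(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Durations.seconds(batchSecs))
  ssc.checkpoint(checkpointDir)   // checkpoint dir set on the new context
  buildPipeline(ssc)              // DAG is built here, then checkpointed
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, creatingFunc _)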

For the benefit of others, here is my final, working version of the code:


// standard imports; project-specific types (TacomaHelper, RawLog, VideoView,
// VideoEngagement, VideoImpression, Normalizer and the Avro decoder) come
// from our own packages/dependencies and are omitted here
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import com.typesafe.config.Config
import kafka.serializer.StringDecoder
import org.apache.avro.generic.GenericData
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Durations, StreamingContext}

object RawLogProcessor extends Logging {

  import TacomaHelper._

  val checkpointDir = "/tmp/checkpointDir_tacoma"
  var ssc: Option[StreamingContext] = None

  def createSparkConf(config: Config): SparkConf = {
    val sparkConf = new SparkConf()
    // copy every entry of the typesafe config into the SparkConf as spark.<key>
    config.entrySet.asScala
      .map(kv => kv.getKey -> kv.getValue)
      .foreach { case (k, v) => sparkConf.set(s"spark.$k", unquote(v.render())) }

    sparkConf.registerKryoClasses(Array(
      classOf[VideoView], classOf[RawLog],
      classOf[VideoEngagement], classOf[VideoImpression]))
    sparkConf
  }

  // a function that returns a function of type `() => StreamingContext`
  def createContext(sparkConfig: Config, kafkaConf: Config)(f: StreamingContext => StreamingContext) = () => {
    val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
    val sparkConf = createSparkConf(sparkConfig)

    // create the StreamingContext (and, implicitly, the SparkContext)
    val streamingContext = new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
    streamingContext.checkpoint(checkpointDir)

    // apply the pipeline-building function to the freshly created context
    f(streamingContext)
  }

  def createNewContext(sparkConf: Config, kafkaConf: Config, f: StreamingContext => StreamingContext) = {
    logInfo("Create new Spark streamingContext with provided pipeline function")
    StreamingContext.getOrCreate(
      checkpointPath = checkpointDir,
      creatingFunc = createContext(sparkConf, kafkaConf)(f),
      createOnError = true)
  }

  def apply(sparkConfig: Config, kafkaConf: Config): StreamingContext = {
    rawlogTopic = kafkaConf.getString("rawlog.topic")
    kafkaParams = kafkaConf.entrySet.asScala
      .map(kv => kv.getKey -> unquote(kv.getValue.render()))
      .toMap

    if (ssc.isEmpty) {
      ssc = Some(createNewContext(sparkConfig, kafkaConf, setupPipeline))
    }
    ssc.get
  }

  var rawlogTopic: String = "qa-rawlog"
  var kafkaParams: Map[String, String] = Map()

  def setupPipeline(streamingContext: StreamingContext): StreamingContext = {

    logInfo("Creating new kafka rawlog stream")
    // TODO: extract this and pass it around somehow
    val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
      streamingContext, kafkaParams, Set(rawlogTopic))

    logInfo("adding step to parse kafka stream into RawLog types (Normalizer)")
    val eventStream = rawlogDStream
      .map { case (key, rawlogVal) =>
        val record = rawlogVal.asInstanceOf[GenericData.Record]
        val rlog = RawLog.newBuilder()
          .setId(record.get("id").asInstanceOf[String])
          .setAccount(record.get("account").asInstanceOf[String])
          .setEvent(record.get("event").asInstanceOf[String])
          .setTimestamp(record.get("timestamp").asInstanceOf[Long])
          .setUserAgent(record.get("user_agent").asInstanceOf[String])
          .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
          .build()
        val norm = Normalizer(rlog)
        (key, rlog.getEvent, norm)
      }

    logInfo("Adding step to filter out VideoView-only events and cache them")
    val videoViewStream = eventStream
      .filter(_._2 == "video_view")
      .filter(_._3.isDefined)
      .map(z => (z._1, z._3.get))
      .map(z => (z._1, z._2.asInstanceOf[VideoView]))
      .cache()

    // repartition by account
    logInfo("repartition videoView by account and calculate stats")
    videoViewStream.map(v => (v._2.getAccount, 1))
      .filter(_._1 != null)
      .window(Durations.seconds(20))
      .reduceByKey(_ + _)
      .print()

    // repartition by (deviceType, deviceOs)
    logInfo("repartition videoView by (DeviceType, DeviceOS) and calculate stats")
    videoViewStream.map(v => ((v._2.getDeviceType, v._2.getDeviceOs), 1))
      .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
      .print()

    streamingContext
  }

}
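
For completeness, this is roughly how the object gets driven from the
main entry point (the config sections "spark" and "kafka" below are
illustrative, not necessarily the exact names we use):

object RawLogProcessorMain {
  def main(args: Array[String]): Unit = {
    // load application.conf; the section names here are only illustrative
    val conf = com.typesafe.config.ConfigFactory.load()
    val sparkConfig = conf.getConfig("spark")
    val kafkaConf = conf.getConfig("kafka")

    // get (or recover from checkpoint) the streaming context and start it
    val ssc = RawLogProcessor(sparkConfig, kafkaConf)
    ssc.start()
    ssc.awaitTermination()
  }
}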

- Ankur

On 13/05/2015 23:52, NB wrote:
> The data pipeline (DAG) should not be added to the StreamingContext
> in the case of a recovery scenario. The pipeline metadata is
> recovered from the checkpoint folder. That is one thing you will
> need to fix in your code. Also, I don't think the
> ssc.checkpoint(folder) call should be made in case of the
> recovery.
> 
> The idiom to follow is to set up the DAG in the creatingFunc and
> not outside of it. This will ensure that if a new context is being
> created i.e. checkpoint folder does not exist, the DAG will get
> added to it and then checkpointed. Once a recovery happens, this
> function is not invoked but everything is recreated from the
> checkpointed data.
> 
> Hope this helps, NB