Hi,

I have created a StreamingContext from a checkpoint, but it always throws a
"stream corrupted" error when I restart the Spark streaming job. Is there any
solution for this?

/**
 * Factory that builds a fresh StreamingContext wired to a Kafka direct stream.
 *
 * Intended to be passed to `StreamingContext.getOrCreate`: this body runs ONLY
 * when no valid checkpoint exists in `sparkCheckpointDir`. On restart, Spark
 * rebuilds the entire context (including the DStream graph defined below) from
 * the checkpoint data instead of calling this function again.
 *
 * NOTE(review): a "java.io.IOException: Stream is corrupted" on restart
 * typically means the checkpoint was written by a different Spark/Scala
 * version, or the application jar/code changed since the checkpoint was taken
 * — Spark checkpoints are not portable across upgrades. Deleting the
 * checkpoint directory (losing offsets) or keeping versions identical is the
 * usual remedy — TODO confirm against the cluster's Spark versions.
 *
 * @param sparkCheckpointDir HDFS directory for streaming checkpoint data;
 *                           must match the directory passed to getOrCreate.
 * @param sparkSession       active SparkSession whose SparkContext backs ssc.
 * @param batchDuration      micro-batch interval in seconds.
 * @param config             Typesafe config holding Kafka and Hive settings.
 * @return a new StreamingContext with the Kafka DStream pipeline attached.
 */
private def createStreamingContext(
    sparkCheckpointDir: String, sparkSession: SparkSession,
    batchDuration: Int, config: com.typesafe.config.Config) = {
    // Comma-separated topic list from config, e.g. "topicA,topicB".
    val topics = config.getString(Constants.Properties.KafkaTopics)
    val topicsSet = topics.split(",").toSet
    // Direct (receiver-less) Kafka consumer config; only the broker list is set.
    val kafkaParams = Map[String, String]("metadata.broker.list" ->
config.getString(Constants.Properties.KafkaBrokerList))
    val ssc = new StreamingContext(sparkSession.sparkContext,
Seconds(batchDuration))
    // Direct stream of (key, value) pairs; keys are discarded below.
    val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    // Parse each Kafka message value into a datapoint record.
    val datapointDStream =
messages.map(_._2).map(TransformDatapoint.parseDataPointText)
    // lazy: SQLContext is only materialized if readDstreamData uses it.
    lazy val sqlCont = sparkSession.sqlContext

    // NOTE(review): mutates an outer var as a side effect. When the context is
    // restored from a checkpoint this factory is NOT invoked, so hiveDBInstance
    // would remain unset on recovery — consider assigning it outside this
    // factory, before getOrCreate is called.
    hiveDBInstance = config.getString("hiveDBInstance")

    // Attach the per-batch processing logic to the DStream graph.
    // runMode / includeIndex / indexNum / *TmpTableName come from enclosing
    // scope (not visible in this chunk) — presumably driver-side settings.
    TransformDatapoint.readDstreamData(sparkSession, sqlCont,
datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName,
fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)

    // Enable checkpointing for this context; the directory must be the same
    // one that StreamingContext.getOrCreate reads on restart.
    ssc.checkpoint(sparkCheckpointDir)
    ssc
  }



// Calling the streaming-context factory method

 // Restore the StreamingContext from the checkpoint directory if a valid
 // checkpoint exists there; otherwise invoke the factory to build a new one.
 // The directory passed here must be the same one given to ssc.checkpoint
 // inside createStreamingContext (both read Constants.Properties.CheckPointDir,
 // so they agree). On successful recovery the factory lambda is never called.
 val streamingContext =
StreamingContext.getOrCreate(config.getString(Constants.Properties.CheckPointDir),
() =>
createStreamingContext(config.getString(Constants.Properties.CheckPointDir),
sparkSession, config.getInt(Constants.Properties.BatchInterval), config))

*ERROR:*
org.apache.spark.SparkException: Failed to read checkpoint from directory
hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPointNTDSC

java.io.IOException: Stream is corrupted


Thanks,
Asmath

Reply via email to