Hi, I have created a streaming context from a checkpoint, but it always throws a "stream corrupted" error when I restart the Spark streaming job. Is there any solution for this?
/**
 * Builds a fresh StreamingContext wired to a direct (receiver-less) Kafka
 * stream, registers the data-point transformation pipeline, and enables
 * checkpointing into `sparkCheckpointDir`.
 *
 * IMPORTANT recovery semantics: this factory is only invoked by
 * `StreamingContext.getOrCreate` on a COLD start (no readable checkpoint).
 * On restart the entire DStream graph — including everything set up here —
 * is deserialized from the checkpoint instead, and this code does not run.
 *
 * @param sparkCheckpointDir HDFS directory used for streaming checkpoints
 * @param sparkSession       active SparkSession whose SparkContext backs the stream
 * @param batchDuration      micro-batch interval in seconds
 * @param config             application config (Kafka topics/brokers, Hive DB name)
 * @return the fully configured, checkpoint-enabled StreamingContext
 */
private def createStreamingContext(
    sparkCheckpointDir: String,
    sparkSession: SparkSession,
    batchDuration: Int,
    config: com.typesafe.config.Config): StreamingContext = {

  // Kafka connection details come from the application config.
  val topics = config.getString(Constants.Properties.KafkaTopics)
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> config.getString(Constants.Properties.KafkaBrokerList))

  val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(batchDuration))

  // Direct Kafka stream: each record is a (key, value) pair of Strings.
  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet)

  // Keep only the payload and parse it into data points.
  val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)

  // `lazy` dropped: the value was dereferenced immediately below, so laziness
  // bought nothing.
  val sqlCont = sparkSession.sqlContext

  // NOTE(review): this mutates a var declared OUTSIDE this function. Because
  // the factory is skipped when recovering from a checkpoint, hiveDBInstance
  // would remain unset after a restart — assign it outside this factory (on
  // every run) instead of relying on this line.
  hiveDBInstance = config.getString("hiveDBInstance")

  TransformDatapoint.readDstreamData(
    sparkSession, sqlCont, datapointDStream, runMode, includeIndex, indexNum,
    datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)

  // Must be set before ssc.start(); getOrCreate will read this directory on
  // the next launch.
  ssc.checkpoint(sparkCheckpointDir)
  ssc
}

// Calling the streaming-context factory. getOrCreate only calls the factory
// lambda when the checkpoint directory is absent or unreadable as a whole;
// otherwise the DStream graph is restored from the checkpoint files.
val streamingContext = StreamingContext.getOrCreate(
  config.getString(Constants.Properties.CheckPointDir),
  () => createStreamingContext(
    config.getString(Constants.Properties.CheckPointDir),
    sparkSession,
    config.getInt(Constants.Properties.BatchInterval),
    config))

// *ERROR:*
// org.apache.spark.SparkException: Failed to read checkpoint from directory
//   hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPointNTDSC
// java.io.IOException: Stream is corrupted
//
// Root cause to check: streaming checkpoints are serialized with the exact
// Spark and application classes that wrote them. After upgrading Spark or
// redeploying changed job code, the old checkpoint cannot be deserialized and
// fails with "Stream is corrupted". Delete the checkpoint directory (or point
// the job at a new one) after such a change and let the job start fresh.
//
// Thanks, Asmath