Hi all,

my streaming job throws the following error:

FileBasedWriteAheadLogReader: Error reading next item, EOF reached
java.io.EOFException
        at java.io.DataInputStream.readInt(DataInputStream.java:392)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
The WAL is not enabled in the config; it is left at its default, hence false. The code follows the usual examples and is quite simple, just for testing (I'm aware that the file save isn't idempotent). Or do I have something wrong there? It was tried on Spark 1.5.0.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object Loader {

  def main(args: Array[String]): Unit = {
    val checkpointDir = "/dfs/spark/checkpoints"

    val sparkConf = new SparkConf()
      .setAppName("Spark Loader")
      .setIfMissing("spark.master", "local[2]")
      .setIfMissing("spark.streaming.kafka.maxRatePerPartition", "1000")

    // Recover the context from the checkpoint if one exists, otherwise create it.
    val ssc = StreamingContext.getOrCreate(
      checkpointDir,
      createStreamingContext(sparkConf, checkpointDir))

    ssc.start()
    ssc.awaitTermination()
  }

  // The trailing empty parameter list lets the partially applied method serve
  // as the () => StreamingContext factory that getOrCreate expects.
  def createStreamingContext(conf: SparkConf, checkpointDir: String)(): StreamingContext = {
    val ssc = new StreamingContext(conf, Seconds(60))
    val sc = ssc.sparkContext
    val sqlc = new SQLContext(sc)

    ssc.checkpoint(checkpointDir)

    import sqlc.implicits._

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
      "auto.offset.reset" -> "smallest")

    val topics = Set("topic-p03-r01")

    // Direct (receiver-less) Kafka stream; offsets are tracked via the checkpoint.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream
      .checkpoint(Seconds(60))
      .foreachRDD { (rdd, time) =>
        // Write each batch of (key, value) pairs as JSON, one directory per batch.
        rdd.toDF()
          .write
          .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
      }

    ssc
  }
}

Many thanks for any idea,
Petr
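
P.S. To be precise about "WAL is not enabled": the only write-ahead-log switch I know of is spark.streaming.receiver.writeAheadLog.enable, which defaults to false, and I do not set it anywhere. Just as a sketch of what I mean, enabling it would look like the following; this line does NOT appear in my job:

import org.apache.spark.SparkConf

// Hypothetical: how the receiver WAL would be switched on.
// My job leaves this at its default of false.
val confWithWal = new SparkConf()
  .setAppName("Spark Loader")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")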