Hi all,
the job below throws "FileBasedWriteAheadLogReader: Error reading next item, EOF reached":
java.io.EOFException
  at java.io.DataInputStream.readInt(DataInputStream.java:392)
  at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)

The WAL is not enabled in the config; it is left at its default, which is false.

The code follows the examples and is quite simple, as it is only for testing
(I'm aware that the file save isn't idempotent). Or do I have something wrong
there? It was tried on Spark 1.5.0.

/**
 * Streaming job that reads a Kafka topic through the direct stream API and
 * writes every batch as JSON under `/dfs/spark/agg/<batch time in seconds>`.
 *
 * The streaming context is obtained with `StreamingContext.getOrCreate`, so a
 * restart rebuilds the job from the data in the checkpoint directory instead
 * of calling [[createStreamingContext]] again.
 */
object Loader {

  /** Entry point: builds or recovers the streaming context and runs it until termination. */
  def main(args: Array[String]): Unit = {

    val checkpointDir = "/dfs/spark/checkpoints"

    // setIfMissing keeps any value already supplied from outside (e.g. by spark-submit).
    val sparkConf = new SparkConf()
      .setAppName("Spark Loader")
      .setIfMissing("spark.master", "local[2]")
      .setIfMissing("spark.streaming.kafka.maxRatePerPartition", "1000")

    // The second argument is only invoked when no usable checkpoint exists.
    val ssc = StreamingContext.getOrCreate(
      checkpointDir,
      createStreamingContext(sparkConf, checkpointDir))

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Builds a fresh streaming context with the complete Kafka -> JSON pipeline.
   *
   * The trailing empty parameter list lets a partially applied call be passed
   * where `StreamingContext.getOrCreate` expects a `() => StreamingContext`.
   *
   * @param conf          Spark configuration for the new context
   * @param checkpointDir directory used for streaming checkpoints
   * @return the configured, not yet started, streaming context
   */
  def createStreamingContext(conf: SparkConf, checkpointDir: String)(): StreamingContext = {
    val ssc = new StreamingContext(conf, Seconds(60))

    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
      "auto.offset.reset" -> "smallest")

    val topics = Set("topic-p03-r01")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream
      .checkpoint(Seconds(60))
      .foreachRDD { (rdd, time) =>
        // Look the SQLContext up per batch instead of capturing one created
        // outside the closure: this function is stored in the checkpoint, and
        // a captured SQLContext would not be bound to the live SparkContext
        // once the job has been recovered from that checkpoint.
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._

        // NOTE: the output path depends only on the batch time, so a batch
        // that is re-run after recovery targets the same directory.
        rdd.toDF()
          .write
          .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
      }

    ssc
  }
}


Many thanks for any ideas,
Petr

Reply via email to