Also, it would be useful to include more of the log4j output from around the error.
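For example, something along these lines near the top of main should surface the surrounding messages (a sketch only, assuming the log4j 1.x backend that Spark 1.5 ships with; the logger names are just the packages involved, adjust to taste):

    import org.apache.log4j.{Level, Logger}

    // Turn up logging for the streaming checkpoint / WAL machinery so the
    // messages around the EOFException show up in the driver log.
    Logger.getLogger("org.apache.spark.streaming").setLevel(Level.DEBUG)
    Logger.getLogger("org.apache.spark.streaming.util").setLevel(Level.DEBUG)

The equivalent log4j.properties entries work as well if you'd rather not touch the code.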
On Thu, Sep 17, 2015 at 1:02 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Is there a particular reason you're calling checkpoint on the stream in
> addition to the streaming context?
>
> On Thu, Sep 17, 2015 at 2:36 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> Hi all,
>> it throws
>>
>> FileBasedWriteAheadLogReader: Error reading next item, EOF reached
>> java.io.EOFException
>>         at java.io.DataInputStream.readInt(DataInputStream.java:392)
>>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
>>
>> WAL is not enabled in the config, it is left at the default, hence false.
>>
>> The code follows the examples and is quite simple, just for testing (I'm
>> aware that the file save isn't idempotent). Or do I have something wrong
>> there? It was tried on Spark 1.5.0.
>>
>> import kafka.serializer.StringDecoder
>> import org.apache.spark.SparkConf
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> object Loader {
>>   def main(args: Array[String]): Unit = {
>>
>>     val checkpointDir = "/dfs/spark/checkpoints"
>>
>>     val sparkConf = new SparkConf()
>>       .setAppName("Spark Loader")
>>       .setIfMissing("spark.master", "local[2]")
>>       .setIfMissing("spark.streaming.kafka.maxRatePerPartition", "1000")
>>
>>     val ssc = StreamingContext.getOrCreate(
>>       checkpointDir,
>>       createStreamingContext(sparkConf, checkpointDir))
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>>
>>   def createStreamingContext(conf: SparkConf, checkpointDir: String)(): StreamingContext = {
>>     val ssc = new StreamingContext(conf, Seconds(60))
>>
>>     val sc = ssc.sparkContext
>>     val sqlc = new SQLContext(sc)
>>
>>     ssc.checkpoint(checkpointDir)
>>
>>     import sqlc.implicits._
>>
>>     val kafkaParams = Map[String, String](
>>       "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
>>       "auto.offset.reset" -> "smallest")
>>
>>     val topics = Set("topic-p03-r01")
>>
>>     val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>       ssc, kafkaParams, topics)
>>
>>     stream
>>       .checkpoint(Seconds(60))
>>       .foreachRDD { (rdd, time) =>
>>         rdd.toDF()
>>           .write
>>           .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
>>       }
>>
>>     ssc
>>   }
>> }
>>
>> Many thanks for any idea,
>> Petr
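To spell out the distinction behind my question: ssc.checkpoint(dir) sets the checkpoint directory and enables the metadata checkpointing that StreamingContext.getOrCreate needs for recovery, while the extra stream.checkpoint(Seconds(60)) also checkpoints the stream's RDD data into that directory. A sketch of the create function without the per-stream call (same brokers/topic as in your code, untested):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    def createStreamingContext(conf: SparkConf, checkpointDir: String)(): StreamingContext = {
      val ssc = new StreamingContext(conf, Seconds(60))
      val sqlc = new SQLContext(ssc.sparkContext)
      import sqlc.implicits._

      // Metadata checkpointing only: enough for getOrCreate to recover the
      // driver, and the direct Kafka stream's offsets are stored as part of it.
      ssc.checkpoint(checkpointDir)

      val kafkaParams = Map[String, String](
        "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
        "auto.offset.reset" -> "smallest")

      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("topic-p03-r01"))

      // No stream.checkpoint(...) here: that call would additionally write the
      // stream's RDD data to the checkpoint directory (data checkpointing),
      // which this stateless pipeline does not need.
      stream.foreachRDD { (rdd, time) =>
        rdd.toDF()
          .write
          .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
      }

      ssc
    }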