Also, including more of the log4j messages from around the error would be useful.
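
For example, something along these lines at the start of main should surface the surrounding WAL activity (a minimal sketch, assuming the default log4j 1.x setup that Spark 1.5 ships with; the logger names are just the packages I would start with):

import org.apache.log4j.{Level, Logger}

// Raise verbosity only for the write-ahead-log / checkpoint classes,
// so the EOFException shows up with more context around it.
Logger.getLogger("org.apache.spark.streaming.util").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.streaming").setLevel(Level.INFO)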

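On the checkpoint-on-the-stream question: if the per-stream stream.checkpoint(Seconds(60)) call turns out to be unnecessary once ssc.checkpoint(checkpointDir) is set, the output action could be reduced to something like this (same names and paths as in the pasted code below, untested):

    stream.foreachRDD { (rdd, time) =>
      // one JSON output directory per batch, keyed by the batch time in seconds
      rdd.toDF()
        .write
        .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
    }
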
On Thu, Sep 17, 2015 at 1:02 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Is there a particular reason you're calling checkpoint on the stream in
> addition to the streaming context?
>
> On Thu, Sep 17, 2015 at 2:36 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> Hi all,
>> it throws FileBasedWriteAheadLogReader: Error reading next item, EOF reached
>> java.io.EOFException
>>   at java.io.DataInputStream.readInt(DataInputStream.java:392)
>>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
>>
>> The WAL is not enabled in the config; it is left at the default, hence false.
>>
>> The code follows the examples and is quite simple, just for testing (I'm aware that the
>> file save isn't idempotent). Or do I have something wrong in it? It was tried on
>> Spark 1.5.0.
>>
>> import kafka.serializer.StringDecoder
>>
>> import org.apache.spark.SparkConf
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> object Loader {
>>   def main(args: Array[String]): Unit = {
>>
>>     val checkpointDir = "/dfs/spark/checkpoints"
>>
>>     val sparkConf = new SparkConf()
>>       .setAppName("Spark Loader")
>>       .setIfMissing("spark.master", "local[2]")
>>       .setIfMissing("spark.streaming.kafka.maxRatePerPartition", "1000")
>>
>>     val ssc = StreamingContext.getOrCreate(
>>       checkpointDir,
>>       createStreamingContext(sparkConf, checkpointDir))
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>>
>>   def createStreamingContext(conf: SparkConf, checkpointDir: String)(): StreamingContext = {
>>     val ssc = new StreamingContext(conf, Seconds(60))
>>
>>     val sc = ssc.sparkContext
>>     val sqlc = new SQLContext(sc)
>>
>>     ssc.checkpoint(checkpointDir)
>>
>>     import sqlc.implicits._
>>
>>     val kafkaParams = Map[String, String](
>>       "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
>>       "auto.offset.reset" -> "smallest")
>>
>>     val topics = Set("topic-p03-r01")
>>
>>     val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>       ssc, kafkaParams, topics)
>>
>>     stream
>>       .checkpoint(Seconds(60))
>>       .foreachRDD { (rdd, time) =>
>>         rdd.toDF()
>>           .write
>>           .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
>>       }
>>
>>     ssc
>>   }
>> }
>>
>>
>> Many thanks for any ideas,
>> Petr
>>
>
>
