What do you mean by "new file", do you upload an already existing file onto 
HDFS or create a new one locally and then upload it to HDFS?


bit1...@163.com
 
From: ravi tella
Date: 2015-06-30 09:59
To: user
Subject: spark streaming HDFS file issue
I am running a spark streaming example from learning spark book with one 
change. The change I made was for streaming a file from HDFS.

val lines = ssc.textFileStream("hdfs:/user/hadoop/spark/streaming/input")

I ran the application number of times and every time dropped a new file in the 
input directory after the program started running but never got any output.

I even passed a non existent directory as the input to the textFileStream but 
the application did not throw any error and ran just like it did when i had the 
right directory.

I am able to access the same HDFS file system from non smearing application 
fine. 

I wasted a day trying to figure this out:). Any help with be greatly 
appreciated.

package com.oreilly.learningsparkexamples.scala

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

object StreamingLogInput {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("StreamingLogInput")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.textFileStream("hdfs:/user/hadoop/spark/streaming/input")
    println("testtesttestteste")
     lines.print()
    ssc.start()
    ssc.awaitTermination(60000)
    ssc.stop()
  }
  def processLines(lines: DStream[String]) = {
    lines.filter(_.contains("error"))

  }

Reply via email to