Hi Hari,

I am trying to read data from a file stored in HDFS. The data is tailed by Flume and written to HDFS, and I now want to read it using textFileStream. With the code below, I am not able to fetch the data from the file stored in HDFS. Can anyone help me with this issue?

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class Test1 {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));
        // Data directory path in HDFS
        JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");

        JavaDStream<String> suspectedStream = textStream.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) throws Exception {
                //return Arrays.asList(line.toString().toString());
                return Lists.newArrayList(line.toString().toString());
            }
        });

        suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from Flume " + output);
                return null;
            }
        });

        suspectedStream.print();
        System.out.println("Welcome TO Flume Streaming");
        ssc.start();
        ssc.awaitTermination();
    }
}

The command I use is:

./bin/spark-submit --verbose --jars lib/spark-examples-1.1.0-hadoop1.0.4.jar,lib/mysql.jar --master local[*] --deploy-mode client --class xyz.Test1 bin/filestream3.jar

Regards,
Jeniba Johnson