Hi Hari,

I am trying to read data from a file stored in HDFS. Flume tails the data
and writes it to HDFS.
Now I want to read this data using textFileStream, but with the code below
I am not able to fetch any data from the file stored in HDFS. Can anyone
help me with this issue?

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class Test1 {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        JavaStreamingContext ssc =
                new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));
        // Data directory path in HDFS
        JavaDStream<String> textStream =
                ssc.textFileStream("user/huser/user/huser/flume");
        JavaDStream<String> suspectedStream =
                textStream.flatMap(new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String line) throws Exception {
                        //return Arrays.asList(line.toString().toString());
                        return Lists.newArrayList(line.toString().toString());
                    }
                });
        suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from Flume " + output);
                return null;
            }
        });
        suspectedStream.print();
        System.out.println("Welcome TO Flume Streaming");
        ssc.start();
        ssc.awaitTermination();
    }
}

The command I use is:
./bin/spark-submit --verbose \
    --jars lib/spark-examples-1.1.0-hadoop1.0.4.jar,lib/mysql.jar \
    --master local[*] --deploy-mode client \
    --class xyz.Test1 bin/filestream3.jar

Regards,
Jeniba Johnson