Hi, I am using Spark 1.0.0 on a 3-node cluster with 1 master and 2 slaves, and I am trying to run a logistic regression (LR) algorithm over Spark Streaming. Here is the code:
package org.apache.spark.examples.streaming;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Logistic regression based classification using MLlib.
 */
public final class StreamingJavaLR {

  static int i = 1;
  // static LogisticRegressionModel model;
  // private static final Pattern SPACE = Pattern.compile(" ");

  // Parses labeled training lines of the form "label,x0 x1 ..." into LabeledPoints.
  static class ParsePoint implements Function<String, LabeledPoint> {
    private static final Pattern COMMA = Pattern.compile(",");
    private static final Pattern SPACE = Pattern.compile(" ");

    @Override
    public LabeledPoint call(String line) {
      String[] parts = COMMA.split(line);
      double y = Double.parseDouble(parts[0]);
      String[] tok = SPACE.split(parts[1]);
      double[] x = new double[tok.length];
      for (int i = 0; i < tok.length; ++i) {
        x[i] = Double.parseDouble(tok[i]);
      }
      return new LabeledPoint(y, Vectors.dense(x));
    }
  }

  // Edited: parses unlabeled lines of the form "x0 x1 ..." into feature arrays
  // for prediction.
  static class ParsePointforInput implements Function<String, double[]> {
    private static final Pattern SPACE = Pattern.compile(" ");

    @Override
    public double[] call(String line) {
      String[] tok = SPACE.split(line);
      double[] x = new double[tok.length];
      for (int i = 0; i < tok.length; ++i) {
        x[i] = Double.parseDouble(tok[i]);
      }
      return x;
    }
  }

  public static void main(String[] args) {
    if (args.length != 5) {
      System.err.println("Usage: JavaLR <master> <input_file_for_training> "
          + "<step_size> <no_iters> <input_file_for_prediction>");
      System.exit(1);
    }

    // These are only used by the commented-out bookkeeping below.
    FileWriter file;
    PrintWriter outputFile = null;
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    Calendar cal = Calendar.getInstance();
    final Date startTime;

    System.out.println("<<<<<Let's Print>>>>>");

    // SparkConf conf = new SparkConf()
    //     .setMaster(args[0])
    //     .setAppName("StreamingJavaLR")
    //     .set("spark.cleaner.ttl", "1000")
    //     .set("spark.executor.uri", "hdfs://192.168.145.191:9000/user/praveshj/spark/spark-0.9.1.tar.gz")
    //     .setJars(JavaSparkContext.jarOfClass(StreamingJavaLR.class));
    // JavaSparkContext sc = new JavaSparkContext(conf);

    JavaSparkContext sc = new JavaSparkContext(args[0], "StreamingJavaLR",
        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(StreamingJavaLR.class));

    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Reading File");
    JavaRDD<String> lines = sc.textFile(args[1]);
    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>File has been Read now mapping");
    JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Mapping Done");

    double stepSize = Double.parseDouble(args[2]);
    int iterations = Integer.parseInt(args[3]);
    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Read the arguments. stepSize = "
        + stepSize + " and iterations = " + iterations);

    BufferedReader br = null;

    // Train the model once on the batch data.
    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Training the Model");
    final LogisticRegressionModel model = LogisticRegressionWithSGD.train(
        points.rdd(), iterations, stepSize);
    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Model Trained");
    System.out.println("Final w: " + model.weights());
    // printWeights(model.weights());
    System.out.println("Intercept : " + model.intercept());

    final Vector weightVector = model.weights();
    // double[] weightArray = model.weights();
    // final DoubleMatrix weightMatrix = new DoubleMatrix(weightArray);

    // Stop the batch context before creating the streaming context.
    sc.stop();
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

    // try {
    //   file = new FileWriter(args[5]);
    //   outputFile = new PrintWriter(file);
    //   cal = Calendar.getInstance();
    //   cal.getTime();
    //   // startTime = sdf.format(cal.getTime());
    //   startTime = cal.getTime();
    //   outputFile.println("Start Time : " + startTime);
    //   outputFile.flush();
    // } catch (IOException E) {
    //   E.printStackTrace();
    // }

    // final JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    startTime = cal.getTime();
    final JavaStreamingContext ssc = new JavaStreamingContext(args[0],
        "StreamingJavaLR", new Duration(1000), System.getenv("SPARK_HOME"),
        JavaStreamingContext.jarOfClass(StreamingJavaLR.class));

    JavaDStream<String> lines_2 = ssc.textFileStream(args[4]);
    JavaDStream<double[]> points_2 = lines_2.map(new ParsePointforInput());
    // points_2.print();
    // System.out.print(lines_2.count());
    // System.exit(0);

    // Score each incoming micro-batch with the trained model and write the
    // predictions to a new local file per batch.
    points_2.foreachRDD(new Function<JavaRDD<double[]>, Void>() {
      @Override
      public Void call(JavaRDD<double[]> rdd) {
        List<double[]> temp = rdd.collect();
        // If no more data is left for prediction, stop the program
        // if (rdd.count() == 0)
        //   ssc.stop();
        FileWriter newfile = null;
        BufferedWriter bw = null;
        try {
          newfile = new FileWriter("/home/pravesh/data/abc" + i++ + ".txt");
          bw = new BufferedWriter(newfile);
        } catch (IOException e) {
          e.printStackTrace();
        }
        int inpNo = 0;
        double result;
        for (double[] dArray : temp) {
          double[][] dataArray = new double[1][2];
          for (int i = 0; i < dArray.length; i++)
            dataArray[0][i] = dArray[i];
          // DoubleMatrix dataMatrix = new DoubleMatrix(dataArray);
          // result = model.predictPoint(dataMatrix, weightMatrix, model.intercept());
          Vector dataVector = Vectors.dense(dArray);
          result = model.predictPoint(dataVector, weightVector, model.intercept());
          try {
            Calendar cal2 = Calendar.getInstance();
            // bw.write("INFO at " + cal2.getTime() + " : " + "Point " + inpNo
            //     + " (" + dataMatrix.get(0, 0) + ", " + dataMatrix.get(0, 1) + ")"
            //     + " belongs to : " + result + " and Start Time was " + startTime + "\n");
            bw.write("INFO at " + cal2.getTime() + " : " + "Point " + inpNo
                + " (" + dataVector.toArray()[0] + ", " + dataVector.toArray()[1] + ")"
                + " belongs to : " + result + " and Start Time was " + startTime + "\n");
            bw.flush();
          } catch (IOException e) {
            e.printStackTrace();
          }
          // newoutputFile.flush();
          inpNo++;
        }
        try {
          bw.close();
          newfile.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
        Void v = null;
        return v;
      }
    });

    ssc.start();
    ssc.awaitTermination();
    // cal = Calendar.getInstance();
    // outputFile.println(" End Time : " + cal.getTime());
    // outputFile.flush();
    System.exit(0);
  }
}

As you can see, I read the training data from a file through the JavaSparkContext, and the data for prediction from files through the JavaStreamingContext.
I have used the data given in $SPARK_HOME/mllib/data/lr-data/random.data for both training and testing. To obtain larger data sets, I copied this data over and over (a sketch of the duplication is at the end of this message). In local mode, the code works fine for every data set I have tried. On the cluster, however, it is not able to process the file containing 0.4 million entries. For every other data set (here, a file with 0.8 million entries), the output after the StreamingContext is started looks like this:

14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113609-0001/0 on hostPort host-DSRV05.host.co.in:55206 with 8 cores, 512.0 MB RAM
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor added: app-20140606113609-0001/1 on worker-20140606114445-host-DSRV04.host.co.in-39342 (host-DSRV04.host.co.in:39342) with 8 cores
14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113609-0001/1 on hostPort host-DSRV04.host.co.in:39342 with 8 cores, 512.0 MB RAM
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated: app-20140606113609-0001/0 is now RUNNING
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated: app-20140606113609-0001/1 is now RUNNING
14/06/06 11:36:09 INFO RecurringTimer: Started timer for JobGenerator at time 1402034770000
14/06/06 11:36:09 INFO JobGenerator: Started JobGenerator at 1402034770000 ms
14/06/06 11:36:09 INFO JobScheduler: Started JobScheduler
14/06/06 11:36:10 INFO FileInputDStream: Finding new files took 29 ms
14/06/06 11:36:10 INFO FileInputDStream: New files at time 1402034770000 ms: file:/newdisk1/praveshj/pravesh/data/input/testing8lk.txt
14/06/06 11:36:10 INFO MemoryStore: ensureFreeSpace(33216) called with curMem=0, maxMem=309225062
14/06/06 11:36:10 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.4 KB, free 294.9 MB)
14/06/06 11:36:10 INFO FileInputFormat: Total input paths to process : 1
14/06/06 11:36:10 INFO JobScheduler: Added jobs for time 1402034770000 ms
14/06/06 11:36:10 INFO JobScheduler: Starting job streaming job 1402034770000 ms.0 from job set of time 1402034770000 ms
14/06/06 11:36:10 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:10 INFO DAGScheduler: Got job 0 (collect at StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
14/06/06 11:36:10 INFO DAGScheduler: Final stage: Stage 0(collect at StreamingJavaLR.java:170)
14/06/06 11:36:10 INFO DAGScheduler: Parents of final stage: List()
14/06/06 11:36:10 INFO DAGScheduler: Missing parents: List()
14/06/06 11:36:10 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35), which has no missing parents
14/06/06 11:36:10 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
14/06/06 11:36:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@host-dsrv05.host.co.in:47657/user/Executor#-1277914179] with ID 0
14/06/06 11:36:10 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
14/06/06 11:36:10 INFO TaskSetManager: Serialized task 0.0:0 as 3544 bytes in 1 ms
14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@host-dsrv04.host.co.in:46975/user/Executor#1659982546] with ID 1
14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager host-DSRV05.host.co.in:52786 with 294.9 MB RAM
14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager host-DSRV04.host.co.in:42008 with 294.9 MB RAM
14/06/06 11:36:11 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:11 INFO FileInputDStream: New files at time 1402034771000 ms:
14/06/06 11:36:11 INFO JobScheduler: Added jobs for time 1402034771000 ms
14/06/06 11:36:12 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:36:12 INFO FileInputDStream: New files at time 1402034772000 ms:
14/06/06 11:36:12 INFO JobScheduler: Added jobs for time 1402034772000 ms
14/06/06 11:36:13 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:13 INFO FileInputDStream: New files at time 1402034773000 ms:
14/06/06 11:36:13 INFO JobScheduler: Added jobs for time 1402034773000 ms
14/06/06 11:36:14 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:14 INFO FileInputDStream: New files at time 1402034774000 ms:
14/06/06 11:36:14 INFO JobScheduler: Added jobs for time 1402034774000 ms
14/06/06 11:36:15 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:15 INFO FileInputDStream: New files at time 1402034775000 ms:
14/06/06 11:36:15 INFO JobScheduler: Added jobs for time 1402034775000 ms
14/06/06 11:36:15 INFO BlockManagerInfo: Added taskresult_0 in memory on host-DSRV05.host.co.in:52786 (size: 19.9 MB, free: 275.0 MB)
14/06/06 11:36:15 INFO SendingConnection: Initiating connection to [host-DSRV05.host.co.in/192.168.145.195:52786]
14/06/06 11:36:15 INFO SendingConnection: Connected to [host-DSRV05.host.co.in/192.168.145.195:52786], 1 messages pending
14/06/06 11:36:15 INFO ConnectionManager: Accepted connection from [host-DSRV05.host.co.in/192.168.145.195]
14/06/06 11:36:15 INFO BlockManagerInfo: Removed taskresult_0 on host-DSRV05.host.co.in:52786 in memory (size: 19.9 MB, free: 294.9 MB)
14/06/06 11:36:15 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/06 11:36:15 INFO TaskSetManager: Finished TID 0 in 4961 ms on host-DSRV05.host.co.in (progress: 1/1)
14/06/06 11:36:15 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/06/06 11:36:15 INFO DAGScheduler: Stage 0 (collect at StreamingJavaLR.java:170) finished in 5.533 s
14/06/06 11:36:15 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 5.548644244 s
14/06/06 11:36:16 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:36:16 INFO FileInputDStream: New files at time 1402034776000 ms:
14/06/06 11:36:16 INFO JobScheduler: Added jobs for time 1402034776000 ms
14/06/06 11:36:17 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:17 INFO FileInputDStream: New files at time 1402034777000 ms:
14/06/06 11:36:17 INFO JobScheduler: Added jobs for time 1402034777000 ms
14/06/06 11:36:18 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:18 INFO FileInputDStream: New files at time 1402034778000 ms:
14/06/06 11:36:18 INFO JobScheduler: Added jobs for time 1402034778000 ms
14/06/06 11:36:19 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:19 INFO FileInputDStream: New files at time 1402034779000 ms:
14/06/06 11:36:19 INFO JobScheduler: Added jobs for time 1402034779000 ms
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034770000 ms.0 from job set of time 1402034770000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 9.331 s for time 1402034770000 ms (execution: 9.274 s)
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 2.7293E-5 s
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034771000 ms.0 from job set of time 1402034771000 ms
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034771000 ms.0 from job set of time 1402034771000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 8.333 s for time 1402034771000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034772000 ms.0 from job set of time 1402034772000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.4859E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034772000 ms.0 from job set of time 1402034772000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 7.335 s for time 1402034772000 ms (execution: 0.002 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034773000 ms.0 from job set of time 1402034773000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.5294E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034773000 ms.0 from job set of time 1402034773000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 6.336 s for time 1402034773000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034774000 ms.0 from job set of time 1402034774000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.117E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034774000 ms.0 from job set of time 1402034774000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 5.337 s for time 1402034774000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034775000 ms.0 from job set of time 1402034775000 ms
14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were older than 1402034769000 ms:
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.1414E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034775000 ms.0 from job set of time 1402034775000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 4.338 s for time 1402034775000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034776000 ms.0 from job set of time 1402034776000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 4.2422E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034776000 ms.0 from job set of time 1402034776000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 3.338 s for time 1402034776000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034777000 ms.0 from job set of time 1402034777000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 3 from persistence list
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.1133E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034777000 ms.0 from job set of time 1402034777000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 2.339 s for time 1402034777000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034778000 ms.0 from job set of time 1402034778000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.124E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034778000 ms.0 from job set of time 1402034778000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 1.340 s for time 1402034778000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job 1402034779000 ms.0 from job set of time 1402034779000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 1.2101E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job 1402034779000 ms.0 from job set of time 1402034779000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 0.341 s for time 1402034779000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO BlockManager: Removing RDD 3
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 2 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 2
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 1 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 1
14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were older than 1402034770000 ms:
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 6 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 6
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 5 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 5
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 4 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 4
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034771000 ms: 1402034770000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 9 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 9
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 8 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 8
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 7 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 7
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034772000 ms: 1402034771000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 12 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 12
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 11 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 11
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 10 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 10
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034773000 ms: 1402034772000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 15 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 15
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 14 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 14
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 13 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 13
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034774000 ms: 1402034773000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 18 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 18
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 17 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 17
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 16 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 16
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034775000 ms: 1402034774000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 21 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 21
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 20 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 20
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 19 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 19
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034776000 ms: 1402034775000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 24 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 24
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 23 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 23
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 22 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 22
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034777000 ms: 1402034776000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 27 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 27
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 26 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 26
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 25 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 25
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were older than 1402034778000 ms: 1402034777000 ms
14/06/06 11:36:20 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:20 INFO FileInputDStream: New files at time 1402034780000 ms:
14/06/06 11:36:20 INFO JobScheduler: Added jobs for time 1402034780000 ms
14/06/06 11:36:20 INFO JobScheduler: Starting job streaming job 1402034780000 ms.0 from job set of time 1402034780000 ms
14/06/06 11:36:20 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:20 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 2.8574E-5 s
14/06/06 11:36:20 INFO JobScheduler: Finished job streaming job 1402034780000 ms.0 from job set of time 1402034780000 ms
14/06/06 11:36:20 INFO MappedRDD: Removing RDD 30 from persistence list
14/06/06 11:36:20 INFO JobScheduler: Total delay: 0.006 s for time 1402034780000 ms (execution: 0.002 s)
14/06/06 11:36:20 INFO BlockManager: Removing RDD 30
14/06/06 11:36:20 INFO MappedRDD: Removing RDD 29 from persistence list
14/06/06 11:36:20 INFO BlockManager: Removing RDD 29
14/06/06 11:36:20 INFO UnionRDD: Removing RDD 28 from persistence list
14/06/06 11:36:20 INFO BlockManager: Removing RDD 28
14/06/06 11:36:20 INFO FileInputDStream: Cleared 1 old files that were older than 1402034779000 ms: 1402034778000 ms
14/06/06 11:36:21 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:21 INFO FileInputDStream: New files at time 1402034781000 ms:
14/06/06 11:36:21 INFO JobScheduler: Added jobs for time 1402034781000 ms
14/06/06 11:36:21 INFO JobScheduler: Starting job streaming job 1402034781000 ms.0 from job set of time 1402034781000 ms
14/06/06 11:36:21 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:21 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 3.023E-5 s
14/06/06 11:36:21 INFO JobScheduler: Finished job streaming job 1402034781000 ms.0 from job set of time 1402034781000 ms
14/06/06 11:36:21 INFO MappedRDD: Removing RDD 33 from persistence list
14/06/06 11:36:21 INFO JobScheduler: Total delay: 0.006 s for time 1402034781000 ms (execution: 0.002 s)
14/06/06 11:36:21 INFO BlockManager: Removing RDD 33
14/06/06 11:36:21 INFO MappedRDD: Removing RDD 32 from persistence list
14/06/06 11:36:21 INFO BlockManager: Removing RDD 32
14/06/06 11:36:21 INFO UnionRDD: Removing RDD 31 from persistence list
14/06/06 11:36:21 INFO BlockManager: Removing RDD 31
14/06/06 11:36:21 INFO FileInputDStream: Cleared 1 old files that were older than 1402034780000 ms: 1402034779000 ms
14/06/06 11:36:22 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:22 INFO FileInputDStream: New files at time 1402034782000 ms:
14/06/06 11:36:22 INFO JobScheduler: Added jobs for time 1402034782000 ms
14/06/06 11:36:22 INFO JobScheduler: Starting job streaming job 1402034782000 ms.0 from job set of time 1402034782000 ms
14/06/06 11:36:22 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:36:22 INFO SparkContext: Job finished: collect at StreamingJavaLR.java:170, took 3.9897E-5 s
14/06/06 11:36:22 INFO JobScheduler: Finished job streaming job 1402034782000 ms.0 from job set of time 1402034782000 ms
14/06/06 11:36:22 INFO MappedRDD: Removing RDD 36 from persistence list
14/06/06 11:36:22 INFO JobScheduler: Total delay: 0.006 s for time 1402034782000 ms (execution: 0.002 s)
14/06/06 11:36:22 INFO BlockManager: Removing RDD 36
14/06/06 11:36:22 INFO MappedRDD: Removing RDD 35 from persistence list
14/06/06 11:36:22 INFO BlockManager: Removing RDD 35
14/06/06 11:36:22 INFO UnionRDD: Removing RDD 34 from persistence list
14/06/06 11:36:22 INFO BlockManager: Removing RDD 34
14/06/06 11:36:22 INFO FileInputDStream: Cleared 1 old files that were older than 1402034781000 ms: 1402034780000 ms

For the file with 0.4 million entries, the output after the StreamingContext is started is:

14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added: app-20140606113855-0003/0 on worker-20140606114445-host-DSRV05.host.co.in-55206 (host-DSRV05.host.co.in:55206) with 8 cores
14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113855-0003/0 on hostPort host-DSRV05.host.co.in:55206 with 8 cores, 512.0 MB RAM
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added: app-20140606113855-0003/1 on worker-20140606114445-host-DSRV04.host.co.in-39342 (host-DSRV04.host.co.in:39342) with 8 cores
14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140606113855-0003/1 on hostPort host-DSRV04.host.co.in:39342 with 8 cores, 512.0 MB RAM
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated: app-20140606113855-0003/0 is now RUNNING
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated: app-20140606113855-0003/1 is now RUNNING
14/06/06 11:38:55 INFO RecurringTimer: Started timer for JobGenerator at time 1402034936000
14/06/06 11:38:55 INFO JobGenerator: Started JobGenerator at 1402034936000 ms
14/06/06 11:38:55 INFO JobScheduler: Started JobScheduler
14/06/06 11:38:56 INFO FileInputDStream: Finding new files took 31 ms
14/06/06 11:38:56 INFO FileInputDStream: New files at time 1402034936000 ms: file:/newdisk1/praveshj/pravesh/data/input/testing4lk.txt
14/06/06 11:38:56 INFO MemoryStore: ensureFreeSpace(33216) called with curMem=0, maxMem=309225062
14/06/06 11:38:56 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.4 KB, free 294.9 MB)
14/06/06 11:38:56 INFO FileInputFormat: Total input paths to process : 1
14/06/06 11:38:56 INFO JobScheduler: Added jobs for time 1402034936000 ms
14/06/06 11:38:56 INFO JobScheduler: Starting job streaming job 1402034936000 ms.0 from job set of time 1402034936000 ms
14/06/06 11:38:56 INFO SparkContext: Starting job: collect at StreamingJavaLR.java:170
14/06/06 11:38:56 INFO DAGScheduler: Got job 0 (collect at StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
14/06/06 11:38:56 INFO DAGScheduler: Final stage: Stage 0(collect at StreamingJavaLR.java:170)
14/06/06 11:38:56 INFO DAGScheduler: Parents of final stage: List()
14/06/06 11:38:56 INFO DAGScheduler: Missing parents: List()
14/06/06 11:38:56 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35), which has no missing parents
14/06/06 11:38:56 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
14/06/06 11:38:56 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/06/06 11:38:57 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:38:57 INFO FileInputDStream: New files at time 1402034937000 ms:
14/06/06 11:38:57 INFO JobScheduler: Added jobs for time 1402034937000 ms
14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@host-dsrv05.host.co.in:39424/user/Executor#-500165450] with ID 0
14/06/06 11:38:57 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
14/06/06 11:38:57 INFO TaskSetManager: Serialized task 0.0:0 as 3544 bytes in 0 ms
14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@host-dsrv04.host.co.in:45532/user/Executor#1654371091] with ID 1
14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager host-DSRV05.host.co.in:53857 with 294.9 MB RAM
14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager host-DSRV04.host.co.in:38057 with 294.9 MB RAM
14/06/06 11:38:58 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:38:58 INFO FileInputDStream: New files at time 1402034938000 ms:
14/06/06 11:38:58 INFO JobScheduler: Added jobs for time 1402034938000 ms
14/06/06 11:38:59 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:38:59 INFO FileInputDStream: New files at time 1402034939000 ms:
14/06/06 11:38:59 INFO JobScheduler: Added jobs for time 1402034939000 ms
14/06/06 11:39:00 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:00 INFO FileInputDStream: New files at time 1402034940000 ms:
14/06/06 11:39:00 INFO JobScheduler: Added jobs for time 1402034940000 ms
14/06/06 11:39:01 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:01 INFO FileInputDStream: New files at time 1402034941000 ms:
14/06/06 11:39:01 INFO JobScheduler: Added jobs for time 1402034941000 ms
14/06/06 11:39:02 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:02 INFO FileInputDStream: New files at time 1402034942000 ms:
14/06/06 11:39:02 INFO JobScheduler: Added jobs for time 1402034942000 ms
14/06/06 11:39:03 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:03 INFO FileInputDStream: New files at time 1402034943000 ms:
14/06/06 11:39:03 INFO JobScheduler: Added jobs for time 1402034943000 ms
14/06/06 11:39:04 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:04 INFO FileInputDStream: New files at time 1402034944000 ms:
14/06/06 11:39:04 INFO JobScheduler: Added jobs for time 1402034944000 ms
14/06/06 11:39:05 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:39:05 INFO FileInputDStream: New files at time 1402034945000 ms:
14/06/06 11:39:05 INFO JobScheduler: Added jobs for time 1402034945000 ms
14/06/06 11:39:06 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:39:06 INFO FileInputDStream: New files at time 1402034946000 ms:
14/06/06 11:39:06 INFO JobScheduler: Added jobs for time 1402034946000 ms
14/06/06 11:39:07 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:07 INFO FileInputDStream: New files at time 1402034947000 ms:
14/06/06 11:39:07 INFO JobScheduler: Added jobs for time 1402034947000 ms
14/06/06 11:39:08 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:08 INFO FileInputDStream: New files at time 1402034948000 ms:
14/06/06 11:39:08 INFO JobScheduler: Added jobs for time 1402034948000 ms
14/06/06 11:39:09 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:09 INFO FileInputDStream: New files at time 1402034949000 ms:
14/06/06 11:39:09 INFO JobScheduler: Added jobs for time 1402034949000 ms
14/06/06 11:39:10 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:10 INFO FileInputDStream: New files at time 1402034950000 ms:

...and this goes on forever. Notice that for the 0.4 million-entry file, the collect task (TID 0) is started at 11:38:57 but is never reported as finished, and the program never writes the predictions to the output files it is supposed to create. The worker logs don't show anything different. Any idea what might be the issue?

--
Thanks
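P.S. In case someone wants to reproduce the data sets: as mentioned above, I built the larger files by repeatedly copying $SPARK_HOME/mllib/data/lr-data/random.data. A minimal sketch of that kind of duplication (the paths, output file name, and target count here are illustrative, not my exact ones):

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class DuplicateData {
  public static void main(String[] args) throws IOException {
    // Read the small sample data set shipped with MLlib.
    List<String> lines = Files.readAllLines(
        Paths.get("random.data"), StandardCharsets.UTF_8);
    // Repeat it until we reach roughly the target number of entries.
    int copies = 800000 / lines.size();
    try (BufferedWriter bw = Files.newBufferedWriter(
        Paths.get("testing8lk.txt"), StandardCharsets.UTF_8)) {
      for (int c = 0; c < copies; c++) {
        for (String line : lines) {
          bw.write(line);
          bw.newLine();
        }
      }
    }
  }
}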