Hi,
I am using Spark-1.0.0 over a 3 node cluster with 1 master and 2 slaves. I
am trying to run LR algorithm over Spark Streaming.
package org.apache.spark.examples.streaming;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
* Logistic regression based classification using ML Lib.
*/
public final class StreamingJavaLR {
static int i = 1;
// static LogisticRegressionModel model;
// private static final Pattern SPACE = Pattern.compile(" ");
static class ParsePoint implements Function<String, LabeledPoint> {
private static final Pattern COMMA = Pattern.compile(",");
private static final Pattern SPACE = Pattern.compile(" ");
@Override
public LabeledPoint call(String line) {
String[] parts = COMMA.split(line);
double y = Double.parseDouble(parts[0]);
String[] tok = SPACE.split(parts[1]);
double[] x = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, Vectors.dense(x));
}
}
// Edited
static class ParsePointforInput implements Function<String, double[]> {
private static final Pattern SPACE = Pattern.compile(" ");
@Override
public double[] call(String line) {
String[] tok = SPACE.split(line);
double[] x = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
return x;
}
}
public static void main(String[] args) {
if (args.length != 5) {
System.err
.println("Usage: JavaLR <master>
<input_file_for_training>
<step_size> <no_iters> <input_file_for_prediction>");
System.exit(1);
}
FileWriter file;
PrintWriter outputFile = null;
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
Calendar cal=Calendar.getInstance();
final Date startTime;
System.out.println("<<<<<Let's Print>>>>>");
// SparkConf conf = new SparkConf()
// .setMaster(args[0])
// .setAppName("StreamingJavaLR")
// .set("spark.cleaner.ttl", "1000")
// .set("spark.executor.uri",
"hdfs://192.168.145.191:9000/user/praveshj/spark/spark-0.9.1.tar.gz")
//
.setJars(JavaSparkContext.jarOfClass(StreamingJavaLR.class));
//
// JavaSparkContext sc = new JavaSparkContext(conf);
JavaSparkContext sc = new JavaSparkContext(args[0],
"StreamingJavaLR",
System.getenv("SPARK_HOME"),
JavaSparkContext.jarOfClass(StreamingJavaLR.class));
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Reading
File");
JavaRDD<String> lines = sc.textFile(args[1]);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>File has
been Read now mapping");
JavaRDD<LabeledPoint> points = lines.map(new
ParsePoint()).cache();
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Mapping
Done");
double stepSize = Double.parseDouble(args[2]);
int iterations = Integer.parseInt(args[3]);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Read the
arguments. stepSize = "+stepSize+" and iterations = "+iterations);
BufferedReader br = null;
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Training the
Model");
final LogisticRegressionModel model =
LogisticRegressionWithSGD.train(
points.rdd(), iterations, stepSize);
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Model
Trained");
System.out.println("Final w: " + model.weights());
// printWeights(model.weights());
System.out.println("Intercept : " + model.intercept());
final Vector weightVector = model.weights();
// double[] weightArray = model.weights();
//
// final DoubleMatrix weightMatrix = new DoubleMatrix(weightArray);
sc.stop();
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
// try {
// file = new FileWriter(args[5]);
// outputFile = new PrintWriter(file);
// cal = Calendar.getInstance();
// cal.getTime();
//// startTime = sdf.format(cal.getTime());
// startTime = cal.getTime();
// outputFile.println("Start Time : " + startTime);
// outputFile.flush();
// } catch (IOException E) {
// E.printStackTrace();
// }
// final JavaStreamingContext ssc = new JavaStreamingContext(sc,
// new Duration(1000));
startTime = cal.getTime();
final JavaStreamingContext ssc = new
JavaStreamingContext(args[0],
"StreamingJavaLR", new Duration(1000),
System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(StreamingJavaLR.class));
JavaDStream<String> lines_2 = ssc.textFileStream(args[4]);
JavaDStream<double[]> points_2 = lines_2.map(new
ParsePointforInput());
// points_2.print();
// System.out.print(lines_2.count());
// System.exit(0);
points_2.foreachRDD(new Function<JavaRDD<double[]>, Void>() {
@Override
public Void call(JavaRDD rdd) {
List<double[]> temp = rdd.collect();
//If no more data is left for Prediction, Stop
the Program
// if (rdd.count() == 0)
// ssc.stop();
FileWriter newfile = null;
BufferedWriter bw = null;
try {
newfile = new FileWriter(
"/home/pravesh/data/abc"
+ i++ +
".txt");
bw = new BufferedWriter(newfile);
} catch (IOException e) {
e.printStackTrace();
}
int inpNo = 0;
double result;
for (double[] dArray : temp) {
double[][] dataArray = new double[1][2];
for (int i = 0; i < dArray.length; i++)
dataArray[0][i] = dArray[i];
// DoubleMatrix dataMatrix = new
DoubleMatrix(dataArray);
// result = model.predictPoint(dataMatrix,
weightMatrix,
// model.intercept());
Vector dataVector =
Vectors.dense(dArray);
result = model.predictPoint(dataVector,
weightVector,
model.intercept());
try {
Calendar cal2 =
Calendar.getInstance();
// bw.write("INFO at " +
cal2.getTime() + " : " + "Point " + inpNo
+ " (" + dataMatrix.get(0, 0)
// + ", " +
dataMatrix.get(0, 1) + ")"
// + " belongs to
: " + result + " and Start Time was " +
startTime + "\n");
bw.write("INFO at " +
cal2.getTime() + " : " + "Point " + inpNo +
" (" + dataVector.toArray()[0]
+ ", " +
dataVector.toArray()[1] + ")"
+ " belongs to
: " + result + " and Start Time was " + startTime
+ "\n");
bw.flush();
} catch (IOException e) {
// TODO Auto-generated catch
block
e.printStackTrace();
}
// newoutputFile.flush();
inpNo++;
}
try {
bw.close();
newfile.close();
} catch (IOException e) {
e.printStackTrace();
}
Void v = null;
return v;
}
});
ssc.start();
ssc.awaitTermination();
// cal = Calendar.getInstance();
// outputFile.println(" End Time : " + cal.getTime());
// outputFile.flush();
System.exit(0);
}
}
As you can see, I take input from files for training the model with
JavaSparkContext and for testing the model with JavaStreamingContext.
I have used the data given in $SPARK_HOME/mllib/data/lr-data/random.data for
training and testing. To obtain larger data sets, I have copied this data.
The code works fine for every possible set of data in local mode. Over the
cluster, however, it is not able to process the file containing 0.4 million
entries.
For every other data set (file with 0.8 million entries here), the output is
like (Output after the StreamingContext is started):
14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140606113609-0001/0 on hostPort host-DSRV05.host.co.in:55206 with 8
cores, 512.0 MB RAM
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor added:
app-20140606113609-0001/1 on
worker-20140606114445-host-DSRV04.host.co.in-39342
(host-DSRV04.host.co.in:39342) with 8 cores
14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140606113609-0001/1 on hostPort host-DSRV04.host.co.in:39342 with 8
cores, 512.0 MB RAM
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated:
app-20140606113609-0001/0 is now RUNNING
14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated:
app-20140606113609-0001/1 is now RUNNING
14/06/06 11:36:09 INFO RecurringTimer: Started timer for JobGenerator at
time 1402034770000
14/06/06 11:36:09 INFO JobGenerator: Started JobGenerator at
1402034770000 ms
14/06/06 11:36:09 INFO JobScheduler: Started JobScheduler
14/06/06 11:36:10 INFO FileInputDStream: Finding new files took 29 ms
14/06/06 11:36:10 INFO FileInputDStream: New files at time 1402034770000
ms:
file:/newdisk1/praveshj/pravesh/data/input/testing8lk.txt
14/06/06 11:36:10 INFO MemoryStore: ensureFreeSpace(33216) called with
curMem=0, maxMem=309225062
14/06/06 11:36:10 INFO MemoryStore: Block broadcast_0 stored as values
to memory (estimated size 32.4 KB, free 294.9 MB)
14/06/06 11:36:10 INFO FileInputFormat: Total input paths to process : 1
14/06/06 11:36:10 INFO JobScheduler: Added jobs for time 1402034770000
ms
14/06/06 11:36:10 INFO JobScheduler: Starting job streaming job
1402034770000 ms.0 from job set of time 1402034770000 ms
14/06/06 11:36:10 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:10 INFO DAGScheduler: Got job 0 (collect at
StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
14/06/06 11:36:10 INFO DAGScheduler: Final stage: Stage 0(collect at
StreamingJavaLR.java:170)
14/06/06 11:36:10 INFO DAGScheduler: Parents of final stage: List()
14/06/06 11:36:10 INFO DAGScheduler: Missing parents: List()
14/06/06 11:36:10 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
map at MappedDStream.scala:35), which has no missing parents
14/06/06 11:36:10 INFO DAGScheduler: Submitting 1 missing tasks from
Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
14/06/06 11:36:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1
tasks
14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://[email protected]:47657/user/Executor#-1277914179]
with ID 0
14/06/06 11:36:10 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
14/06/06 11:36:10 INFO TaskSetManager: Serialized task 0.0:0 as 3544
bytes in 1 ms
14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://[email protected]:46975/user/Executor#1659982546]
with ID 1
14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager
host-DSRV05.host.co.in:52786 with 294.9 MB RAM
14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager
host-DSRV04.host.co.in:42008 with 294.9 MB RAM
14/06/06 11:36:11 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:11 INFO FileInputDStream: New files at time 1402034771000
ms:
14/06/06 11:36:11 INFO JobScheduler: Added jobs for time 1402034771000
ms
14/06/06 11:36:12 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:36:12 INFO FileInputDStream: New files at time 1402034772000
ms:
14/06/06 11:36:12 INFO JobScheduler: Added jobs for time 1402034772000
ms
14/06/06 11:36:13 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:13 INFO FileInputDStream: New files at time 1402034773000
ms:
14/06/06 11:36:13 INFO JobScheduler: Added jobs for time 1402034773000
ms
14/06/06 11:36:14 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:14 INFO FileInputDStream: New files at time 1402034774000
ms:
14/06/06 11:36:14 INFO JobScheduler: Added jobs for time 1402034774000
ms
14/06/06 11:36:15 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:15 INFO FileInputDStream: New files at time 1402034775000
ms:
14/06/06 11:36:15 INFO JobScheduler: Added jobs for time 1402034775000
ms
14/06/06 11:36:15 INFO BlockManagerInfo: Added taskresult_0 in memory on
host-DSRV05.host.co.in:52786 (size: 19.9 MB, free: 275.0 MB)
14/06/06 11:36:15 INFO SendingConnection: Initiating connection to
[host-DSRV05.host.co.in/192.168.145.195:52786]
14/06/06 11:36:15 INFO SendingConnection: Connected to
[host-DSRV05.host.co.in/192.168.145.195:52786], 1 messages pending
14/06/06 11:36:15 INFO ConnectionManager: Accepted connection from
[host-DSRV05.host.co.in/192.168.145.195]
14/06/06 11:36:15 INFO BlockManagerInfo: Removed taskresult_0 on
host-DSRV05.host.co.in:52786 in memory (size: 19.9 MB, free: 294.9 MB)
14/06/06 11:36:15 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/06 11:36:15 INFO TaskSetManager: Finished TID 0 in 4961 ms on
host-DSRV05.host.co.in (progress: 1/1)
14/06/06 11:36:15 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
tasks have all completed, from pool
14/06/06 11:36:15 INFO DAGScheduler: Stage 0 (collect at
StreamingJavaLR.java:170) finished in 5.533 s
14/06/06 11:36:15 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 5.548644244 s
14/06/06 11:36:16 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:36:16 INFO FileInputDStream: New files at time 1402034776000
ms:
14/06/06 11:36:16 INFO JobScheduler: Added jobs for time 1402034776000
ms
14/06/06 11:36:17 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:17 INFO FileInputDStream: New files at time 1402034777000
ms:
14/06/06 11:36:17 INFO JobScheduler: Added jobs for time 1402034777000
ms
14/06/06 11:36:18 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:18 INFO FileInputDStream: New files at time 1402034778000
ms:
14/06/06 11:36:18 INFO JobScheduler: Added jobs for time 1402034778000
ms
14/06/06 11:36:19 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:19 INFO FileInputDStream: New files at time 1402034779000
ms:
14/06/06 11:36:19 INFO JobScheduler: Added jobs for time 1402034779000
ms
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034770000 ms.0 from job set of time 1402034770000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 9.331 s for time
1402034770000 ms (execution: 9.274 s)
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 2.7293E-5 s
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034771000 ms.0 from job set of time 1402034771000 ms
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034771000 ms.0 from job set of time 1402034771000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 8.333 s for time
1402034771000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034772000 ms.0 from job set of time 1402034772000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.4859E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034772000 ms.0 from job set of time 1402034772000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 7.335 s for time
1402034772000 ms (execution: 0.002 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034773000 ms.0 from job set of time 1402034773000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.5294E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034773000 ms.0 from job set of time 1402034773000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 6.336 s for time
1402034773000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034774000 ms.0 from job set of time 1402034774000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.117E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034774000 ms.0 from job set of time 1402034774000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 5.337 s for time
1402034774000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034775000 ms.0 from job set of time 1402034775000 ms
14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were
older than 1402034769000 ms:
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.1414E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034775000 ms.0 from job set of time 1402034775000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 4.338 s for time
1402034775000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034776000 ms.0 from job set of time 1402034776000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 4.2422E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034776000 ms.0 from job set of time 1402034776000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 3.338 s for time
1402034776000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034777000 ms.0 from job set of time 1402034777000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 3 from persistence list
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.1133E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034777000 ms.0 from job set of time 1402034777000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 2.339 s for time
1402034777000 ms (execution: 0.000 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034778000 ms.0 from job set of time 1402034778000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.124E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034778000 ms.0 from job set of time 1402034778000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 1.340 s for time
1402034778000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034779000 ms.0 from job set of time 1402034779000 ms
14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.2101E-5 s
14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034779000 ms.0 from job set of time 1402034779000 ms
14/06/06 11:36:19 INFO JobScheduler: Total delay: 0.341 s for time
1402034779000 ms (execution: 0.001 s)
14/06/06 11:36:19 INFO BlockManager: Removing RDD 3
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 2 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 2
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 1 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 1
14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were
older than 1402034770000 ms:
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 6 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 6
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 5 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 5
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 4 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 4
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034771000 ms: 1402034770000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 9 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 9
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 8 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 8
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 7 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 7
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034772000 ms: 1402034771000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 12 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 12
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 11 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 11
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 10 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 10
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034773000 ms: 1402034772000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 15 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 15
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 14 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 14
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 13 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 13
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034774000 ms: 1402034773000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 18 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 18
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 17 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 17
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 16 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 16
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034775000 ms: 1402034774000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 21 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 21
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 20 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 20
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 19 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 19
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034776000 ms: 1402034775000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 24 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 24
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 23 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 23
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 22 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 22
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034777000 ms: 1402034776000 ms
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 27 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 27
14/06/06 11:36:19 INFO MappedRDD: Removing RDD 26 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 26
14/06/06 11:36:19 INFO UnionRDD: Removing RDD 25 from persistence list
14/06/06 11:36:19 INFO BlockManager: Removing RDD 25
14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034778000 ms: 1402034777000 ms
14/06/06 11:36:20 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:20 INFO FileInputDStream: New files at time 1402034780000
ms:
14/06/06 11:36:20 INFO JobScheduler: Added jobs for time 1402034780000
ms
14/06/06 11:36:20 INFO JobScheduler: Starting job streaming job
1402034780000 ms.0 from job set of time 1402034780000 ms
14/06/06 11:36:20 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:20 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 2.8574E-5 s
14/06/06 11:36:20 INFO JobScheduler: Finished job streaming job
1402034780000 ms.0 from job set of time 1402034780000 ms
14/06/06 11:36:20 INFO MappedRDD: Removing RDD 30 from persistence list
14/06/06 11:36:20 INFO JobScheduler: Total delay: 0.006 s for time
1402034780000 ms (execution: 0.002 s)
14/06/06 11:36:20 INFO BlockManager: Removing RDD 30
14/06/06 11:36:20 INFO MappedRDD: Removing RDD 29 from persistence list
14/06/06 11:36:20 INFO BlockManager: Removing RDD 29
14/06/06 11:36:20 INFO UnionRDD: Removing RDD 28 from persistence list
14/06/06 11:36:20 INFO BlockManager: Removing RDD 28
14/06/06 11:36:20 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034779000 ms: 1402034778000 ms
14/06/06 11:36:21 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:21 INFO FileInputDStream: New files at time 1402034781000
ms:
14/06/06 11:36:21 INFO JobScheduler: Added jobs for time 1402034781000
ms
14/06/06 11:36:21 INFO JobScheduler: Starting job streaming job
1402034781000 ms.0 from job set of time 1402034781000 ms
14/06/06 11:36:21 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:21 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 3.023E-5 s
14/06/06 11:36:21 INFO JobScheduler: Finished job streaming job
1402034781000 ms.0 from job set of time 1402034781000 ms
14/06/06 11:36:21 INFO MappedRDD: Removing RDD 33 from persistence list
14/06/06 11:36:21 INFO JobScheduler: Total delay: 0.006 s for time
1402034781000 ms (execution: 0.002 s)
14/06/06 11:36:21 INFO BlockManager: Removing RDD 33
14/06/06 11:36:21 INFO MappedRDD: Removing RDD 32 from persistence list
14/06/06 11:36:21 INFO BlockManager: Removing RDD 32
14/06/06 11:36:21 INFO UnionRDD: Removing RDD 31 from persistence list
14/06/06 11:36:21 INFO BlockManager: Removing RDD 31
14/06/06 11:36:21 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034780000 ms: 1402034779000 ms
14/06/06 11:36:22 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:36:22 INFO FileInputDStream: New files at time 1402034782000
ms:
14/06/06 11:36:22 INFO JobScheduler: Added jobs for time 1402034782000
ms
14/06/06 11:36:22 INFO JobScheduler: Starting job streaming job
1402034782000 ms.0 from job set of time 1402034782000 ms
14/06/06 11:36:22 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:36:22 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 3.9897E-5 s
14/06/06 11:36:22 INFO JobScheduler: Finished job streaming job
1402034782000 ms.0 from job set of time 1402034782000 ms
14/06/06 11:36:22 INFO MappedRDD: Removing RDD 36 from persistence list
14/06/06 11:36:22 INFO JobScheduler: Total delay: 0.006 s for time
1402034782000 ms (execution: 0.002 s)
14/06/06 11:36:22 INFO BlockManager: Removing RDD 36
14/06/06 11:36:22 INFO MappedRDD: Removing RDD 35 from persistence list
14/06/06 11:36:22 INFO BlockManager: Removing RDD 35
14/06/06 11:36:22 INFO UnionRDD: Removing RDD 34 from persistence list
14/06/06 11:36:22 INFO BlockManager: Removing RDD 34
14/06/06 11:36:22 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034781000 ms: 1402034780000 ms
For the file with 0.4 million entries, the output is (output after the
StreamingContext is started):
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added:
app-20140606113855-0003/0 on
worker-20140606114445-host-DSRV05.host.co.in-55206
(host-DSRV05.host.co.in:55206) with 8 cores
14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140606113855-0003/0 on hostPort host-DSRV05.host.co.in:55206 with 8
cores, 512.0 MB RAM
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added:
app-20140606113855-0003/1 on
worker-20140606114445-host-DSRV04.host.co.in-39342
(host-DSRV04.host.co.in:39342) with 8 cores
14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140606113855-0003/1 on hostPort host-DSRV04.host.co.in:39342 with 8
cores, 512.0 MB RAM
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated:
app-20140606113855-0003/0 is now RUNNING
14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated:
app-20140606113855-0003/1 is now RUNNING
14/06/06 11:38:55 INFO RecurringTimer: Started timer for JobGenerator at
time 1402034936000
14/06/06 11:38:55 INFO JobGenerator: Started JobGenerator at
1402034936000 ms
14/06/06 11:38:55 INFO JobScheduler: Started JobScheduler
14/06/06 11:38:56 INFO FileInputDStream: Finding new files took 31 ms
14/06/06 11:38:56 INFO FileInputDStream: New files at time 1402034936000
ms:
file:/newdisk1/praveshj/pravesh/data/input/testing4lk.txt
14/06/06 11:38:56 INFO MemoryStore: ensureFreeSpace(33216) called with
curMem=0, maxMem=309225062
14/06/06 11:38:56 INFO MemoryStore: Block broadcast_0 stored as values
to memory (estimated size 32.4 KB, free 294.9 MB)
14/06/06 11:38:56 INFO FileInputFormat: Total input paths to process : 1
14/06/06 11:38:56 INFO JobScheduler: Added jobs for time 1402034936000
ms
14/06/06 11:38:56 INFO JobScheduler: Starting job streaming job
1402034936000 ms.0 from job set of time 1402034936000 ms
14/06/06 11:38:56 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
14/06/06 11:38:56 INFO DAGScheduler: Got job 0 (collect at
StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
14/06/06 11:38:56 INFO DAGScheduler: Final stage: Stage 0(collect at
StreamingJavaLR.java:170)
14/06/06 11:38:56 INFO DAGScheduler: Parents of final stage: List()
14/06/06 11:38:56 INFO DAGScheduler: Missing parents: List()
14/06/06 11:38:56 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
map at MappedDStream.scala:35), which has no missing parents
14/06/06 11:38:56 INFO DAGScheduler: Submitting 1 missing tasks from
Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
14/06/06 11:38:56 INFO TaskSchedulerImpl: Adding task set 0.0 with 1
tasks
14/06/06 11:38:57 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:38:57 INFO FileInputDStream: New files at time 1402034937000
ms:
14/06/06 11:38:57 INFO JobScheduler: Added jobs for time 1402034937000
ms
14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://[email protected]:39424/user/Executor#-500165450]
with ID 0
14/06/06 11:38:57 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
14/06/06 11:38:57 INFO TaskSetManager: Serialized task 0.0:0 as 3544
bytes in 0 ms
14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://[email protected]:45532/user/Executor#1654371091]
with ID 1
14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager
host-DSRV05.host.co.in:53857 with 294.9 MB RAM
14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager
host-DSRV04.host.co.in:38057 with 294.9 MB RAM
14/06/06 11:38:58 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:38:58 INFO FileInputDStream: New files at time 1402034938000
ms:
14/06/06 11:38:58 INFO JobScheduler: Added jobs for time 1402034938000
ms
14/06/06 11:38:59 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:38:59 INFO FileInputDStream: New files at time 1402034939000
ms:
14/06/06 11:38:59 INFO JobScheduler: Added jobs for time 1402034939000
ms
14/06/06 11:39:00 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:00 INFO FileInputDStream: New files at time 1402034940000
ms:
14/06/06 11:39:00 INFO JobScheduler: Added jobs for time 1402034940000
ms
14/06/06 11:39:01 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:01 INFO FileInputDStream: New files at time 1402034941000
ms:
14/06/06 11:39:01 INFO JobScheduler: Added jobs for time 1402034941000
ms
14/06/06 11:39:02 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:02 INFO FileInputDStream: New files at time 1402034942000
ms:
14/06/06 11:39:02 INFO JobScheduler: Added jobs for time 1402034942000
ms
14/06/06 11:39:03 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:03 INFO FileInputDStream: New files at time 1402034943000
ms:
14/06/06 11:39:03 INFO JobScheduler: Added jobs for time 1402034943000
ms
14/06/06 11:39:04 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:04 INFO FileInputDStream: New files at time 1402034944000
ms:
14/06/06 11:39:04 INFO JobScheduler: Added jobs for time 1402034944000
ms
14/06/06 11:39:05 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:39:05 INFO FileInputDStream: New files at time 1402034945000
ms:
14/06/06 11:39:05 INFO JobScheduler: Added jobs for time 1402034945000
ms
14/06/06 11:39:06 INFO FileInputDStream: Finding new files took 1 ms
14/06/06 11:39:06 INFO FileInputDStream: New files at time 1402034946000
ms:
14/06/06 11:39:06 INFO JobScheduler: Added jobs for time 1402034946000
ms
14/06/06 11:39:07 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:07 INFO FileInputDStream: New files at time 1402034947000
ms:
14/06/06 11:39:07 INFO JobScheduler: Added jobs for time 1402034947000
ms
14/06/06 11:39:08 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:08 INFO FileInputDStream: New files at time 1402034948000
ms:
14/06/06 11:39:08 INFO JobScheduler: Added jobs for time 1402034948000
ms
14/06/06 11:39:09 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:09 INFO FileInputDStream: New files at time 1402034949000
ms:
14/06/06 11:39:09 INFO JobScheduler: Added jobs for time 1402034949000
ms
14/06/06 11:39:10 INFO FileInputDStream: Finding new files took 0 ms
14/06/06 11:39:10 INFO FileInputDStream: New files at time 1402034950000
ms:
and this goes on forever. It doesn't print the output to the file it is
supposed to write.
The worker logs don't output anything different.
Any idea what might be the issue?
--
Thanks
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-tp6694p7110.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.