Hi,
(Apologies for the long mail, but given the number of issues I've run into,
I need to include a fair amount of detail.)
I'm running into issues testing LogisticRegressionWithSGD on a two-node
cluster (each node has 24 cores, with 16G of its 24G available to slaves).
Here's a description of the application:
The model is trained on categorical features x, y, and the cross feature
(x,y). Each categorical feature is one-hot encoded: every distinct value in
the category's enum becomes a binary feature of its own (i.e., if a record
has that value, the corresponding feature = 1, else 0, so there are as many
binary features as enum values). The training vector is laid out as
[x1,x2...xn,y1,y2....yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
training data contains exactly one combination (Xk,Yk) plus a label. Thus,
the corresponding LabeledPoint sparse vector has only 3 values set per
record: Xk, Yk, and (Xk,Yk). The total length of the (sparse) vector is
nearly 614000. There are about 1.33 million records, coalesced into 20
partitions across the two nodes. The input data has not been cached.
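The layout above can be sketched in plain Java as follows. This is a hypothetical illustration, not the application's code: CrossFeatureIndexer and its methods are made-up names, categories are assumed to be pre-mapped to integer ids, and each (x,y) pair is assumed to get the next free index in the order it is first seen. Because pair ids are assigned lazily, the final vector length is only known after a full pass over the data.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the layout [x block | y block | (x,y) block]:
 * x values occupy indices [0, numX), y values [numX, numX + numY),
 * and each distinct (x, y) pair gets the next free index after that.
 */
final class CrossFeatureIndexer {
    private final int numX;
    private final int numY;
    private final Map<Long, Integer> pairIds = new HashMap<Long, Integer>();

    CrossFeatureIndexer(int numX, int numY) {
        this.numX = numX;
        this.numY = numY;
    }

    /** Returns the three indices whose value is 1.0 for a record with categories x and y. */
    int[] activeIndices(int x, int y) {
        if (x < 0 || x >= numX || y < 0 || y >= numY) {
            throw new IllegalArgumentException("category out of range: x=" + x + ", y=" + y);
        }
        long key = (long) x * numY + y;             // unique per (x, y) pair
        Integer pairId = pairIds.get(key);
        if (pairId == null) {                       // first time this pair is seen
            pairId = pairIds.size();
            pairIds.put(key, pairId);
        }
        return new int[] {x, numX + y, numX + numY + pairId};
    }

    /** Vector length so far: x block + y block + distinct pairs seen. */
    int dimension() {
        return numX + numY + pairIds.size();
    }
}

class FeatureLayoutDemo {
    public static void main(String[] args) {
        CrossFeatureIndexer indexer = new CrossFeatureIndexer(3, 2); // toy sizes, not the real enums
        System.out.println(Arrays.toString(indexer.activeIndices(1, 0))); // [1, 3, 5]
        System.out.println(Arrays.toString(indexer.activeIndices(2, 1))); // [2, 4, 6]
        System.out.println("dimension = " + indexer.dimension());         // dimension = 7
    }
}
```

Every record maps to exactly three active indices, which is why each LabeledPoint carries only three non-zero values regardless of the total vector length.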
(NOTE: I do realize the records & features may seem large for a two node
setup, but given the memory & cpu, and the fact that I'm willing to give up
some turnaround time, I don't see why tasks should inexplicably fail)
Additional parameters include:
spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max = 20
spark.storage.memoryFraction = 0.8 // No cache space required
(Setting spark.akka.frameSize to a larger value, e.g. 20, didn't help
either.)
The model was initialized as: new LogisticRegressionWithSGD(1,
maxIterations, 0.0, 0.05), i.e. step size 1, regularization parameter 0.0,
and mini-batch fraction 0.05.
However, after 4 iterations of gradient descent, the entire execution
appeared to stall inexplicably. Details of the stalled stage (stage 14) and
of the executors are as follows:
Metric                            Min    25th   Median  75th   Max
Result serialization time         12 ms  13 ms  14 ms   16 ms  18 ms
Duration                          4 s    4 s    5 s     5 s    5 s
Time spent fetching task results  0 ms   0 ms   0 ms    0 ms   0 ms
Scheduler delay                   6 s    6 s    6 s     6 s    12 s
Stage Id  Description
14        aggregate at GradientDescent.scala:178

Index  Task ID  Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time  Errors
0      600      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
1      601      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
2      602      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
3      603      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
4      604      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
5      605      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
6      606      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
7      607      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
8      608      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
9      609      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms
10     610      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
11     611      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      13 ms
12     612      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      18 ms
13     613      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      13 ms
14     614      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
15     615      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      12 ms
16     616      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
17     617      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      18 ms
18     618      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      16 ms
19     619      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       1 s      18 ms
Executor stats:
RDD Blocks  Memory Used      Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
0           0.0 B / 6.7 GB   0.0 B      2             0             307             309          23.2 m     0.0 B         0.0 B
0           0.0 B / 6.7 GB   0.0 B      3             0             308             311          22.4 m     0.0 B         0.0 B
Executor jmap output:
Server compiler detected.
JVM version is 24.55-b03
using thread-local object allocation.
Parallel GC with 18 thread(s)
Heap Configuration:
MinHeapFreeRatio = 40
MaxHeapFreeRatio = 70
MaxHeapSize = 10737418240 (10240.0MB)
NewSize = 1310720 (1.25MB)
MaxNewSize = 17592186044415 MB
OldSize = 5439488 (5.1875MB)
NewRatio = 2
SurvivorRatio = 8
PermSize = 21757952 (20.75MB)
MaxPermSize = 134217728 (128.0MB)
G1HeapRegionSize = 0 (0.0MB)
Heap Usage:
PS Young Generation
Eden Space:
capacity = 2783969280 (2655.0MB)
used = 192583816 (183.66223907470703MB)
free = 2591385464 (2471.337760925293MB)
6.917598458557704% used
From Space:
capacity = 409993216 (391.0MB)
used = 1179808 (1.125152587890625MB)
free = 408813408 (389.8748474121094MB)
0.2877628102022059% used
To Space:
capacity = 385351680 (367.5MB)
used = 0 (0.0MB)
free = 385351680 (367.5MB)
0.0% used
PS Old Generation
capacity = 7158628352 (6827.0MB)
used = 4455093024 (4248.707794189453MB)
free = 2703535328 (2578.292205810547MB)
62.2338918146983% used
PS Perm Generation
capacity = 90701824 (86.5MB)
used = 45348832 (43.248016357421875MB)
free = 45352992 (43.251983642578125MB)
49.99770677158598% used
8432 interned Strings occupying 714672 bytes.
Executor GC log snippet:
168.778: [GC [PSYoungGen: 2702831K->578545K(2916864K)] 9302453K->7460857K(9907712K), 0.3193550 secs] [Times: user=5.13 sys=0.39, real=0.32 secs]
169.097: [Full GC [PSYoungGen: 578545K->0K(2916864K)] [ParOldGen: 6882312K->1073297K(6990848K)] 7460857K->1073297K(9907712K) [PSPermGen: 44248K->44201K(88576K)], 4.5521090 secs] [Times: user=24.22 sys=0.18, real=4.55 secs]
174.207: [GC [PSYoungGen: 2338304K->81315K(2544128K)] 3411653K->1154665K(9534976K), 0.0966280 secs] [Times: user=1.66 sys=0.00, real=0.09 secs]
I also tried matching the number of partitions to the number of cores on
the nodes. Increasing the number of partitions (say, to 80 or 100) let the
job progress until roughly the 6th iteration, but the next stage would then
stall as before, with the same symptoms in the logs. With more partitions,
the last stage that completed had the following task times:
Metric                            Min    25th   Median  75th   Max
Result serialization time         11 ms  12 ms  13 ms   15 ms  0.4 s
Duration                          0.5 s  0.9 s  1 s     3 s    7 s
Time spent fetching task results  0 ms   0 ms   0 ms    0 ms   0 ms
Scheduler delay                   5 s    6 s    6 s     7 s    12 s
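To put the partition counts in perspective, here is a rough estimate of how much data the aggregate at GradientDescent.scala:178 returns to the driver per iteration. It rests on an assumption not verified against the Spark source here: that each task returns a dense double[] partial gradient with one entry per feature, plus one double for the loss. AggregatePayloadEstimate is a hypothetical name, and the figures ignore serialization overhead, so they are lower bounds. Under that assumption the payload grows with the number of partitions and the vector length.

```java
/**
 * Back-of-the-envelope estimate, assuming each aggregate task returns a dense
 * double[] of length numFeatures plus one double for the loss (an assumption).
 * Serialization headers are ignored, so these are lower bounds.
 */
final class AggregatePayloadEstimate {
    private AggregatePayloadEstimate() {}

    static long bytesPerTask(long numFeatures) {
        return (numFeatures + 1) * Double.BYTES;          // +1 for the loss accumulator
    }

    static long bytesPerIteration(long numFeatures, int numPartitions) {
        return bytesPerTask(numFeatures) * numPartitions; // one result per partition reaches the driver
    }

    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long numFeatures = 614_000L;                      // approximate vector length from the mail
        System.out.printf("per task: %.2f MiB%n", toMiB(bytesPerTask(numFeatures)));
        for (int partitions : new int[] {20, 80, 100}) {
            System.out.printf("%d partitions: %.1f MiB per iteration%n",
                    partitions, toMiB(bytesPerIteration(numFeatures, partitions)));
        }
    }
}
```

With 614,000 features this comes to about 4.68 MiB per task, or roughly 93.7 MiB per iteration at 20 partitions and 468.4 MiB at 100.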
My hypothesis is that as the coefficient array becomes less sparse over
successive iterations, the cost of the aggregate goes up to the point that
it stalls (though I haven't been able to explain the stall itself).
Reducing the mini-batch fraction to a very low value such as 0.01 let the
iterations progress further, but in that case the model failed to converge
after a small number of iterations.
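For reference on the sparsity question, here is a minimal sketch of the per-record logistic-loss gradient with sparse binary features. It is written in plain Java, not taken from Spark's implementation, and SparseLogisticGradient is a hypothetical name. With labels in {0,1}, a record contributes (p - label) only at its three active indices. At a mini-batch fraction of 0.05, each iteration samples roughly 0.05 × 1.33 million ≈ 66,500 records (about 13,300 at 0.01). So, under this sketch, one iteration's gradient sum has at most 3 × 66,500 = 199,500 non-zero coordinates out of ~614,000.

```java
import java.util.Arrays;

/**
 * Minimal sketch (not Spark's code): logistic-loss gradient contribution of one
 * record with binary sparse features. Assumes labels in {0, 1} and feature
 * value 1.0 at each active index.
 */
final class SparseLogisticGradient {
    private SparseLogisticGradient() {}

    /** Adds (p - label) at each active index of gradSum and returns the record's log loss. */
    static double addGradient(double[] weights, int[] activeIndices, double label, double[] gradSum) {
        double margin = 0.0;
        for (int i : activeIndices) {
            margin += weights[i];                       // w . x, since every active x_i is 1.0
        }
        double p = 1.0 / (1.0 + Math.exp(-margin));     // predicted P(label = 1)
        double multiplier = p - label;
        for (int i : activeIndices) {
            gradSum[i] += multiplier;                   // only the active coordinates change
        }
        // -[y log p + (1 - y) log(1 - p)] in a numerically stable form
        return label > 0.5 ? log1pExp(-margin) : log1pExp(margin);
    }

    /** Computes log(1 + e^z) without overflow for large z. */
    static double log1pExp(double z) {
        return z > 0 ? z + Math.log1p(Math.exp(-z)) : Math.log1p(Math.exp(z));
    }
}

class SparseLogisticGradientDemo {
    public static void main(String[] args) {
        double[] weights = new double[7];               // all-zero weights, so p = 0.5
        double[] gradSum = new double[7];
        double loss = SparseLogisticGradient.addGradient(weights, new int[] {1, 3, 5}, 1.0, gradSum);
        // indices 1, 3 and 5 each receive -0.5; the loss is ln 2
        System.out.println("loss = " + loss + ", gradSum = " + Arrays.toString(gradSum));
    }
}
```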
I also tried reducing the number of records by aggregating on (x,y) as the
key (i.e., training on aggregates instead of on every raw record), but
encountered the following exception:
Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
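For comparison, here is a plain-Java sketch of the aggregation by (x,y) described above. It uses no Spark, so it says nothing about the JavaPairRDD code path in the trace. It reduces the raw records to one {positives, total} count per (x,y) pair. RawRecord, PairKey, and PairAggregation are hypothetical names. The null check is an illustrative assumption, not a diagnosis of the NullPointerException.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical raw record: two categorical values and a 0/1 label. */
final class RawRecord {
    final String x;
    final String y;
    final int label;

    RawRecord(String x, String y, int label) {
        this.x = x;
        this.y = y;
        this.label = label;
    }
}

/** (x, y) key with value-based equals/hashCode so it works as a HashMap key. */
final class PairKey {
    final String x;
    final String y;

    PairKey(String x, String y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PairKey)) return false;
        PairKey other = (PairKey) o;
        return Objects.equals(x, other.x) && Objects.equals(y, other.y);
    }

    @Override
    public int hashCode() {
        return Objects.hash(x, y);
    }
}

final class PairAggregation {
    private PairAggregation() {}

    /** Maps each (x, y) to {positive count, total count}; records with a null x or y are skipped. */
    static Map<PairKey, long[]> countByPair(Iterable<RawRecord> records) {
        Map<PairKey, long[]> counts = new HashMap<PairKey, long[]>();
        for (RawRecord r : records) {
            if (r == null || r.x == null || r.y == null) {
                continue; // illustrative guard, not a diagnosis of the exception above
            }
            PairKey key = new PairKey(r.x, r.y);
            long[] c = counts.get(key);
            if (c == null) {
                c = new long[2];
                counts.put(key, c);
            }
            c[0] += r.label; // positives (label assumed to be 0 or 1)
            c[1] += 1;       // all records with this (x, y)
        }
        return counts;
    }
}

class PairAggregationDemo {
    public static void main(String[] args) {
        List<RawRecord> data = Arrays.asList(
                new RawRecord("a", "p", 1),
                new RawRecord("a", "p", 0),
                new RawRecord("b", "q", 1),
                new RawRecord(null, "q", 1)); // skipped by the guard
        Map<PairKey, long[]> counts = PairAggregation.countByPair(data);
        long[] ap = counts.get(new PairKey("a", "p"));
        System.out.println(counts.size() + " pairs; (a,p) -> " + ap[0] + "/" + ap[1]); // 2 pairs; (a,p) -> 1/2
    }
}
```

The sketch leaves open how the per-pair counts become training examples again.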
I'd appreciate any insights/comments about what may be causing the
execution to stall.
If logs/tables appear poorly indented in the email, here's a gist with
relevant details: https://gist.github.com/reachbach/a418ab2f01b639b624c1
Thanks,
Bharath