This may sound like an obvious question, but are you sure that the program
is doing any work when you don't have the saveAsTextFile? Spark
transformations are lazy: if there are transformations but no action to
actually materialize the data, Spark never executes them.
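For example, here is a minimal, self-contained sketch (hypothetical class
name, app name and input path, nothing to do with your actual job) showing
that map() only records the lineage; nothing is read from HDFS and nothing
is computed until an action such as count() or saveAsTextFile() is called:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;

    public class LazyDemo {
        public static void main(String[] args) {
            // Hypothetical app name and path, purely to illustrate laziness.
            SparkConf conf = new SparkConf()
                    .setAppName("lazy-demo")
                    .setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> lines = sc.textFile("hdfs://somehost:9000/some/input"); // transformation: lazy
            JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
                public Integer call(String s) {
                    return s.length();   // not executed yet, only recorded in the lineage
                }
            });

            // Up to this point no input has been read and no work has been done.
            long n = lengths.count();    // the action is what triggers the whole job
            System.out.println(n);
            sc.stop();
        }
    }

So if the fast version of your job has no action at all, the few seconds you
measured are essentially just job setup, not the distance computation.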

As to the question of 'is this taking too long', I can't answer that. But
your code was HTML escaped and therefore difficult to read; perhaps you
should post a link to a Gist instead.

Joe

On 24 May 2015 at 10:36, allanjie <allanmcgr...@gmail.com> wrote:

> *Problem Description*:
>
> The program runs on a standalone Spark cluster (1 master, 6 workers, each
> with 8 GB RAM and 2 cores).
> Input: a 468 MB file with 133433 records, stored in HDFS.
> Output: a roughly 2 MB file, also stored in HDFS.
> The program has two map operations and one reduceByKey operation.
> Finally I save the result to HDFS using "*saveAsTextFile*".
> *Problem*: if I don't add "saveAsTextFile", the program runs very fast (a
> few seconds); otherwise it is extremely slow and takes about 30 minutes.
>
> *My program (it is very simple)*
> public static void main(String[] args) throws IOException {
>     /** Parameter setting **/
>     String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
>     String remoteFilePath =
>         "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
>     String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
>     final int row = 133433;
>     final int col = 458;
>     final double dc = Double.valueOf(args[0]);
>
>     SparkConf conf = new SparkConf()
>         .setAppName("distance")
>         .set("spark.executor.memory", "4g")
>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>         .set("spark.eventLog.enabled", "true");
>     JavaSparkContext sc = new JavaSparkContext(conf);
>
>     JavaRDD<String> textFile = sc.textFile(remoteFilePath);
>
>     // Broadcast variable; the dimension of this double array is 133433*458
>     final Broadcast<double[][]> broadcastPoints =
>         sc.broadcast(createBroadcastPoints(localPointPath, row, col));
>
>     /**
>      * Compute the distance in terms of each point on each instance.
>      * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
>      */
>     JavaPairRDD<Integer, Double> distance = textFile.flatMapToPair(
>         new PairFlatMapFunction<String, Integer, Double>() {
>             public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
>                 List<String> al = Arrays.asList(v1.split(","));
>                 double[] featureVals = new double[al.size()];
>                 for (int j = 0; j < al.size() - 1; j++)
>                     featureVals[j] = Double.valueOf(al.get(j + 1));
>                 int jIndex = Integer.valueOf(al.get(0));
>                 double[][] allPoints = broadcastPoints.value();
>                 double sum = 0;
>                 List<Tuple2<Integer, Double>> list =
>                     new ArrayList<Tuple2<Integer, Double>>();
>                 for (int i = 0; i < row; i++) {
>                     sum = 0;
>                     for (int j = 0; j < al.size() - 1; j++) {
>                         sum += (allPoints[i][j] - featureVals[j])
>                              * (allPoints[i][j] - featureVals[j]);
>                     }
>                     list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
>                 }
>                 return list;
>             }
>         });
>
>     // Create zero/one density
>     JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(
>         new Function<Double, Integer>() {
>             public Integer call(Double v1) throws Exception {
>                 if (v1 < dc)
>                     return 1;
>                 else
>                     return 0;
>             }
>         });
>
>     // Combine the density
>     JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(
>         new Function2<Integer, Integer, Integer>() {
>             public Integer call(Integer v1, Integer v2) throws Exception {
>                 return v1 + v2;
>             }
>         });
>
>     counts.saveAsTextFile(outputPath + args[1]);
>     sc.stop();
> }
>
> *If I comment out "saveAsTextFile", the log is:*
> Picked up _JAVA_OPTIONS: -Xmx4g
> 15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
> 15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser
> 15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to:
> hduser
> 15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(hduser); users with modify permissions: Set(hduser)
> 15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 15/05/24 15:21:31 INFO Remoting: Starting remoting
> 15/05/24 15:21:31 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@HadoopV26Master:57429]
> 15/05/24 15:21:31 INFO util.Utils: Successfully started service
> 'sparkDriver' on port 57429.
> 15/05/24 15:21:31 INFO spark.SparkEnv: Registering MapOutputTracker
> 15/05/24 15:21:31 INFO spark.SparkEnv: Registering BlockManagerMaster
> 15/05/24 15:21:31 INFO storage.DiskBlockManager: Created local directory at
>
> /tmp/spark-6342bde9-feca-4651-8cca-a67541150420/blockmgr-e92d0ae0-ec95-44cb-986a-266a1899202b
> 15/05/24 15:21:31 INFO storage.MemoryStore: MemoryStore started with
> capacity 1966.1 MB
> 15/05/24 15:21:31 INFO spark.HttpFileServer: HTTP File server directory is
>
> /tmp/spark-fea59c9e-1264-45e9-ad31-484d7de83d0a/httpd-c6421767-ffaf-4417-905e-34b3d13a7bf4
> 15/05/24 15:21:31 INFO spark.HttpServer: Starting HTTP Server
> 15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/05/24 15:21:31 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:36956
> 15/05/24 15:21:31 INFO util.Utils: Successfully started service 'HTTP file
> server' on port 36956.
> 15/05/24 15:21:31 INFO spark.SparkEnv: Registering OutputCommitCoordinator
> 15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/05/24 15:21:31 INFO server.AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:4040
> 15/05/24 15:21:31 INFO util.Utils: Successfully started service 'SparkUI'
> on
> port 4040.
> 15/05/24 15:21:31 INFO ui.SparkUI: Started SparkUI at
> http://HadoopV26Master:4040
> 15/05/24 15:21:31 INFO spark.SparkContext: Added JAR
> file:/home/hduser/densityspark.jar at
> http://10.9.0.16:36956/jars/densityspark.jar with timestamp 1432452091753
> 15/05/24 15:21:31 INFO client.AppClient$ClientActor: Connecting to master
> akka.tcp://sparkMaster@HadoopV26Master:7077/user/Master...
> 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Connected to
> Spark cluster with app ID app-20150524152132-0035
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152132-0035/0 on worker-20150522181416-HadoopV26Slave3-47002
> (HadoopV26Slave3:47002) with 2 cores
> 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152132-0035/0 on hostPort HadoopV26Slave3:47002 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152132-0035/1 on worker-20150522181417-HadoopV26Slave6-60280
> (HadoopV26Slave6:60280) with 2 cores
> 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152132-0035/1 on hostPort HadoopV26Slave6:60280 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152132-0035/2 on worker-20150522181417-HadoopV26Slave5-54797
> (HadoopV26Slave5:54797) with 2 cores
> 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152132-0035/2 on hostPort HadoopV26Slave5:54797 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152132-0035/3 on worker-20150522181416-HadoopV26Slave1-47647
> (HadoopV26Slave1:47647) with 2 cores
> 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152132-0035/3 on hostPort HadoopV26Slave1:47647 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152132-0035/4 on worker-20150522181417-HadoopV26Slave4-59352
> (HadoopV26Slave4:59352) with 2 cores
> 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152132-0035/4 on hostPort HadoopV26Slave4:59352 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152132-0035/5 on worker-20150522181417-HadoopV26Slave2-34694
> (HadoopV26Slave2:34694) with 2 cores
> 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152132-0035/5 on hostPort HadoopV26Slave2:34694 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/0 is now LOADING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/4 is now LOADING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/1 is now LOADING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/3 is now LOADING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/5 is now LOADING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/2 is now LOADING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/0 is now RUNNING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/1 is now RUNNING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/2 is now RUNNING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/3 is now RUNNING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/4 is now RUNNING
> 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152132-0035/5 is now RUNNING
> 15/05/24 15:21:32 INFO netty.NettyBlockTransferService: Server created on
> 47711
> 15/05/24 15:21:32 INFO storage.BlockManagerMaster: Trying to register
> BlockManager
> 15/05/24 15:21:32 INFO storage.BlockManagerMasterActor: Registering block
> manager HadoopV26Master:47711 with 1966.1 MB RAM, BlockManagerId(<driver>,
> HadoopV26Master, 47711)
> 15/05/24 15:21:32 INFO storage.BlockManagerMaster: Registered BlockManager
> 15/05/24 15:21:32 INFO scheduler.EventLoggingListener: Logging events to
> file:/tmp/spark-events/app-20150524152132-0035
> 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend:
> SchedulerBackend
> is ready for scheduling beginning after reached
> minRegisteredResourcesRatio:
> 0.0
> 15/05/24 15:21:32 INFO storage.MemoryStore: ensureFreeSpace(258503) called
> with curMem=0, maxMem=2061647216
> 15/05/24 15:21:32 INFO storage.MemoryStore: Block broadcast_0 stored as
> values in memory (estimated size 252.4 KB, free 1965.9 MB)
> 15/05/24 15:21:32 INFO storage.MemoryStore: ensureFreeSpace(27227) called
> with curMem=258503, maxMem=2061647216
> 15/05/24 15:21:32 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
> as bytes in memory (estimated size 26.6 KB, free 1965.9 MB)
> 15/05/24 15:21:32 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
> in
> memory on HadoopV26Master:47711 (size: 26.6 KB, free: 1966.1 MB)
> 15/05/24 15:21:32 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_0_piece0
> 15/05/24 15:21:32 INFO spark.SparkContext: Created broadcast 0 from
> textFile
> at Clustering.java:67
>
> *If I uncomment "saveAsTextFile", the log is:*
> 15/05/24 15:23:57 INFO spark.SparkContext: Running Spark version 1.3.1
> 15/05/24 15:23:58 WARN util.NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/05/24 15:23:58 INFO spark.SecurityManager: Changing view acls to: hduser
> 15/05/24 15:23:58 INFO spark.SecurityManager: Changing modify acls to:
> hduser
> 15/05/24 15:23:58 INFO spark.SecurityManager: SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(hduser); users with modify permissions: Set(hduser)
> 15/05/24 15:23:58 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 15/05/24 15:23:58 INFO Remoting: Starting remoting
> 15/05/24 15:23:58 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@HadoopV26Master:41017]
> 15/05/24 15:23:58 INFO util.Utils: Successfully started service
> 'sparkDriver' on port 41017.
> 15/05/24 15:23:58 INFO spark.SparkEnv: Registering MapOutputTracker
> 15/05/24 15:23:58 INFO spark.SparkEnv: Registering BlockManagerMaster
> 15/05/24 15:23:58 INFO storage.DiskBlockManager: Created local directory at
>
> /tmp/spark-cc73d7c3-4b0e-4a20-bf1c-147a8ee927f7/blockmgr-aa5cb9e3-224c-477e-b0b1-e65ec02e80fe
> 15/05/24 15:23:58 INFO storage.MemoryStore: MemoryStore started with
> capacity 1966.1 MB
> 15/05/24 15:23:58 INFO spark.HttpFileServer: HTTP File server directory is
>
> /tmp/spark-c42a1cc6-f137-4233-96ef-1e0a44e634d1/httpd-114edee3-3404-4425-a644-c14f95a7ee7b
> 15/05/24 15:23:58 INFO spark.HttpServer: Starting HTTP Server
> 15/05/24 15:23:59 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/05/24 15:23:59 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:38453
> 15/05/24 15:23:59 INFO util.Utils: Successfully started service 'HTTP file
> server' on port 38453.
> 15/05/24 15:23:59 INFO spark.SparkEnv: Registering OutputCommitCoordinator
> 15/05/24 15:23:59 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/05/24 15:23:59 INFO server.AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:4040
> 15/05/24 15:23:59 INFO util.Utils: Successfully started service 'SparkUI'
> on
> port 4040.
> 15/05/24 15:23:59 INFO ui.SparkUI: Started SparkUI at
> http://HadoopV26Master:4040
> 15/05/24 15:23:59 INFO spark.SparkContext: Added JAR
> file:/home/hduser/densitysparksave.jar at
> http://10.9.0.16:38453/jars/densitysparksave.jar with timestamp
> 1432452239254
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Connecting to master
> akka.tcp://sparkMaster@HadoopV26Master:7077/user/Master...
> 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Connected to
> Spark cluster with app ID app-20150524152359-0036
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152359-0036/0 on worker-20150522181416-HadoopV26Slave3-47002
> (HadoopV26Slave3:47002) with 2 cores
> 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152359-0036/0 on hostPort HadoopV26Slave3:47002 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152359-0036/1 on worker-20150522181417-HadoopV26Slave6-60280
> (HadoopV26Slave6:60280) with 2 cores
> 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152359-0036/1 on hostPort HadoopV26Slave6:60280 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152359-0036/2 on worker-20150522181417-HadoopV26Slave5-54797
> (HadoopV26Slave5:54797) with 2 cores
> 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152359-0036/2 on hostPort HadoopV26Slave5:54797 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152359-0036/3 on worker-20150522181416-HadoopV26Slave1-47647
> (HadoopV26Slave1:47647) with 2 cores
> 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152359-0036/3 on hostPort HadoopV26Slave1:47647 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152359-0036/4 on worker-20150522181417-HadoopV26Slave4-59352
> (HadoopV26Slave4:59352) with 2 cores
> 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152359-0036/4 on hostPort HadoopV26Slave4:59352 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added:
> app-20150524152359-0036/5 on worker-20150522181417-HadoopV26Slave2-34694
> (HadoopV26Slave2:34694) with 2 cores
> 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted
> executor
> ID app-20150524152359-0036/5 on hostPort HadoopV26Slave2:34694 with 2
> cores,
> 4.0 GB RAM
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/1 is now LOADING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/2 is now LOADING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/0 is now LOADING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/3 is now LOADING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/5 is now LOADING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/4 is now LOADING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/0 is now RUNNING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/1 is now RUNNING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/2 is now RUNNING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/3 is now RUNNING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/4 is now RUNNING
> 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated:
> app-20150524152359-0036/5 is now RUNNING
> 15/05/24 15:23:59 INFO netty.NettyBlockTransferService: Server created on
> 46919
> 15/05/24 15:23:59 INFO storage.BlockManagerMaster: Trying to register
> BlockManager
> 15/05/24 15:23:59 INFO storage.BlockManagerMasterActor: Registering block
> manager HadoopV26Master:46919 with 1966.1 MB RAM, BlockManagerId(<driver>,
> HadoopV26Master, 46919)
> 15/05/24 15:23:59 INFO storage.BlockManagerMaster: Registered BlockManager
> 15/05/24 15:24:00 INFO scheduler.EventLoggingListener: Logging events to
> file:/tmp/spark-events/app-20150524152359-0036
> 15/05/24 15:24:00 INFO cluster.SparkDeploySchedulerBackend:
> SchedulerBackend
> is ready for scheduling beginning after reached
> minRegisteredResourcesRatio:
> 0.0
> 15/05/24 15:24:00 INFO storage.MemoryStore: ensureFreeSpace(258503) called
> with curMem=0, maxMem=2061647216
> 15/05/24 15:24:00 INFO storage.MemoryStore: Block broadcast_0 stored as
> values in memory (estimated size 252.4 KB, free 1965.9 MB)
> 15/05/24 15:24:00 INFO storage.MemoryStore: ensureFreeSpace(27227) called
> with curMem=258503, maxMem=2061647216
> 15/05/24 15:24:00 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
> as bytes in memory (estimated size 26.6 KB, free 1965.9 MB)
> 15/05/24 15:24:00 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
> in
> memory on HadoopV26Master:46919 (size: 26.6 KB, free: 1966.1 MB)
> 15/05/24 15:24:00 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_0_piece0
> 15/05/24 15:24:00 INFO spark.SparkContext: Created broadcast 0 from
> textFile
> at Clustering.java:67
> 15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered
> executor:
> Actor[akka.tcp://sparkExecutor@HadoopV26Slave5
> :43757/user/Executor#-184395839]
> with ID 2
> 15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered
> executor:
> Actor[akka.tcp://sparkExecutor@HadoopV26Slave4
> :35123/user/Executor#1588464525]
> with ID 4
> 15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered
> executor:
> Actor[akka.tcp://sparkExecutor@HadoopV26Slave2
> :33439/user/Executor#529853915]
> with ID 5
> 15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered
> executor:
> Actor[akka.tcp://sparkExecutor@HadoopV26Slave6
> :37211/user/Executor#509959098]
> with ID 1
> 15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered
> executor:
> Actor[akka.tcp://sparkExecutor@HadoopV26Slave3
> :52152/user/Executor#-1731088969]
> with ID 0
> 15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered
> executor:
> Actor[akka.tcp://sparkExecutor@HadoopV26Slave1
> :55666/user/Executor#-635759065]
> with ID 3
> 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
> manager HadoopV26Slave5:45835 with 2.1 GB RAM, BlockManagerId(2,
> HadoopV26Slave5, 45835)
> 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
> manager HadoopV26Slave2:44597 with 2.1 GB RAM, BlockManagerId(5,
> HadoopV26Slave2, 44597)
> 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
> manager HadoopV26Slave4:44317 with 2.1 GB RAM, BlockManagerId(4,
> HadoopV26Slave4, 44317)
> 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
> manager HadoopV26Slave6:52445 with 2.1 GB RAM, BlockManagerId(1,
> HadoopV26Slave6, 52445)
> 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
> manager HadoopV26Slave3:38931 with 2.1 GB RAM, BlockManagerId(0,
> HadoopV26Slave3, 38931)
> 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block
> manager HadoopV26Slave1:44960 with 2.1 GB RAM, BlockManagerId(3,
> HadoopV26Slave1, 44960)
> 15/05/24 15:24:10 INFO mapred.FileInputFormat: Total input paths to process
> : 1
> 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.tip.id is
> deprecated. Instead, use mapreduce.task.id
> 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.id is
> deprecated. Instead, use mapreduce.task.attempt.id
> 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.is.map is
> deprecated. Instead, use mapreduce.task.ismap
> 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.partition is
> deprecated. Instead, use mapreduce.task.partition
> 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.job.id is
> deprecated. Instead, use mapreduce.job.id
> 15/05/24 15:24:11 INFO spark.SparkContext: Starting job: saveAsTextFile at
> Clustering.java:117
> 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Registering RDD 3 (mapValues
> at Clustering.java:100)
> 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at
> Clustering.java:117) with 4 output partitions (allowLocal=false)
> 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Final stage: Stage
> 1(saveAsTextFile at Clustering.java:117)
> 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Parents of final stage:
> List(Stage 0)
> 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Missing parents: List(Stage
> 0)
> 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Submitting Stage 0
> (MapPartitionsRDD[3] at mapValues at Clustering.java:100), which has no
> missing parents
> 15/05/24 15:24:12 INFO storage.MemoryStore: ensureFreeSpace(490237112)
> called with curMem=285730, maxMem=2061647216
> 15/05/24 15:24:12 INFO storage.MemoryStore: Block broadcast_1 stored as
> values in memory (estimated size 467.5 MB, free 1498.3 MB)
> 15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(4194304) called
> with curMem=490522842, maxMem=2061647216
> 15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece0 stored
> as bytes in memory (estimated size 4.0 MB, free 1494.3 MB)
> 15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
> in
> memory on HadoopV26Master:46919 (size: 4.0 MB, free: 1962.1 MB)
> 15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_1_piece0
> *(hundreds of lines omitted here)*
> 15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(4194304) called
> with curMem=968673498, maxMem=2061647216
> 15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece114
> stored as bytes in memory (estimated size 4.0 MB, free 1038.3 MB)
> 15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece114
> in memory on HadoopV26Master:46919 (size: 4.0 MB, free: 1506.1 MB)
> 15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_1_piece114
> 15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(2116571) called
> with curMem=972867802, maxMem=2061647216
> 15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece115
> stored as bytes in memory (estimated size 2.0 MB, free 1036.3 MB)
> 15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece115
> in memory on HadoopV26Master:46919 (size: 2.0 MB, free: 1504.1 MB)
> 15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_1_piece115
> 15/05/24 15:24:13 INFO spark.SparkContext: Created broadcast 1 from
> broadcast at DAGScheduler.scala:839
> 15/05/24 15:24:13 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
> from Stage 0 (MapPartitionsRDD[3] at mapValues at Clustering.java:100)
> 15/05/24 15:24:13 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
> with
> 4 tasks
> 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
> 0.0 (TID 0, HadoopV26Slave1, NODE_LOCAL, 1383 bytes)
> 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
> 0.0 (TID 1, HadoopV26Slave2, NODE_LOCAL, 1383 bytes)
> 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
> 0.0 (TID 2, HadoopV26Slave3, NODE_LOCAL, 1383 bytes)
> 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
> 0.0 (TID 3, HadoopV26Slave4, NODE_LOCAL, 1383 bytes)
> 15/05/24 15:24:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece68
> in memory on HadoopV26Slave4:44317 (size: 4.0 MB, free: 2.1 GB)
> 15/05/24 15:24:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece102
> in memory on HadoopV26Slave2:44597 (size: 4.0 MB, free: 2.1 GB)
> *(hundreds of lines omitted here)*
> 15/05/24 15:24:23 INFO storage.BlockManagerInfo: Added broadcast_1_piece35
> in memory on HadoopV26Slave1:44960 (size: 4.0 MB, free: 1666.0 MB)
> 15/05/24 15:24:24 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
> in
> memory on HadoopV26Slave3:38931 (size: 26.6 KB, free: 1658.0 MB)
> 15/05/24 15:24:24 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
> in
> memory on HadoopV26Slave1:44960 (size: 26.6 KB, free: 1658.0 MB)
> *Then the program gets stuck here for many minutes.*
>
> At this point I checked the web UI at master:4040 and saw that the second
> map was still running:
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/jobstage.png
> >
> And when I clicked into mapValues I saw this:
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/mapValue.png
> >
> But why is it so slow? And how is that related to "saveAsTextFile"?
>
> BTW, in the mapValues stage (the second stage) I saw the input size/records
> increase very slowly. Does that mean reading is very slow? And why doesn't
> it show the reduce stage?
>
> *Then, after about 20 minutes, I get the following logs and can get the
> result:*
> 15/05/24 16:12:22 INFO scheduler.TaskSetManager: Finished task 3.0 in stage
> 0.0 (TID 2) in 2888967 ms on HadoopV26Slave3 (1/4)
> 15/05/24 16:24:27 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
> 0.0 (TID 3) in 3614079 ms on HadoopV26Slave4 (2/4)
> 15/05/24 16:25:53 INFO scheduler.TaskSetManager: Finished task 2.0 in stage
> 0.0 (TID 1) in 3700117 ms on HadoopV26Slave2 (3/4)
> 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Stage 0 (mapValues at
> Clustering.java:100) finished in 3801.290 s
> 15/05/24 16:27:35 INFO scheduler.DAGScheduler: looking for newly runnable
> stages
> 15/05/24 16:27:35 INFO scheduler.DAGScheduler: running: Set()
> 15/05/24 16:27:35 INFO scheduler.DAGScheduler: waiting: Set(Stage 1)
> 15/05/24 16:27:35 INFO scheduler.DAGScheduler: failed: Set()
> 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Missing parents for Stage 1:
> List()
> 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
> 0.0 (TID 0) in 3801284 ms on HadoopV26Slave1 (4/4)
> 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Submitting Stage 1
> (MapPartitionsRDD[5] at saveAsTextFile at Clustering.java:117), which is
> now
> runnable
> 15/05/24 16:27:35 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
> whose tasks have all completed, from pool
> 15/05/24 16:27:35 INFO storage.MemoryStore: ensureFreeSpace(128128) called
> with curMem=974984373, maxMem=2061647216
> 15/05/24 16:27:35 INFO storage.MemoryStore: Block broadcast_2 stored as
> values in memory (estimated size 125.1 KB, free 1036.2 MB)
> 15/05/24 16:27:35 INFO storage.MemoryStore: ensureFreeSpace(58374) called
> with curMem=975112501, maxMem=2061647216
> 15/05/24 16:27:35 INFO storage.MemoryStore: Block broadcast_2_piece0 stored
> as bytes in memory (estimated size 57.0 KB, free 1036.1 MB)
> 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in
> memory on HadoopV26Master:46919 (size: 57.0 KB, free: 1504.0 MB)
> 15/05/24 16:27:35 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 15/05/24 16:27:35 INFO spark.SparkContext: Created broadcast 2 from
> broadcast at DAGScheduler.scala:839
> 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
> from Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at Clustering.java:117)
> 15/05/24 16:27:35 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0
> with
> 4 tasks
> 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
> 1.0 (TID 4, HadoopV26Slave5, PROCESS_LOCAL, 1114 bytes)
> 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
> 1.0 (TID 5, HadoopV26Slave3, PROCESS_LOCAL, 1114 bytes)
> 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
> 1.0 (TID 6, HadoopV26Slave2, PROCESS_LOCAL, 1114 bytes)
> 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
> 1.0 (TID 7, HadoopV26Slave6, PROCESS_LOCAL, 1114 bytes)
> 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in
> memory on HadoopV26Slave2:44597 (size: 57.0 KB, free: 1657.9 MB)
> 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in
> memory on HadoopV26Slave3:38931 (size: 57.0 KB, free: 1657.9 MB)
> 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
> output locations for shuffle 0 to sparkExecutor@HadoopV26Slave2:33439
> 15/05/24 16:27:35 INFO spark.MapOutputTrackerMaster: Size of output
> statuses
> for shuffle 0 is 188 bytes
> 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
> output locations for shuffle 0 to sparkExecutor@HadoopV26Slave3:52152
> 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in
> memory on HadoopV26Slave6:52445 (size: 57.0 KB, free: 2.1 GB)
> 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0
> in
> memory on HadoopV26Slave5:45835 (size: 57.0 KB, free: 2.1 GB)
> 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
> output locations for shuffle 0 to sparkExecutor@HadoopV26Slave6:37211
> 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
> output locations for shuffle 0 to sparkExecutor@HadoopV26Slave5:43757
> 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added rdd_4_1 in memory on
> HadoopV26Slave3:38931 (size: 1733.1 KB, free: 1656.2 MB)
> 15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_2 in memory on
> HadoopV26Slave2:44597 (size: 1733.1 KB, free: 1656.2 MB)
> 15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_3 in memory on
> HadoopV26Slave6:52445 (size: 1717.4 KB, free: 2.1 GB)
> 15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
> 1.0 (TID 5) in 1757 ms on HadoopV26Slave3 (1/4)
> 15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 2.0 in stage
> 1.0 (TID 6) in 1776 ms on HadoopV26Slave2 (2/4)
> 15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_0 in memory on
> HadoopV26Slave5:45835 (size: 1733.1 KB, free: 2.1 GB)
> 15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 3.0 in stage
> 1.0 (TID 7) in 3153 ms on HadoopV26Slave6 (3/4)
> 15/05/24 16:27:38 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at
> Clustering.java:117) finished in 3.258 s
> 15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
> 1.0 (TID 4) in 3256 ms on HadoopV26Slave5 (4/4)
> 15/05/24 16:27:38 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0,
> whose tasks have all completed, from pool
> 15/05/24 16:27:38 INFO scheduler.DAGScheduler: Job 0 finished:
> saveAsTextFile at Clustering.java:117, took 3807.229501 s
>
> Although I can obtain the result, it's too slow, right?
> The following screenshots also show the final result info.
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/result.png
> >
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/mapvalueres.png
> >
>
> PS: if I reduce the input to just 10 records, it runs very fast. But
> running on just 10 records doesn't make any sense.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-dramatically-slow-when-I-add-saveAsTextFile-tp23003.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
