This may sound like an obvious question, but are you sure the program is doing any work when you don't call saveAsTextFile? If there are transformations but no action to actually collect the data, Spark has no need to execute the transformations.
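(To illustrate the point about laziness: Spark transformations such as map and reduceByKey only record a lineage graph; nothing runs until an action forces it. Java 8 streams follow the same contract and can demonstrate it without a cluster. This is only an analogy, not Spark code; the class name and counter are mine.)

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LazyDemo {
    // Counts how many times the mapping function actually runs.
    static final AtomicInteger calls = new AtomicInteger();

    // Builds the pipeline; analogous to chaining RDD transformations.
    public static Stream<Integer> pipeline() {
        return IntStream.rangeClosed(1, 1000)
                .boxed()
                .map(x -> { calls.incrementAndGet(); return x * x; });
    }

    // Returns {invocations before the terminal op, invocations after}.
    public static int[] demo() {
        calls.set(0);
        Stream<Integer> squares = pipeline();       // "transformation" only
        int before = calls.get();                   // still 0: nothing has run
        int sum = squares.reduce(0, Integer::sum);  // "action": forces the work
        return new int[] { before, calls.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("before action: " + r[0] + ", after action: " + r[1]);
        // prints: before action: 0, after action: 1000
    }
}
```

So with saveAsTextFile commented out, the fast run is not evidence that the distance computation is fast; it is evidence that it never ran.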
As to the question of "is this taking too long", I can't answer that. But your code was HTML-escaped and therefore difficult to read; perhaps you should post a link to a Gist.

Joe

On 24 May 2015 at 10:36, allanjie <allanmcgr...@gmail.com> wrote:

> *Problem Description*:
>
> The program runs on a standalone Spark cluster (1 master, 6 workers, each
> with 8 GB RAM and 2 cores).
> Input: a 468 MB file with 133433 records, stored in HDFS.
> Output: a file of just 2 MB, also stored in HDFS.
> The program has two map operations and one reduceByKey operation.
> Finally I save the result to HDFS using "*saveAsTextFile*".
>
> *Problem*: if I don't add "saveAsTextFile", the program runs very fast (a
> few seconds); otherwise it is extremely slow, taking about 30 mins.
>
> *My program (is very simple)*:
>
>     public static void main(String[] args) throws IOException {
>         /** Parameter setting ***********/
>         String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
>         String remoteFilePath =
>             "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
>         String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
>         final int row = 133433;
>         final int col = 458;
>         final double dc = Double.valueOf(args[0]);
>
>         SparkConf conf = new SparkConf()
>             .setAppName("distance")
>             .set("spark.executor.memory", "4g")
>             .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>             .set("spark.eventLog.enabled", "true");
>         JavaSparkContext sc = new JavaSparkContext(conf);
>
>         JavaRDD<String> textFile = sc.textFile(remoteFilePath);
>
>         // Broadcast variable; the dimension of this double array is 133433*458
>         final Broadcast<double[][]> broadcastPoints =
>             sc.broadcast(createBroadcastPoints(localPointPath, row, col));
>
>         /**
>          * Compute the distance in terms of each point on each instance.
>          * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
>          */
>         JavaPairRDD<Integer, Double> distance =
>             textFile.flatMapToPair(new PairFlatMapFunction<String, Integer, Double>() {
>                 public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
>                     List<String> al = Arrays.asList(v1.split(","));
>                     double[] featureVals = new double[al.size()];
>                     for (int j = 0; j < al.size() - 1; j++)
>                         featureVals[j] = Double.valueOf(al.get(j + 1));
>                     int jIndex = Integer.valueOf(al.get(0));
>                     double[][] allPoints = broadcastPoints.value();
>                     double sum = 0;
>                     List<Tuple2<Integer, Double>> list =
>                         new ArrayList<Tuple2<Integer, Double>>();
>                     for (int i = 0; i < row; i++) {
>                         sum = 0;
>                         for (int j = 0; j < al.size() - 1; j++) {
>                             sum += (allPoints[i][j] - featureVals[j])
>                                  * (allPoints[i][j] - featureVals[j]);
>                         }
>                         list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
>                     }
>                     return list;
>                 }
>             });
>
>         // Create zero-one density
>         JavaPairRDD<Integer, Integer> densityZeroOne =
>             distance.mapValues(new Function<Double, Integer>() {
>                 public Integer call(Double v1) throws Exception {
>                     if (v1 < dc)
>                         return 1;
>                     else return 0;
>                 }
>             });
>
>         // Combine the density
>         JavaPairRDD<Integer, Integer> counts =
>             densityZeroOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
>                 public Integer call(Integer v1, Integer v2) throws Exception {
>                     return v1 + v2;
>                 }
>             });
>         counts.saveAsTextFile(outputPath + args[1]);
>         sc.stop();
>     }
>
> *If I comment out "saveAsTextFile", the log is:*
>
> Picked up _JAVA_OPTIONS: -Xmx4g
> 15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
> 15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable > 15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser > 15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to: > hduser > 15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager: > authentication disabled; ui acls disabled; users with view permissions: > Set(hduser); users with modify permissions: Set(hduser) > 15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started > 15/05/24 15:21:31 INFO Remoting: Starting remoting > 15/05/24 15:21:31 INFO Remoting: Remoting started; listening on addresses > :[akka.tcp://sparkDriver@HadoopV26Master:57429] > 15/05/24 15:21:31 INFO util.Utils: Successfully started service > 'sparkDriver' on port 57429. > 15/05/24 15:21:31 INFO spark.SparkEnv: Registering MapOutputTracker > 15/05/24 15:21:31 INFO spark.SparkEnv: Registering BlockManagerMaster > 15/05/24 15:21:31 INFO storage.DiskBlockManager: Created local directory at > > /tmp/spark-6342bde9-feca-4651-8cca-a67541150420/blockmgr-e92d0ae0-ec95-44cb-986a-266a1899202b > 15/05/24 15:21:31 INFO storage.MemoryStore: MemoryStore started with > capacity 1966.1 MB > 15/05/24 15:21:31 INFO spark.HttpFileServer: HTTP File server directory is > > /tmp/spark-fea59c9e-1264-45e9-ad31-484d7de83d0a/httpd-c6421767-ffaf-4417-905e-34b3d13a7bf4 > 15/05/24 15:21:31 INFO spark.HttpServer: Starting HTTP Server > 15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT > 15/05/24 15:21:31 INFO server.AbstractConnector: Started > SocketConnector@0.0.0.0:36956 > 15/05/24 15:21:31 INFO util.Utils: Successfully started service 'HTTP file > server' on port 36956. > 15/05/24 15:21:31 INFO spark.SparkEnv: Registering OutputCommitCoordinator > 15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT > 15/05/24 15:21:31 INFO server.AbstractConnector: Started > SelectChannelConnector@0.0.0.0:4040 > 15/05/24 15:21:31 INFO util.Utils: Successfully started service 'SparkUI' > on > port 4040. 
> 15/05/24 15:21:31 INFO ui.SparkUI: Started SparkUI at > http://HadoopV26Master:4040 > 15/05/24 15:21:31 INFO spark.SparkContext: Added JAR > file:/home/hduser/densityspark.jar at > http://10.9.0.16:36956/jars/densityspark.jar with timestamp 1432452091753 > 15/05/24 15:21:31 INFO client.AppClient$ClientActor: Connecting to master > akka.tcp://sparkMaster@HadoopV26Master:7077/user/Master... > 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Connected to > Spark cluster with app ID app-20150524152132-0035 > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: > app-20150524152132-0035/0 on worker-20150522181416-HadoopV26Slave3-47002 > (HadoopV26Slave3:47002) with 2 cores > 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152132-0035/0 on hostPort HadoopV26Slave3:47002 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: > app-20150524152132-0035/1 on worker-20150522181417-HadoopV26Slave6-60280 > (HadoopV26Slave6:60280) with 2 cores > 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152132-0035/1 on hostPort HadoopV26Slave6:60280 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: > app-20150524152132-0035/2 on worker-20150522181417-HadoopV26Slave5-54797 > (HadoopV26Slave5:54797) with 2 cores > 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152132-0035/2 on hostPort HadoopV26Slave5:54797 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: > app-20150524152132-0035/3 on worker-20150522181416-HadoopV26Slave1-47647 > (HadoopV26Slave1:47647) with 2 cores > 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152132-0035/3 on hostPort HadoopV26Slave1:47647 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:21:32 INFO 
client.AppClient$ClientActor: Executor added: > app-20150524152132-0035/4 on worker-20150522181417-HadoopV26Slave4-59352 > (HadoopV26Slave4:59352) with 2 cores > 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152132-0035/4 on hostPort HadoopV26Slave4:59352 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor added: > app-20150524152132-0035/5 on worker-20150522181417-HadoopV26Slave2-34694 > (HadoopV26Slave2:34694) with 2 cores > 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152132-0035/5 on hostPort HadoopV26Slave2:34694 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/0 is now LOADING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/4 is now LOADING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/1 is now LOADING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/3 is now LOADING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/5 is now LOADING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/2 is now LOADING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/0 is now RUNNING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/1 is now RUNNING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/2 is now RUNNING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/3 is now RUNNING > 15/05/24 15:21:32 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/4 is now RUNNING > 15/05/24 15:21:32 INFO 
client.AppClient$ClientActor: Executor updated: > app-20150524152132-0035/5 is now RUNNING > 15/05/24 15:21:32 INFO netty.NettyBlockTransferService: Server created on > 47711 > 15/05/24 15:21:32 INFO storage.BlockManagerMaster: Trying to register > BlockManager > 15/05/24 15:21:32 INFO storage.BlockManagerMasterActor: Registering block > manager HadoopV26Master:47711 with 1966.1 MB RAM, BlockManagerId(<driver>, > HadoopV26Master, 47711) > 15/05/24 15:21:32 INFO storage.BlockManagerMaster: Registered BlockManager > 15/05/24 15:21:32 INFO scheduler.EventLoggingListener: Logging events to > file:/tmp/spark-events/app-20150524152132-0035 > 15/05/24 15:21:32 INFO cluster.SparkDeploySchedulerBackend: > SchedulerBackend > is ready for scheduling beginning after reached > minRegisteredResourcesRatio: > 0.0 > 15/05/24 15:21:32 INFO storage.MemoryStore: ensureFreeSpace(258503) called > with curMem=0, maxMem=2061647216 > 15/05/24 15:21:32 INFO storage.MemoryStore: Block broadcast_0 stored as > values in memory (estimated size 252.4 KB, free 1965.9 MB) > 15/05/24 15:21:32 INFO storage.MemoryStore: ensureFreeSpace(27227) called > with curMem=258503, maxMem=2061647216 > 15/05/24 15:21:32 INFO storage.MemoryStore: Block broadcast_0_piece0 stored > as bytes in memory (estimated size 26.6 KB, free 1965.9 MB) > 15/05/24 15:21:32 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 > in > memory on HadoopV26Master:47711 (size: 26.6 KB, free: 1966.1 MB) > 15/05/24 15:21:32 INFO storage.BlockManagerMaster: Updated info of block > broadcast_0_piece0 > 15/05/24 15:21:32 INFO spark.SparkContext: Created broadcast 0 from > textFile > at Clustering.java:67 > > *If I uncomment "saveAsTextFile" then:* > 15/05/24 15:23:57 INFO spark.SparkContext: Running Spark version 1.3.1 > 15/05/24 15:23:58 WARN util.NativeCodeLoader: Unable to load native-hadoop > library for your platform... 
using builtin-java classes where applicable > 15/05/24 15:23:58 INFO spark.SecurityManager: Changing view acls to: hduser > 15/05/24 15:23:58 INFO spark.SecurityManager: Changing modify acls to: > hduser > 15/05/24 15:23:58 INFO spark.SecurityManager: SecurityManager: > authentication disabled; ui acls disabled; users with view permissions: > Set(hduser); users with modify permissions: Set(hduser) > 15/05/24 15:23:58 INFO slf4j.Slf4jLogger: Slf4jLogger started > 15/05/24 15:23:58 INFO Remoting: Starting remoting > 15/05/24 15:23:58 INFO Remoting: Remoting started; listening on addresses > :[akka.tcp://sparkDriver@HadoopV26Master:41017] > 15/05/24 15:23:58 INFO util.Utils: Successfully started service > 'sparkDriver' on port 41017. > 15/05/24 15:23:58 INFO spark.SparkEnv: Registering MapOutputTracker > 15/05/24 15:23:58 INFO spark.SparkEnv: Registering BlockManagerMaster > 15/05/24 15:23:58 INFO storage.DiskBlockManager: Created local directory at > > /tmp/spark-cc73d7c3-4b0e-4a20-bf1c-147a8ee927f7/blockmgr-aa5cb9e3-224c-477e-b0b1-e65ec02e80fe > 15/05/24 15:23:58 INFO storage.MemoryStore: MemoryStore started with > capacity 1966.1 MB > 15/05/24 15:23:58 INFO spark.HttpFileServer: HTTP File server directory is > > /tmp/spark-c42a1cc6-f137-4233-96ef-1e0a44e634d1/httpd-114edee3-3404-4425-a644-c14f95a7ee7b > 15/05/24 15:23:58 INFO spark.HttpServer: Starting HTTP Server > 15/05/24 15:23:59 INFO server.Server: jetty-8.y.z-SNAPSHOT > 15/05/24 15:23:59 INFO server.AbstractConnector: Started > SocketConnector@0.0.0.0:38453 > 15/05/24 15:23:59 INFO util.Utils: Successfully started service 'HTTP file > server' on port 38453. > 15/05/24 15:23:59 INFO spark.SparkEnv: Registering OutputCommitCoordinator > 15/05/24 15:23:59 INFO server.Server: jetty-8.y.z-SNAPSHOT > 15/05/24 15:23:59 INFO server.AbstractConnector: Started > SelectChannelConnector@0.0.0.0:4040 > 15/05/24 15:23:59 INFO util.Utils: Successfully started service 'SparkUI' > on > port 4040. 
> 15/05/24 15:23:59 INFO ui.SparkUI: Started SparkUI at > http://HadoopV26Master:4040 > 15/05/24 15:23:59 INFO spark.SparkContext: Added JAR > file:/home/hduser/densitysparksave.jar at > http://10.9.0.16:38453/jars/densitysparksave.jar with timestamp > 1432452239254 > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Connecting to master > akka.tcp://sparkMaster@HadoopV26Master:7077/user/Master... > 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Connected to > Spark cluster with app ID app-20150524152359-0036 > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: > app-20150524152359-0036/0 on worker-20150522181416-HadoopV26Slave3-47002 > (HadoopV26Slave3:47002) with 2 cores > 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152359-0036/0 on hostPort HadoopV26Slave3:47002 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: > app-20150524152359-0036/1 on worker-20150522181417-HadoopV26Slave6-60280 > (HadoopV26Slave6:60280) with 2 cores > 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152359-0036/1 on hostPort HadoopV26Slave6:60280 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: > app-20150524152359-0036/2 on worker-20150522181417-HadoopV26Slave5-54797 > (HadoopV26Slave5:54797) with 2 cores > 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152359-0036/2 on hostPort HadoopV26Slave5:54797 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: > app-20150524152359-0036/3 on worker-20150522181416-HadoopV26Slave1-47647 > (HadoopV26Slave1:47647) with 2 cores > 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152359-0036/3 on hostPort HadoopV26Slave1:47647 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:23:59 INFO 
client.AppClient$ClientActor: Executor added: > app-20150524152359-0036/4 on worker-20150522181417-HadoopV26Slave4-59352 > (HadoopV26Slave4:59352) with 2 cores > 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152359-0036/4 on hostPort HadoopV26Slave4:59352 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor added: > app-20150524152359-0036/5 on worker-20150522181417-HadoopV26Slave2-34694 > (HadoopV26Slave2:34694) with 2 cores > 15/05/24 15:23:59 INFO cluster.SparkDeploySchedulerBackend: Granted > executor > ID app-20150524152359-0036/5 on hostPort HadoopV26Slave2:34694 with 2 > cores, > 4.0 GB RAM > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/1 is now LOADING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/2 is now LOADING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/0 is now LOADING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/3 is now LOADING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/5 is now LOADING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/4 is now LOADING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/0 is now RUNNING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/1 is now RUNNING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/2 is now RUNNING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/3 is now RUNNING > 15/05/24 15:23:59 INFO client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/4 is now RUNNING > 15/05/24 15:23:59 INFO 
client.AppClient$ClientActor: Executor updated: > app-20150524152359-0036/5 is now RUNNING > 15/05/24 15:23:59 INFO netty.NettyBlockTransferService: Server created on > 46919 > 15/05/24 15:23:59 INFO storage.BlockManagerMaster: Trying to register > BlockManager > 15/05/24 15:23:59 INFO storage.BlockManagerMasterActor: Registering block > manager HadoopV26Master:46919 with 1966.1 MB RAM, BlockManagerId(<driver>, > HadoopV26Master, 46919) > 15/05/24 15:23:59 INFO storage.BlockManagerMaster: Registered BlockManager > 15/05/24 15:24:00 INFO scheduler.EventLoggingListener: Logging events to > file:/tmp/spark-events/app-20150524152359-0036 > 15/05/24 15:24:00 INFO cluster.SparkDeploySchedulerBackend: > SchedulerBackend > is ready for scheduling beginning after reached > minRegisteredResourcesRatio: > 0.0 > 15/05/24 15:24:00 INFO storage.MemoryStore: ensureFreeSpace(258503) called > with curMem=0, maxMem=2061647216 > 15/05/24 15:24:00 INFO storage.MemoryStore: Block broadcast_0 stored as > values in memory (estimated size 252.4 KB, free 1965.9 MB) > 15/05/24 15:24:00 INFO storage.MemoryStore: ensureFreeSpace(27227) called > with curMem=258503, maxMem=2061647216 > 15/05/24 15:24:00 INFO storage.MemoryStore: Block broadcast_0_piece0 stored > as bytes in memory (estimated size 26.6 KB, free 1965.9 MB) > 15/05/24 15:24:00 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 > in > memory on HadoopV26Master:46919 (size: 26.6 KB, free: 1966.1 MB) > 15/05/24 15:24:00 INFO storage.BlockManagerMaster: Updated info of block > broadcast_0_piece0 > 15/05/24 15:24:00 INFO spark.SparkContext: Created broadcast 0 from > textFile > at Clustering.java:67 > 15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered > executor: > Actor[akka.tcp://sparkExecutor@HadoopV26Slave5 > :43757/user/Executor#-184395839] > with ID 2 > 15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered > executor: > Actor[akka.tcp://sparkExecutor@HadoopV26Slave4 > 
:35123/user/Executor#1588464525] > with ID 4 > 15/05/24 15:24:01 INFO cluster.SparkDeploySchedulerBackend: Registered > executor: > Actor[akka.tcp://sparkExecutor@HadoopV26Slave2 > :33439/user/Executor#529853915] > with ID 5 > 15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered > executor: > Actor[akka.tcp://sparkExecutor@HadoopV26Slave6 > :37211/user/Executor#509959098] > with ID 1 > 15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered > executor: > Actor[akka.tcp://sparkExecutor@HadoopV26Slave3 > :52152/user/Executor#-1731088969] > with ID 0 > 15/05/24 15:24:02 INFO cluster.SparkDeploySchedulerBackend: Registered > executor: > Actor[akka.tcp://sparkExecutor@HadoopV26Slave1 > :55666/user/Executor#-635759065] > with ID 3 > 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block > manager HadoopV26Slave5:45835 with 2.1 GB RAM, BlockManagerId(2, > HadoopV26Slave5, 45835) > 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block > manager HadoopV26Slave2:44597 with 2.1 GB RAM, BlockManagerId(5, > HadoopV26Slave2, 44597) > 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block > manager HadoopV26Slave4:44317 with 2.1 GB RAM, BlockManagerId(4, > HadoopV26Slave4, 44317) > 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block > manager HadoopV26Slave6:52445 with 2.1 GB RAM, BlockManagerId(1, > HadoopV26Slave6, 52445) > 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block > manager HadoopV26Slave3:38931 with 2.1 GB RAM, BlockManagerId(0, > HadoopV26Slave3, 38931) > 15/05/24 15:24:02 INFO storage.BlockManagerMasterActor: Registering block > manager HadoopV26Slave1:44960 with 2.1 GB RAM, BlockManagerId(3, > HadoopV26Slave1, 44960) > 15/05/24 15:24:10 INFO mapred.FileInputFormat: Total input paths to process > : 1 > 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.tip.id is > deprecated. 
Instead, use mapreduce.task.id > 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.id is > deprecated. Instead, use mapreduce.task.attempt.id > 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.is.map is > deprecated. Instead, use mapreduce.task.ismap > 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.task.partition is > deprecated. Instead, use mapreduce.task.partition > 15/05/24 15:24:10 INFO Configuration.deprecation: mapred.job.id is > deprecated. Instead, use mapreduce.job.id > 15/05/24 15:24:11 INFO spark.SparkContext: Starting job: saveAsTextFile at > Clustering.java:117 > 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Registering RDD 3 (mapValues > at Clustering.java:100) > 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at > Clustering.java:117) with 4 output partitions (allowLocal=false) > 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Final stage: Stage > 1(saveAsTextFile at Clustering.java:117) > 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Parents of final stage: > List(Stage 0) > 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Missing parents: List(Stage > 0) > 15/05/24 15:24:11 INFO scheduler.DAGScheduler: Submitting Stage 0 > (MapPartitionsRDD[3] at mapValues at Clustering.java:100), which has no > missing parents > 15/05/24 15:24:12 INFO storage.MemoryStore: ensureFreeSpace(490237112) > called with curMem=285730, maxMem=2061647216 > 15/05/24 15:24:12 INFO storage.MemoryStore: Block broadcast_1 stored as > values in memory (estimated size 467.5 MB, free 1498.3 MB) > 15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(4194304) called > with curMem=490522842, maxMem=2061647216 > 15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece0 stored > as bytes in memory (estimated size 4.0 MB, free 1494.3 MB) > 15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 > in > memory on HadoopV26Master:46919 (size: 4.0 MB, free: 1962.1 MB) > 15/05/24 15:24:13 INFO 
storage.BlockManagerMaster: Updated info of block > broadcast_1_piece0 > *(omits hundreds of lines here)* > 15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(4194304) called > with curMem=968673498, maxMem=2061647216 > 15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece114 > stored as bytes in memory (estimated size 4.0 MB, free 1038.3 MB) > 15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece114 > in memory on HadoopV26Master:46919 (size: 4.0 MB, free: 1506.1 MB) > 15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block > broadcast_1_piece114 > 15/05/24 15:24:13 INFO storage.MemoryStore: ensureFreeSpace(2116571) called > with curMem=972867802, maxMem=2061647216 > 15/05/24 15:24:13 INFO storage.MemoryStore: Block broadcast_1_piece115 > stored as bytes in memory (estimated size 2.0 MB, free 1036.3 MB) > 15/05/24 15:24:13 INFO storage.BlockManagerInfo: Added broadcast_1_piece115 > in memory on HadoopV26Master:46919 (size: 2.0 MB, free: 1504.1 MB) > 15/05/24 15:24:13 INFO storage.BlockManagerMaster: Updated info of block > broadcast_1_piece115 > 15/05/24 15:24:13 INFO spark.SparkContext: Created broadcast 1 from > broadcast at DAGScheduler.scala:839 > 15/05/24 15:24:13 INFO scheduler.DAGScheduler: Submitting 4 missing tasks > from Stage 0 (MapPartitionsRDD[3] at mapValues at Clustering.java:100) > 15/05/24 15:24:13 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 > with > 4 tasks > 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage > 0.0 (TID 0, HadoopV26Slave1, NODE_LOCAL, 1383 bytes) > 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 2.0 in stage > 0.0 (TID 1, HadoopV26Slave2, NODE_LOCAL, 1383 bytes) > 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 3.0 in stage > 0.0 (TID 2, HadoopV26Slave3, NODE_LOCAL, 1383 bytes) > 15/05/24 15:24:13 INFO scheduler.TaskSetManager: Starting task 1.0 in stage > 0.0 (TID 3, HadoopV26Slave4, NODE_LOCAL, 1383 bytes) 
> 15/05/24 15:24:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece68 in memory on HadoopV26Slave4:44317 (size: 4.0 MB, free: 2.1 GB)
> 15/05/24 15:24:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece102 in memory on HadoopV26Slave2:44597 (size: 4.0 MB, free: 2.1 GB)
> *(omits hundreds of lines here)*
> 15/05/24 15:24:23 INFO storage.BlockManagerInfo: Added broadcast_1_piece35 in memory on HadoopV26Slave1:44960 (size: 4.0 MB, free: 1666.0 MB)
> 15/05/24 15:24:24 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on HadoopV26Slave3:38931 (size: 26.6 KB, free: 1658.0 MB)
> 15/05/24 15:24:24 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on HadoopV26Slave1:44960 (size: 26.6 KB, free: 1658.0 MB)
>
> *Then the program gets stuck here for many minutes.*
>
> At this point I checked the web UI at master:4040 and saw that the second
> map was still running:
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/jobstage.png
> And when I clicked into the mapValues stage I saw this:
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/mapValue.png
> But why is it so slow? And how is that related to "saveAsTextFile"?
>
> BTW, in the mapValues stage I saw the input size/records increase very
> slowly; does that mean reading is slow? mapValues is the second stage. And
> why doesn't the UI show the reduce stage?
> > *Then after about 20 mins, I get the following logs and can get the > result:* > 15/05/24 16:12:22 INFO scheduler.TaskSetManager: Finished task 3.0 in stage > 0.0 (TID 2) in 2888967 ms on HadoopV26Slave3 (1/4) > 15/05/24 16:24:27 INFO scheduler.TaskSetManager: Finished task 1.0 in stage > 0.0 (TID 3) in 3614079 ms on HadoopV26Slave4 (2/4) > 15/05/24 16:25:53 INFO scheduler.TaskSetManager: Finished task 2.0 in stage > 0.0 (TID 1) in 3700117 ms on HadoopV26Slave2 (3/4) > 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Stage 0 (mapValues at > Clustering.java:100) finished in 3801.290 s > 15/05/24 16:27:35 INFO scheduler.DAGScheduler: looking for newly runnable > stages > 15/05/24 16:27:35 INFO scheduler.DAGScheduler: running: Set() > 15/05/24 16:27:35 INFO scheduler.DAGScheduler: waiting: Set(Stage 1) > 15/05/24 16:27:35 INFO scheduler.DAGScheduler: failed: Set() > 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Missing parents for Stage 1: > List() > 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Finished task 0.0 in stage > 0.0 (TID 0) in 3801284 ms on HadoopV26Slave1 (4/4) > 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Submitting Stage 1 > (MapPartitionsRDD[5] at saveAsTextFile at Clustering.java:117), which is > now > runnable > 15/05/24 16:27:35 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, > whose tasks have all completed, from pool > 15/05/24 16:27:35 INFO storage.MemoryStore: ensureFreeSpace(128128) called > with curMem=974984373, maxMem=2061647216 > 15/05/24 16:27:35 INFO storage.MemoryStore: Block broadcast_2 stored as > values in memory (estimated size 125.1 KB, free 1036.2 MB) > 15/05/24 16:27:35 INFO storage.MemoryStore: ensureFreeSpace(58374) called > with curMem=975112501, maxMem=2061647216 > 15/05/24 16:27:35 INFO storage.MemoryStore: Block broadcast_2_piece0 stored > as bytes in memory (estimated size 57.0 KB, free 1036.1 MB) > 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 > in > memory on 
HadoopV26Master:46919 (size: 57.0 KB, free: 1504.0 MB) > 15/05/24 16:27:35 INFO storage.BlockManagerMaster: Updated info of block > broadcast_2_piece0 > 15/05/24 16:27:35 INFO spark.SparkContext: Created broadcast 2 from > broadcast at DAGScheduler.scala:839 > 15/05/24 16:27:35 INFO scheduler.DAGScheduler: Submitting 4 missing tasks > from Stage 1 (MapPartitionsRDD[5] at saveAsTextFile at Clustering.java:117) > 15/05/24 16:27:35 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 > with > 4 tasks > 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 0.0 in stage > 1.0 (TID 4, HadoopV26Slave5, PROCESS_LOCAL, 1114 bytes) > 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 1.0 in stage > 1.0 (TID 5, HadoopV26Slave3, PROCESS_LOCAL, 1114 bytes) > 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 2.0 in stage > 1.0 (TID 6, HadoopV26Slave2, PROCESS_LOCAL, 1114 bytes) > 15/05/24 16:27:35 INFO scheduler.TaskSetManager: Starting task 3.0 in stage > 1.0 (TID 7, HadoopV26Slave6, PROCESS_LOCAL, 1114 bytes) > 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 > in > memory on HadoopV26Slave2:44597 (size: 57.0 KB, free: 1657.9 MB) > 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 > in > memory on HadoopV26Slave3:38931 (size: 57.0 KB, free: 1657.9 MB) > 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map > output locations for shuffle 0 to sparkExecutor@HadoopV26Slave2:33439 > 15/05/24 16:27:35 INFO spark.MapOutputTrackerMaster: Size of output > statuses > for shuffle 0 is 188 bytes > 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map > output locations for shuffle 0 to sparkExecutor@HadoopV26Slave3:52152 > 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 > in > memory on HadoopV26Slave6:52445 (size: 57.0 KB, free: 2.1 GB) > 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 > in > memory on 
HadoopV26Slave5:45835 (size: 57.0 KB, free: 2.1 GB) > 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map > output locations for shuffle 0 to sparkExecutor@HadoopV26Slave6:37211 > 15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map > output locations for shuffle 0 to sparkExecutor@HadoopV26Slave5:43757 > 15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added rdd_4_1 in memory on > HadoopV26Slave3:38931 (size: 1733.1 KB, free: 1656.2 MB) > 15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_2 in memory on > HadoopV26Slave2:44597 (size: 1733.1 KB, free: 1656.2 MB) > 15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_3 in memory on > HadoopV26Slave6:52445 (size: 1717.4 KB, free: 2.1 GB) > 15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 1.0 in stage > 1.0 (TID 5) in 1757 ms on HadoopV26Slave3 (1/4) > 15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 2.0 in stage > 1.0 (TID 6) in 1776 ms on HadoopV26Slave2 (2/4) > 15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_0 in memory on > HadoopV26Slave5:45835 (size: 1733.1 KB, free: 2.1 GB) > 15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 3.0 in stage > 1.0 (TID 7) in 3153 ms on HadoopV26Slave6 (3/4) > 15/05/24 16:27:38 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at > Clustering.java:117) finished in 3.258 s > 15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 0.0 in stage > 1.0 (TID 4) in 3256 ms on HadoopV26Slave5 (4/4) > 15/05/24 16:27:38 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, > whose tasks have all completed, from pool > 15/05/24 16:27:38 INFO scheduler.DAGScheduler: Job 0 finished: > saveAsTextFile at Clustering.java:117, took 3807.229501 s > > although I can obtain the result ,but it's too slow, right? > The followings are also the final result info. 
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/result.png
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/mapvalueres.png
>
> PS: if I reduce the input to just 10 records, it runs very fast. But
> running it on just 10 records doesn't make any sense.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-dramatically-slow-when-I-add-saveAsTextFile-tp23003.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
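(A rough sanity check of the scale involved, using only the numbers quoted in the post: the flatMapToPair compares each of the 133433 input records against all 133433 broadcast points over roughly 457 features, assuming one index column plus 457 feature columns per record. That is on the order of 10^13 subtract-and-multiply steps per run, which is consistent with the ~3800 s stage time in the logs. The class name and the per-core throughput figure below are assumptions for illustration.)

```java
public class CostEstimate {
    // Total inner-loop iterations of the quoted flatMapToPair:
    // for every record, a full pass over all broadcast points and features.
    public static long steps(long rows, long features) {
        return rows * rows * features;
    }

    public static void main(String[] args) {
        long rows = 133433;   // records in the input file (from the post)
        long features = 457;  // 458 columns minus the leading index (assumption)

        long ops = steps(rows, features);
        System.out.println("distance steps: " + ops);  // prints: distance steps: 8136595028473

        // Assuming ~1e8 simple steps per second per core across 12 cores,
        // the computation alone is on the order of an hour of wall-clock time.
        double seconds = ops / (1e8 * 12);
        System.out.printf("rough wall-clock estimate: %.0f s%n", seconds);
    }
}
```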