For the skeptics :), here's a version you can easily reproduce at home:

val rdd1 = sc.parallelize(1 to 1000, 100) // force 100 partitions
val rdd2 = rdd1.coalesce(100)
val rdd3 = rdd2 map { _ + 1000 }
val rdd4 = rdd3.coalesce(2)
rdd4.collect()
You can see that everything runs as only 2 tasks ... :-/

2014-06-25 00:43:20,795 INFO org.apache.spark.SparkContext: Starting job: collect at <console>:48
2014-06-25 00:43:20,811 INFO org.apache.spark.scheduler.DAGScheduler: Got job 0 (collect at <console>:48) with 2 output partitions (allowLocal=false)
2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler: Final stage: Stage 0 (collect at <console>:48)
2014-06-25 00:43:20,812 INFO org.apache.spark.scheduler.DAGScheduler: Parents of final stage: List()
2014-06-25 00:43:20,821 INFO org.apache.spark.scheduler.DAGScheduler: Missing parents: List()
2014-06-25 00:43:20,827 INFO org.apache.spark.scheduler.DAGScheduler: Submitting Stage 0 (CoalescedRDD[11] at coalesce at <console>:45), which has no missing parents
2014-06-25 00:43:20,898 INFO org.apache.spark.scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (CoalescedRDD[11] at coalesce at <console>:45)
2014-06-25 00:43:20,901 INFO org.apache.spark.scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
2014-06-25 00:43:20,921 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: ip-10-226-98-211.ec2.internal (PROCESS_LOCAL)
2014-06-25 00:43:20,939 INFO org.apache.spark.scheduler.TaskSetManager: Serialized task 0.0:0 as 6632 bytes in 16 ms
2014-06-25 00:43:20,943 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 5: ip-10-13-132-153.ec2.internal (PROCESS_LOCAL)
2014-06-25 00:43:20,951 INFO org.apache.spark.scheduler.TaskSetManager: Serialized task 0.0:1 as 6632 bytes in 8 ms
2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager: Finished TID 0 in 685 ms on ip-10-226-98-211.ec2.internal (progress: 1/2)
2014-06-25 00:43:21,605 INFO org.apache.spark.scheduler.TaskSetManager: Finished TID 1 in 662 ms on ip-10-13-132-153.ec2.internal (progress: 2/2)
2014-06-25 00:43:21,606 INFO org.apache.spark.scheduler.DAGScheduler: Completed ResultTask(0, 0)
2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
2014-06-25 00:43:21,607 INFO org.apache.spark.scheduler.DAGScheduler: Completed ResultTask(0, 1)
2014-06-25 00:43:21,608 INFO org.apache.spark.scheduler.DAGScheduler: Stage 0 (collect at <console>:48) finished in 0.693 s
2014-06-25 00:43:21,616 INFO org.apache.spark.SparkContext: Job finished: collect at <console>:48, took 0.821161249 s

res7: Array[Int] = Array(1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020, 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1051, 1052, 1053, 1054, 1055, 1056, 1057, 1058, 1059, 1060, 1081, 1082, 1083, 1084, 1085, 1086, 1087, 1088, 1089, 1090, 1101, 1102, 1103, 1104, 1105, 1106, 1107, 1108, 1109, 1110, 1121, 1122, 1123, 1124, 1125, 1126, 1127, 1128, 1129, 1130, 1141, 1142, 1143, 1144, 1145, 1146, 1147, 1148, 1149, 1150, 1161, 1162, 1163, 1164, 1165, 1166, 1167, 1168, 1169, 1170, 1181, 1182, 1183, 1184, 1185, 1186, 1187, 1188, 1189, 1190, 1201, 1202, 1203, 1204, 1205, 1206, 1207, 1208, 1209, 1210, 1221, 1222, 1223, 1224, 1225, 1226, 1227, 1228, 1229, 1230, 1241, 1242, 1243, 1244, 1245, 1246, 1247, 1248, 1249...
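A workaround that should restore the upstream parallelism (a minimal sketch, not an official recommendation): pass shuffle = true to the second coalesce. RDD.coalesce takes an optional shuffle flag, and with it set the coalesce becomes a shuffle boundary, so the map should run in its own 100-task stage while the downstream side still sees only 2 partitions:

val rdd1 = sc.parallelize(1 to 1000, 100)   // 100 partitions, as above
val rdd3 = rdd1.coalesce(100) map { _ + 1000 }
val rdd4 = rdd3.coalesce(2, shuffle = true) // shuffle boundary: the map stage keeps 100 tasks
rdd4.collect()                              // result still has only 2 partitions

The cost is shuffling the map output across the network (and the collected order may change, since elements are redistributed across the 2 partitions). rdd3.repartition(2) is equivalent to coalesce(2, shuffle = true).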
On Tue, Jun 24, 2014 at 5:39 PM, Alex Boisvert <alex.boisv...@gmail.com> wrote:
> Yes.
>
> scala> rawLogs.partitions.size
> res1: Int = 2171
>
> On Tue, Jun 24, 2014 at 4:00 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>
>> To be clear, the number of map tasks is determined by the number of
>> partitions inside the rdd, hence the suggestion by Nicholas.
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>
>> On Wed, Jun 25, 2014 at 4:17 AM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>
>>> So do you get 2171 as the output for that command? That command tells
>>> you how many partitions your RDD has, so it’s good to first confirm that
>>> rdd1 has as many partitions as you think it has.
>>>
>>> On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert <alex.boisv...@gmail.com> wrote:
>>>
>>>> It's actually a set of 2171 S3 files, with an average size of about
>>>> 18MB.
>>>>
>>>> On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>>>
>>>>> What do you get for rdd1._jrdd.splits().size()? You might think
>>>>> you’re getting > 100 partitions, but it may not be happening.
>>>>>
>>>>> On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert <alex.boisv...@gmail.com> wrote:
>>>>>
>>>>>> With the following pseudo-code,
>>>>>>
>>>>>> val rdd1 = sc.sequenceFile(...) // has > 100 partitions
>>>>>> val rdd2 = rdd1.coalesce(100)
>>>>>> val rdd3 = rdd2 map { ... }
>>>>>> val rdd4 = rdd3.coalesce(2)
>>>>>> val rdd5 = rdd4.saveAsTextFile(...) // want only two output files
>>>>>>
>>>>>> I would expect the parallelism of the map() operation to be 100
>>>>>> concurrent tasks, and the parallelism of the save() operation to be 2.
>>>>>>
>>>>>> However, it appears the parallelism of the entire chain is 2 -- I
>>>>>> only see two tasks created for the save() operation, and those tasks
>>>>>> appear to execute the map() operation as well.
>>>>>>
>>>>>> Assuming what I'm seeing is as-specified (meaning, how things are
>>>>>> meant to be), what's the recommended way to force a parallelism of 100
>>>>>> on the map() operation?
>>>>>>
>>>>>> thanks!
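One more note on the partitions.size checks discussed above (a quick REPL sketch against the toy data rather than the real S3 input): the partition counts come out as expected at every step, so the count alone doesn't reveal the problem; it's the missing stage boundary that lets the 2-task stage swallow the map.

val base = sc.parallelize(1 to 1000, 100)
base.coalesce(100).partitions.size                            // 100, as expected
base.coalesce(100).map(_ + 1000).coalesce(2).partitions.size  // 2 -- but without a shuffle, the map also runs inside this 2-task stage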