Hmm... The count() method invokes this:

    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
      runJob(rdd, func, 0 until rdd.partitions.length)
    }
It appears that you're running out of memory while trying to compute (within the driver) the number of partitions that will be in the final result. It seems as if Mongo is computing so many splits that you're running out of memory.

Looking at your log messages, I see this:

    15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id" : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}

    0x54e64d646d0bfe0a24ba79e1 - 0x54e64d626d0bfe0a24ba79b3 = 0x2000000000000002e = 36893488147419103278

The last split reported in the log has max 55adf841b4d2970fb07d7288.

    0x55adf841b4d2970fb07d7288 - 0x54e64d646d0bfe0a24ba79e1 = 0xc7aadd47c699058bc2f8a7 = 241383122307828806444054695

    241383122307828806444054695 / 36893488147419103278 = 6,542,702

That's 6,542,702 potential splits, assuming they are evenly distributed. I'm not sure how big each split object is, but it's plausible that the process of creating an array of 6.5 million of them is causing you to run out of memory.

I think the reason you don't see anything in the executor logs is that the exception occurs before any work is handed off to the executors.

Rich

On Sat, Sep 12, 2015 at 5:18 PM, Utkarsh Sengar <utkarsh2...@gmail.com> wrote:
> I am trying to run this, a basic mapToPair and then count() to trigger an
> action.
> 4 executors are launched but I don't see any relevant logs on those
> executors.
>
> It looks like the driver is pulling all the data and it runs out of
> memory; the dataset is big, so it won't fit on one machine.
>
> So what is the issue here? Am I using Spark the wrong way in this example?
>
> Configuration mongodbConfigInventoryDay = new Configuration();
> mongodbConfigInventoryDay.set("mongo.job.input.format",
>     "com.mongodb.hadoop.MongoInputFormat");
> mongodbConfigInventoryDay.set("mongo.input.uri", "mongodb://" +
>     props.getProperty("mongo") + ":27017/A.MyColl");
> JavaPairRDD<Object, BSONObject> myColl = sc.newAPIHadoopRDD(
>     mongodbConfigInventoryDay,
>     MongoInputFormat.class,
>     Object.class,
>     BSONObject.class
> );
> JavaPairRDD<Long, MyColl> myCollRdd = myColl.mapToPair(tuple2 -> {
>     ObjectMapper mapper = new ObjectMapper();
>     tuple2._2().removeField("_id");
>     MyColl day = mapper.readValue(tuple2._2().toMap().toString(), MyColl.class);
>     return new Tuple2<>(Long.valueOf((String) tuple2._2().get("MyCollId")), day);
> });
>
> myCollRdd.count();
>
> Logs on the driver:
>
> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with curMem=253374, maxMem=278019440
> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 117.8 KB, free 264.8 MB)
> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with curMem=374038, maxMem=278019440
> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 12.5 KB, free 264.8 MB)
> 15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
> 15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from newAPIHadoopRDD at SparkRunner.java:192
> 15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to check splits against mongodb://dsc-dbs-0000001.qasql.opentable.com:27017/A.MyColl
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null, max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id" : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id" : "54e64d646d0bfe0a24ba79e1"}, max= { "_id" : "5581d1c3d52db40bc8558c6b"}
> ......
> ......
> 15/09/12 21:08:22 INFO MongoCollectionSplitter: Created split: min={ "_id" : "55adf840d3b5be0724224807"}, max= { "_id" : "55adf841b4d2970fb07d7288"}
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>     at org.bson.io.PoolOutputBuffer.<init>(PoolOutputBuffer.java:224)
>     at org.bson.BasicBSONDecoder.<init>(BasicBSONDecoder.java:499)
>     at com.mongodb.hadoop.input.MongoInputSplit.<init>(MongoInputSplit.java:59)
>     at com.mongodb.hadoop.splitter.MongoCollectionSplitter.createSplitFromBounds(MongoCollectionSplitter.java:248)
>     at com.mongodb.hadoop.splitter.StandaloneMongoSplitter.calculateSplits(StandaloneMongoSplitter.java:157)
>     at com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:58)
>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
>     at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
>     at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:442)
>     at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:47)
>     at runner.SparkRunner.getInventoryDayRdd(SparkRunner.java:205)
>     at runner.SparkRunner.main(SparkRunner.java:68)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> --
> Thanks,
> -Utkarsh

--
Rich
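P.S. As a quick sanity check, the split estimate above can be reproduced with plain BigInteger arithmetic on the ObjectId bounds copied from the log. This is just a back-of-the-envelope verification sketch, not mongo-hadoop connector code:

```java
import java.math.BigInteger;

public class SplitMath {
    public static void main(String[] args) {
        // ObjectId bounds copied from the MongoCollectionSplitter log lines above
        BigInteger firstSplitMin = new BigInteger("54e64d626d0bfe0a24ba79b3", 16);
        BigInteger firstSplitMax = new BigInteger("54e64d646d0bfe0a24ba79e1", 16);
        BigInteger lastSplitMax  = new BigInteger("55adf841b4d2970fb07d7288", 16);

        // Width of one split in _id space: 0x2000000000000002e
        BigInteger splitWidth = firstSplitMax.subtract(firstSplitMin);

        // Total _id range remaining after the first split
        BigInteger remainingRange = lastSplitMax.subtract(firstSplitMax);

        // Rough split count, assuming _ids are evenly distributed
        System.out.println(remainingRange.divide(splitWidth)); // prints 6542702
    }
}
```

Even at a few hundred bytes per MongoInputSplit object, ~6.5 million of them would need gigabytes of driver heap, which matches the OutOfMemoryError inside StandaloneMongoSplitter.calculateSplits.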