Hi, I am executing a simple flow as shown below:
    data = sc.wholeTextFiles(...)
    tokens = data.flatMap(<<function>>)
    counts = tokens.map(lambda token: (token, 1))
    counters = counts.reduceByKey(lambda a, b: a + b)
    counters.sortBy(lambda x: x[1], False).saveAsTextFile(...)

There are a few problems I am facing:

1. If I execute this code as is, it runs fine, but it takes a lot of time. Given my cluster size and the data size, the time seems to be on the higher side. What I realized is that the variable data has only 2 partitions, which is probably one reason the job is slow.

2. To overcome the partition problem I added a second argument to sc.wholeTextFiles: the number of partitions. I passed a higher number, like 100. There was a speed-up, but then I see this exception:

      File "/hadoop/yarn/local/usercache/nrec/filecache/512/spark-assembly-1.2.1.2.2.6.0-2800-hadoop2.6.0.2.2.6.0-2800.jar/pyspark/worker.py", line 107, in main
        process()
      File "/hadoop/yarn/local/usercache/nrec/filecache/512/spark-assembly-1.2.1.2.2.6.0-2800-hadoop2.6.0.2.2.6.0-2800.jar/pyspark/worker.py", line 98, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/hdp/2.2.6.0-2800/spark/python/pyspark/rdd.py", line 2081, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "/usr/hdp/2.2.6.0-2800/spark/python/pyspark/rdd.py", line 2081, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "/usr/hdp/2.2.6.0-2800/spark/python/pyspark/rdd.py", line 2081, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "/usr/hdp/2.2.6.0-2800/spark/python/pyspark/rdd.py", line 245, in func
        return chain.from_iterable(imap(f, iterator))
    SystemError: Objects/cellobject.c:24: bad argument to internal function
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

   The processing does usually complete despite the exception and I get the results, though sometimes even that does not happen. I am on version 1.2.1.

3. No matter how many partitions I use, the last stage takes a really long time, around 95% of the total execution time. I am assuming the reduce is not working well and that I need to do some sort of combine operation before the reduce. Is that supported in Spark, or is there some better alternative? (Two illustrative sketches follow below.)

Thanks,
Rishi
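On problems 1 and 2, a direction that may be worth trying: read with the default partitioning and call repartition() on the RDD afterwards, instead of passing a minPartitions value to wholeTextFiles. Errors from cellobject.c have been reported in connection with pickled closures in PySpark, so defining the flatMap function at module top level rather than as a nested lambda/closure is also a low-risk thing to try, though not a confirmed fix. The sketch below is illustrative only: tokenize(), the paths, the app name, and the partition count of 100 are assumptions, not values from the original post.

    # Minimal sketch, not the original job: tokenize(), the paths, and the
    # partition count of 100 are placeholders to be tuned for the cluster.
    from pyspark import SparkContext

    sc = SparkContext(appName="wholeTextFiles-sketch")  # or reuse an existing sc

    def tokenize(pair):
        # wholeTextFiles yields (filename, contents); emit one token per word
        _, contents = pair
        return contents.split()

    data = sc.wholeTextFiles("hdfs:///path/to/input")  # default partitioning
    data = data.repartition(100)                       # spread the files over more tasks
    tokens = data.flatMap(tokenize)                    # module-level function, not a closure
    counts = tokens.map(lambda token: (token, 1))
    counters = counts.reduceByKey(lambda a, b: a + b)
    counters.sortBy(lambda x: x[1], False).saveAsTextFile("hdfs:///path/to/output")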
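On problem 3: reduceByKey in Spark already aggregates within each partition before the shuffle (the equivalent of a MapReduce combiner), so a separate combine step should not be needed. What can be controlled is how many partitions the shuffle writes to; both reduceByKey and sortBy accept a partition count. The sketch below assumes the same counts RDD as above, and the value 100 is again a placeholder, not a recommendation.

    # reduceByKey performs map-side aggregation before shuffling, so no
    # explicit combiner is required; the partition count (100 here, an
    # assumed placeholder) controls how many reduce tasks the shuffle uses.
    counters = counts.reduceByKey(lambda a, b: a + b, 100)

    # sortBy triggers its own shuffle and also accepts a partition count.
    counters.sortBy(lambda x: x[1], ascending=False, numPartitions=100) \
            .saveAsTextFile("hdfs:///path/to/output")  # placeholder path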