Thanks for the answer,

I have tried this solution but it did not work...

This is the involved part of the code:

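# Group rows by the (x[0], x[1]) key, asking for 80 partitions.
data_aux = cruce.groupBy(lambda x: (x[0], x[1]), 80)
# Alternative I also tried:
# data_aux = cruce.keyBy(lambda x: (x[0], x[1])).groupByKey(80)

# Split the groups into those with several rows and those with exactly one.
exit_1 = data_aux.filter(lambda (a, b): len(b) > 1).values()
exit_2 = data_aux.filter(lambda (a, b): len(b) == 1).values()

exit_1.map(lambda x: x.data).saveAsTextFile(exit1_path)
exit_2.map(lambda x: x.data).saveAsTextFile(exit2_path)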

As you can see, I am passing 80 partitions to groupBy (and I have also
tried keyBy followed by groupByKey). However, the save is still carried
out with only 4 tasks.
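
To make my expectation concrete, here is a toy model in plain Python. It
is not Spark code and not how Spark is implemented internally; the names
toy_group_by and toy_filter and the sample rows are made up for
illustration. It only shows the behaviour I assumed: grouping into 80
hash partitions, then filtering inside each partition, should leave 80
partitions (and so 80 save tasks), even if some of them are empty.

```python
# Toy model only: plain Python, not Spark. All names here are hypothetical.
from collections import defaultdict


def toy_group_by(records, key_func, num_partitions):
    """Group records by key; each group lands in partition hash(key) % num_partitions."""
    partitions = [defaultdict(list) for _ in range(num_partitions)]
    for record in records:
        key = key_func(record)
        partitions[hash(key) % num_partitions][key].append(record)
    return partitions


def toy_filter(partitions, predicate):
    """Filter groups inside each partition; the number of partitions does not change."""
    return [
        {k: v for k, v in part.items() if predicate(k, v)}
        for part in partitions
    ]


# Made-up sample rows standing in for `cruce`.
rows = [(i % 50, i % 7, "payload-%d" % i) for i in range(500)]

grouped = toy_group_by(rows, lambda x: (x[0], x[1]), 80)
multi = toy_filter(grouped, lambda k, v: len(v) > 1)
single = toy_filter(grouped, lambda k, v: len(v) == 1)

# One "save task" per partition, empty or not: all three counts are 80.
print(len(grouped), len(multi), len(single))
```

In this model the partition count is fixed by the grouping step alone,
which is why I expected 80 tasks when saving.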

What am I doing wrong?

On Wed, Sep 17, 2014 at 6:20 PM, Davies Liu <dav...@databricks.com> wrote:

> On Wed, Sep 17, 2014 at 5:21 AM, Luis Guerra <luispelay...@gmail.com>
> wrote:
> > Hi everyone,
> >
> > Is it possible to fix the number of tasks related to a saveAsTextFile in
> > Pyspark?
> >
> > I am loading several files from HDFS, fixing the number of partitions to X
> > (let's say 40, for instance). Then some transformations, like joins and
> > filters, are carried out. The weird thing here is that the number of tasks
> > involved in these transformations is 80, i.e. double the fixed number of
> > partitions. However, when the saveAsTextFile action is carried out, there
> > are only 4 tasks to do this (and I have not been able to increase that
> > number). My problem here is that those 4 tasks rapidly increase the memory
> > used and take too long to finish.
> >
> > I am launching my process from Windows to a cluster in Ubuntu, with 13
> > computers (4 cores each) with 32 GB of memory, and using PySpark 1.0.2.
>
> The saveAsTextFile() is a mapper RDD, so its number of partitions is
> determined by the previous RDD.
>
> In Spark 1.0.2, groupByKey() or reduceByKey() will take the number of CPUs
> on the driver (locally) as the default number of partitions, so it's 4.
> You need to change it to 40 or 80 in this case.
>
> BTW, in Spark 1.1, groupByKey() and reduceByKey() will use the number of
> partitions of the previous RDD as the default value.
>
> Davies
>
> > Any clue with this?
> >
> > Thanks in advance
>
