Hi,
thank you for your response. I removed issues you mentioned. Now I read
RDDs from files, whole rdd is cached, I don't use random and rdd1 and rdd2
are identical.
RDDs that are joined contains 100k entries and result contains 10m entries.
rdd1 and rdd2 after join also contains 10m entries. Here is the code:

  val help = sc.textFile("input/tab1")
    .map{ s => val pair = s.split("\t"); (pair(0).toInt, pair(1).toInt)}
    .repartition(100)
  val rdd = sc.textFile("input/tab2")
    .map{ s => val pair = s.split("\t"); (pair(0).toInt, pair(1).toInt)}
    .repartition(100)
    .join(help)
    .map { case (x, (n, i)) => (x, n)}
    .cache() // or without it

  val rdd1 = sc.parallelize(Array.range(0, 1000)).map(x => (x, x))
    .join(rdd).saveAsTextFile("output/1")
  val rdd2 = sc.parallelize(Array.range(0, 1000)).map(x => (x, x))
    .join(rdd).saveAsTextFile("output/2")

Files input/tab1, input/tab2 were generated using this python code:
for x in range(100000):
    file1.write("%d\t%d\n" % (random.randint(0, 1000), x))
    file2.write("%d\t%d\n" % (random.randint(0, 1000), x))

When using cache whole rdd is cached. It's size is 362MB.
Results are similar:
Without cache: stages used to compute rdd: 30s, rdd1: 16s, rdd2: 12s
With cache: stages used to compute rdd: 28s, rdd1: 14s, rdd2: 15s

I thought that without caching rdd, computing rdd2 will be much longer due
to recomputing of rdd. But it seems that it doesn't work that way. Could
you explain it or point to an example which shows power of caching?

Thanks,
Grzegorz


On Wed, Aug 20, 2014 at 11:22 PM, Patrick Wendell <pwend...@gmail.com>
wrote:

> Your rdd2 and rdd3 differ in two ways so it's hard to track the exact
> effect of caching. In rdd3, in addition to the fact that rdd will be
> cached, you are also doing a bunch of extra random number generation. So it
> will be hard to isolate the effect of caching.
>
>
> On Wed, Aug 20, 2014 at 7:48 AM, Grzegorz Białek <
> grzegorz.bia...@codilime.com> wrote:
>
>> Hi,
>>
>> I tried to write small program which shows that using cache() can speed
>> up execution but results with and without cache were similar. Could help me
>> with this issue? I tried to compute rdd and use it later in two places and
>> I thought in second usage this rdd is recomputed but it doesn't:
>>
>>   val help = sc.parallelize(Array.range(1, 20000)).repartition(100)
>>     .map(x => (scala.util.Random.nextInt(10), x))
>>   val rdd = sc.parallelize(Array.range(1,20000))
>>      .repartition(100)
>>     .map(x => (scala.util.Random.nextInt(10), x))
>>     .join(help)
>>     .map { case (x, (n, i)) => (x, n)}
>>     .reduceByKey(_ + _)
>>     .cache()
>>
>>   val rdd2 = sc.parallelize(Array.range(1,1000)).map(x => (x, x))
>>     .join(rdd).saveAsTextFile("output/1")
>>   val rdd3 = sc.parallelize(Array.range(1,1000)).map(x =>
>> (scala.util.Random.nextInt(1000), x))
>>     .join(rdd).saveAsTextFile("output/2")
>>
>> Thanks,
>> Grzegorz
>>
>
>

Reply via email to