Dear Sujit,

Thanks for your suggestion.

After testing, `joinWithCassandraTable` does the trick, along the lines of what
you suggested.

With it, rdd2 only queries the rows whose keys appear in rdd1.
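
For the record, the working version looks roughly like this. It is only a minimal sketch: it assumes the keyspace and table are both "xxx" and that the primary key column is "x" (names taken from the original query; adjust to the real schema).

```scala
import com.datastax.spark.connector._

// Key rdd1 by the table's primary key column (Tuple1 maps to the join column below).
val keys = rdd1.map { case (key, _) => Tuple1(key) }

// joinWithCassandraTable only fetches the rows whose keys appear in `keys`,
// instead of scanning the whole ~3-billion-row table.
val joined = keys.joinWithCassandraTable("xxx", "xxx").on(SomeColumns("x"))
joined.take(20).foreach(println)
```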

Best,
Wush

2015-07-16 0:00 GMT+08:00 Sujit Pal <sujitatgt...@gmail.com>:
> Hi Wush,
>
> One option may be to try a replicated join. Since your rdd1 is small, read
> it into a collection and broadcast it to the workers, then filter your
> larger rdd2 against the collection on the workers.
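>
> Something along these lines, just an untested sketch using the rdd1/rdd2 names
> from your code:
>
> ```scala
> // Collect the small side (~33k records) on the driver and broadcast it.
> val smallMap = rdd1.collectAsMap()
> val smallBc = sc.broadcast(smallMap)
>
> // Join rdd2 against the broadcast map on the workers; the large side is
> // never shuffled.
> val result = rdd2.flatMap { case (k, v) =>
>   smallBc.value.get(k).map(x => (k, (x, v)))
> }
> ```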
>
> -sujit
>
>
> On Tue, Jul 14, 2015 at 11:33 PM, Deepak Jain <deepuj...@gmail.com> wrote:
>>
>> The leftOuterJoin and join APIs are super slow in Spark, 100x slower than
>> Hadoop.
>>
>> Sent from my iPhone
>>
>> > On 14-Jul-2015, at 10:59 PM, Wush Wu <wush...@gmail.com> wrote:
>> >
>> > I don't understand.
>> >
>> > By the way, `joinWithCassandraTable` does improve my query time
>> > from 40 minutes to 3 minutes.
>> >
>> >
>> > 2015-07-15 13:19 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>> >> I have explored Spark joins for the last few months (you can search my
>> >> posts) and it's frustratingly useless.
>> >>
>> >>> On Tue, Jul 14, 2015 at 9:35 PM, Wush Wu <wush...@gmail.com> wrote:
>> >>>
>> >>> Dear all,
>> >>>
>> >>> I have found a post discussing the same thing:
>> >>>
>> >>>
>> >>> https://groups.google.com/a/lists.datastax.com/forum/#!searchin/spark-connector-user/join/spark-connector-user/q3GotS-n0Wk/g-LPTteCEg0J
>> >>>
>> >>> The solution is to use "joinWithCassandraTable"; the documentation
>> >>> is here:
>> >>>
>> >>> https://github.com/datastax/spark-cassandra-connector/blob/v1.3.0-M2/doc/2_loading.md
>> >>>
>> >>> Wush
>> >>>
>> >>> 2015-07-15 12:15 GMT+08:00 Wush Wu <wush...@gmail.com>:
>> >>>> Dear all,
>> >>>>
>> >>>> I am trying to join two RDDs, named rdd1 and rdd2.
>> >>>>
>> >>>> rdd1 is loaded from a text file with about 33,000 records.
>> >>>>
>> >>>> rdd2 is loaded from a Cassandra table with about 3 billion records.
>> >>>>
>> >>>> I tried the following code:
>> >>>>
>> >>>> ```scala
>> >>>> import org.apache.spark.rdd.RDD
>> >>>> import org.apache.spark.sql.cassandra.CassandraSQLContext
>> >>>>
>> >>>> // Small RDD (~33,000 records) loaded from a text file.
>> >>>> val rdd1: RDD[(String, XXX)] = sc.textFile(...).map(...)
>> >>>>
>> >>>> // Large RDD (~3 billion records) loaded from Cassandra.
>> >>>> val cc = new CassandraSQLContext(sc)
>> >>>> cc.setKeyspace("xxx")
>> >>>> val rdd2: RDD[(String, String)] = cc.sql("SELECT x, y FROM xxx").map(r => ...)
>> >>>>
>> >>>> val result = rdd1.leftOuterJoin(rdd2)
>> >>>> result.take(20)
>> >>>> ```
>> >>>>
>> >>>> However, the log shows that Spark loaded all 3 billion records from
>> >>>> Cassandra, and only 33,000 records remained at the end.
>> >>>>
>> >>>> Is there a way to query Cassandra based only on the keys in rdd1?
>> >>>>
>> >>>> Here is some information about our system:
>> >>>>
>> >>>> - The Spark version is 1.3.1
>> >>>> - The Cassandra version is 2.0.14
>> >>>> - The join key is the primary key of the Cassandra table.
>> >>>>
>> >>>> Best,
>> >>>> Wush
>> >>>
>> >>
>> >>
>> >>
>> >> --
>> >> Deepak
>> >>
>>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
