Dear Sujit,

Thanks for your suggestion.
After testing, `joinWithCassandraTable` does the trick, just as you mentioned: rdd2 now queries only the rows whose keys appear in rdd1.

Best,
Wush

2015-07-16 0:00 GMT+08:00 Sujit Pal <sujitatgt...@gmail.com>:
> Hi Wush,
>
> One option may be to try a replicated join. Since your rdd1 is small, read
> it into a collection and broadcast it to the workers, then filter your
> larger rdd2 against the collection on the workers.
>
> -sujit
>
>
> On Tue, Jul 14, 2015 at 11:33 PM, Deepak Jain <deepuj...@gmail.com> wrote:
>>
>> leftOuterJoin and join APIs are super slow in Spark, 100x slower than
>> Hadoop.
>>
>> Sent from my iPhone
>>
>> > On 14-Jul-2015, at 10:59 PM, Wush Wu <wush...@gmail.com> wrote:
>> >
>> > I don't understand.
>> >
>> > By the way, `joinWithCassandraTable` does improve my query time
>> > from 40 mins to 3 mins.
>> >
>> >
>> > 2015-07-15 13:19 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>> >> I have explored Spark joins for the last few months (you can search my
>> >> posts) and found them frustratingly useless.
>> >>
>> >>> On Tue, Jul 14, 2015 at 9:35 PM, Wush Wu <wush...@gmail.com> wrote:
>> >>>
>> >>> Dear all,
>> >>>
>> >>> I have found a post discussing the same thing:
>> >>>
>> >>> https://groups.google.com/a/lists.datastax.com/forum/#!searchin/spark-connector-user/join/spark-connector-user/q3GotS-n0Wk/g-LPTteCEg0J
>> >>>
>> >>> The solution is using "joinWithCassandraTable" and the documentation
>> >>> is here:
>> >>>
>> >>> https://github.com/datastax/spark-cassandra-connector/blob/v1.3.0-M2/doc/2_loading.md
>> >>>
>> >>> Wush
>> >>>
>> >>> 2015-07-15 12:15 GMT+08:00 Wush Wu <wush...@gmail.com>:
>> >>>> Dear all,
>> >>>>
>> >>>> I am trying to join two RDDs, named rdd1 and rdd2.
>> >>>>
>> >>>> rdd1 is loaded from a text file with about 33,000 records.
>> >>>>
>> >>>> rdd2 is loaded from a Cassandra table with about 3 billion records.
>> >>>>
>> >>>> I tried the following code:
>> >>>>
>> >>>> ```scala
>> >>>> val rdd1: RDD[(String, XXX)] = sc.textFile(...).map(...)
>> >>>>
>> >>>> import org.apache.spark.sql.cassandra.CassandraSQLContext
>> >>>> cc.setKeyspace("xxx")
>> >>>> val rdd2: RDD[(String, String)] = cc.sql("SELECT x, y FROM xxx").map(r => ...)
>> >>>>
>> >>>> val result = rdd1.leftOuterJoin(rdd2)
>> >>>> result.take(20)
>> >>>> ```
>> >>>>
>> >>>> However, the log shows that Spark loaded 3 billion records from
>> >>>> Cassandra, and only 33,000 records were left at the end.
>> >>>>
>> >>>> Is there a way to query Cassandra based on the keys in rdd1?
>> >>>>
>> >>>> Here is some information about our system:
>> >>>>
>> >>>> - The Spark version is 1.3.1
>> >>>> - The Cassandra version is 2.0.14
>> >>>> - The join key is the primary key of the Cassandra table.
>> >>>>
>> >>>> Best,
>> >>>> Wush
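For readers landing on this thread later, here is a minimal sketch of the `joinWithCassandraTable` approach discussed above, following the connector documentation linked in the thread. The file name, table name, connection host, and the single-column text partition key are assumptions made for illustration; only the keyspace `xxx` and the columns `x`, `y` come from the original post.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._  // brings joinWithCassandraTable into scope on RDDs

object JoinWithCassandraSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("join-with-cassandra-sketch")
      .set("spark.cassandra.connection.host", "127.0.0.1")  // placeholder host
    val sc = new SparkContext(conf)

    // ~33,000 keys read from a text file; Tuple1 so the single field maps
    // onto the table's (assumed single-column) partition key.
    val keys = sc.textFile("keys.txt").map(line => Tuple1(line.trim))

    // Pushes the lookup down to Cassandra: only partitions whose key appears
    // in `keys` are read, instead of scanning the full ~3 billion row table.
    val joined = keys
      .joinWithCassandraTable("xxx", "the_table")  // hypothetical table name
      .select("x", "y")

    joined.take(20).foreach(println)
    sc.stop()
  }
}
```

Unlike the `leftOuterJoin` in the original code, which pulls the entire table into Spark before joining, this issues targeted reads against Cassandra for each key, which is consistent with the drop from roughly 40 minutes to 3 minutes reported in the thread.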