How about saving the dataframe as a table partitioned by userId? My user
records have userId, number of sessions, visit count etc. as columns, and
the table should be partitioned by userId. I will need to join the user
table saved in the database (as below) with an incoming session RDD. The
session RDD has a sessionId and a sessionRecord, which contains the
userId. So the user data needs to be saved as a table using dataframes,
partitioned by userId, and then joined with the session RDDs. How can I
join a dataframe saved in HDFS with an incoming RDD so that not all the
records are read, and only the records that meet the join condition are
loaded?

df.write.partitionBy('userId').saveAsTable(...)
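Something along these lines is what I have in mind (SessionRecord,
sessionRDD and the table name are placeholders; it assumes a HiveContext
instance named HiveContext as in the examples further down, and that the
batch of ids is small enough to collect to the driver). Would filtering on
the partition column like this make Spark prune the unmatched userId=...
directories instead of scanning the whole table?

import org.apache.spark.sql.functions.col
import HiveContext.implicits._

// Hypothetical element type of the incoming session RDD
case class SessionRecord(userId: String, payload: String)

val sessionsDF = sessionRDD
  .map { case (sessionId, rec: SessionRecord) => (sessionId, rec.userId) }
  .toDF("sessionId", "userId")

// Collect the distinct ids in this batch and filter the saved table on
// its partition column before the join
val ids = sessionsDF.select("userId").distinct().collect().map(_.getString(0))
val users = HiveContext.table("userTable").where(col("userId").isin(ids: _*))

val joined = sessionsDF.join(users, "userId")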


On Mon, Feb 15, 2016 at 10:09 AM, Mich Talebzadeh <
[email protected]> wrote:

>
>
> It depends on how many columns you need from the tables for your queries
> and on the potential number of rows.
>
> From my experience, I don't believe that registering a table as temporary
> means the whole 1 billion rows are going to be cached in memory. That would
> not make sense (I stand to be corrected). Only a fraction of the rows and
> columns will be needed.
>
> It will be interesting to know how Catalyst is handling this. I suspect it
> behaves much like the data cache in a relational database, with some form
> of MRU-LRU chain where rows are read into memory from the blocks,
> processed, and discarded to make room for new ones. If the memory is not
> big enough, the operation spills to disk.
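>
> One way to see what Catalyst actually plans, without running the query,
> is to print the plan (this is the standard DataFrame explain; the query
> below is just an illustration):
>
> val df = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID FROM oraclehadoop.sales_staging")
> df.explain(true)   // prints parsed, analyzed, optimized and physical plans
> // registerTempTable only records a name in the catalog; nothing is
> // materialised until an action such as collect() or show() runs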
>
> I just did a test on three tables in Hive with Spark 1.5.2 using
> DataFrames and tempTables.
>
> The FACT table had 1 billion rows as follows:
>
>
> CREATE TABLE `sales_staging`(
>   `prod_id` bigint,
>   `cust_id` bigint,
>   `time_id` timestamp,
>   `channel_id` bigint,
>   `promo_id` bigint,
>   `quantity_sold` decimal(10,0),
>   `amount_sold` decimal(10,0))
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.mapred.TextInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION
>   'hdfs://rhes564:9000/user/hive/warehouse/oraclehadoop.db/sales_staging'
> TBLPROPERTIES (
>   'COLUMN_STATS_ACCURATE'='true',
>   'last_modified_by'='hduser',
>   'last_modified_time'='1451305601',
>   'numFiles'='4',
>   'numRows'='1000000000',
>   'rawDataSize'='46661545000',
>   'totalSize'='47661545000',
>   'transient_lastDdlTime'='1451767601')
>
>
>
> The other dimension tables were tiny. It took 13 minutes to get the first
> 10 rows back, and the query only needed a few columns of interest, so I
> don't think it was loading 1 billion rows into memory from the
> sales_staging table.
>
> Started at
>
> [15/02/2016 17:47:28.28]
>
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
>
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
>
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
>
> sqltext: String = ""
>
> sqltext: String =
>
> SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
>
> FROM
>
> (
>
> SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS SalesChannel,
> SUM(t_s.AMOUNT_SOLD) AS TotalSales
>
> FROM t_s, t_t, t_c
>
> WHERE t_s.TIME_ID = t_t.TIME_ID
>
> AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
>
> GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ORDER by t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ) rs
>
> LIMIT 10
>
>
>
> [1998-01,Direct Sales,1823005210]
>
> [1998-01,Internet,248172522]
>
> [1998-01,Partners,474646900]
>
> [1998-02,Direct Sales,1819659036]
>
> [1998-02,Internet,298586496]
>
> [1998-02,Partners,534103258]
>
> [1998-03,Direct Sales,1405805622]
>
> [1998-03,Internet,229163168]
>
> [1998-03,Partners,352277328]
>
> [1998-03,Tele Sales,59700082]
>
>  Finished at
>
> [15/02/2016 18:00:50.50]
>
>
>
> On 15/02/2016 17:27, swetha kasireddy wrote:
>
> OK. Would it only query Hive for the records that match my filter, or
> would it load the entire table? My user table will have millions of
> records, and I do not want to cause OOM errors by loading the entire table
> into memory.
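>
> For instance, would something along these lines (the table and column
> names are just illustrative, and batchIds is a placeholder) read only the
> filtered records rather than the whole table?
>
> import org.apache.spark.sql.functions.col
> val wanted = HiveContext.table("userTable")
>   .filter(col("userId").isin(batchIds: _*))
> wanted.explain(true)   // does the plan show the filter on the scan?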
>
> On Mon, Feb 15, 2016 at 12:51 AM, Mich Talebzadeh <[email protected]>
> wrote:
>
>> It is also worthwhile using temporary tables for the join query.
>>
>>
>>
>> I can join a Hive table with any JDBC-accessed table from another
>> database using DataFrames and temporary tables.
>>
>>
>>
>> //
>>
>> //Get the FACT table from Hive
>>
>> //
>>
>> val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>> oraclehadoop.sales")
>>
>>
>>
>> //
>>
>> //Get the Dimension table from Oracle via JDBC
>>
>> //
>>
>> val c = HiveContext.load("jdbc",
>>
>> Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
>>
>> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC
>> FROM sh.channels)",
>>
>> "user" -> "sh",
>>
>> "password" -> "xxx"))
>>
>>
>>
>>
>>
>> s.registerTempTable("t_s")
>>
>> c.registerTempTable("t_c")
>>
>>
>>
>> And do the join. The time dimension t is loaded and registered as t_t in
>> the same way. Note the SQL is built as a multi-line string, opening with
>> triple quotes to match the closing ones below:
>>
>>
>>
>> var sqltext = """
>>
>> SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
>>
>> FROM
>>
>> (
>>
>> SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS
>> SalesChannel, SUM(t_s.AMOUNT_SOLD) AS TotalSales
>>
>> FROM t_s, t_t, t_c
>>
>> WHERE t_s.TIME_ID = t_t.TIME_ID
>>
>> AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
>>
>> GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>>
>> ORDER by t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>>
>> ) rs
>>
>> LIMIT 1000
>>
>> """
>>
>> HiveContext.sql(sqltext).collect.foreach(println)
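>>
>> As a side note, collect() pulls the entire result set back to the
>> driver; for a quick inspection, show only brings back the first n rows:
>>
>> HiveContext.sql(sqltext).show(1000)   // print up to 1000 rows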
>>
>>
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn:
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>>
>>
>>
>>
>> From: Ted Yu [mailto:[email protected]]
>> Sent: 15 February 2016 08:44
>> To: SRK <[email protected]>
>> Cc: user <[email protected]>
>> Subject: Re: How to join an RDD with a hive table?
>>
>>
>>
>> Have you tried creating a DataFrame from the RDD and joining it with the
>> DataFrame that corresponds to the Hive table?
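>>
>> For example (a rough sketch; the RDD element type, column names and the
>> table name are placeholders):
>>
>> import HiveContext.implicits._
>>
>> val rddDF = rdd.toDF("id", "value")              // DataFrame from the incoming RDD
>> val hiveDF = HiveContext.table("some_db.user_table")
>> val joined = rddDF.join(hiveDF, rddDF("id") === hiveDF("id"))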
>>
>>
>>
>> On Sun, Feb 14, 2016 at 9:53 PM, SRK <[email protected]> wrote:
>>
>> Hi,
>>
>> How can I join an RDD with a Hive table and retrieve only the records I
>> am interested in? Suppose I have an RDD with 1,000 records and a Hive
>> table with 100,000 records. I should be able to join the RDD with the
>> Hive table by an Id, loading only those 1,000 records from the Hive table
>> so that there are no memory issues. Also, I was planning on storing the
>> data in Hive in the form of Parquet files. Any help on this is greatly
>> appreciated.
>>
>> Thanks!
>>
>>
>>
>>
>>
>>
>
>
>
> --
>
> Dr Mich Talebzadeh
>
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
>
>
>
