Hi all,
I’m getting a TreeNodeException for unresolved attributes when I do a simple
select from a SchemaRDD generated by a join in Spark 1.1.0. A little background
first. I am using a HiveContext (against Hive 0.12) to grab two tables, join
them, and then run multiple INSERT ... SELECT statements with GROUP BY to write
back out to a Hive rollup table that has two partitions. The goal is to
simulate GROUPING SETS, which SparkSQL does not support yet.
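The emulation idea can be sketched as follows: run one GROUP BY per grouping set and concatenate the results, with the columns that a given set rolls up left empty. This sketch uses plain Scala collections so it runs without a cluster. The row type `Txn`, its fields (`brand`, `region`, `sales`) and the chosen sets ((brand, region), (brand), ()) are hypothetical stand-ins, not the real tables. `None` plays the role that NULL plays in the SQL output.

```scala
// Hypothetical row type standing in for the joined table (not the real schema).
case class Txn(brand: String, region: String, sales: Double)

// One output row of the rollup; None marks a column rolled up by that grouping set.
case class RollupRow(brand: Option[String], region: Option[String], sales: Double)

// Emulate GROUP BY ... GROUPING SETS ((brand, region), (brand), ())
// by running one GROUP BY per set and concatenating the results.
def groupingSets(rows: Seq[Txn]): Seq[RollupRow] = {
  val byBrandRegion = rows.groupBy(t => (t.brand, t.region)).map {
    case ((b, r), ts) => RollupRow(Some(b), Some(r), ts.map(_.sales).sum)
  }
  val byBrand = rows.groupBy(_.brand).map {
    case (b, ts) => RollupRow(Some(b), None, ts.map(_.sales).sum)
  }
  // The empty grouping set () is a single grand-total row.
  val grandTotal = Seq(RollupRow(None, None, rows.map(_.sales).sum))
  (byBrandRegion ++ byBrand ++ grandTotal).toSeq
}

val sample = Seq(
  Txn("acme", "east", 10.0),
  Txn("acme", "west", 5.0),
  Txn("zeta", "east", 2.5)
)
groupingSets(sample).foreach(println)
```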
In my first attempt, I got really close using SchemaRDD.groupBy until I
realized that the SchemaRDD.insertInto API does not support partitioned tables
yet. This prompted my second attempt: passing SQL to the HiveContext.sql API
instead.
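The SQL-based approach could look like the following sketch. It builds one static-partition INSERT OVERWRITE statement per grouping set. The target table `brand_rollup`, its partition columns `rollup_level` and `run_date`, and the choice of grouping sets are all hypothetical. Only `rupbrand`, `sales`, `product_brandname` and `customer_region` come from this post. The generated HiveQL (including `CAST(NULL AS STRING)` for rolled-up columns) has not been verified against Spark 1.1's parser.

```scala
// Hypothetical target table; the real rollup table and its two partition
// columns are not named in this post, so rollup_level and run_date are made up.
val targetTable = "brand_rollup"

// Dimension columns written to the rollup (both exist in the rupbrand schema).
val dims = Seq("product_brandname", "customer_region")

// Each grouping set: a label (used as the rollup_level partition value) and the
// columns it groups by. Dimensions outside the set are written as NULL.
val sets: Seq[(String, Seq[String])] = Seq(
  "brand_region" -> Seq("product_brandname", "customer_region"),
  "brand"        -> Seq("product_brandname"),
  "all"          -> Seq.empty[String]
)

// Build one static-partition INSERT OVERWRITE statement for a grouping set.
def insertFor(level: String, keys: Seq[String], runDate: String): String = {
  val selectDims = dims.map { d =>
    if (keys.contains(d)) d else s"CAST(NULL AS STRING) AS $d"
  }
  val groupBy = if (keys.isEmpty) "" else keys.mkString(" GROUP BY ", ", ", "")
  s"INSERT OVERWRITE TABLE $targetTable " +
    s"PARTITION (rollup_level='$level', run_date='$runDate') " +
    s"SELECT ${selectDims.mkString(", ")}, SUM(sales) AS sales FROM rupbrand$groupBy"
}

// Print the statements; each one would be passed to hc.sql(...).
sets.foreach { case (level, keys) => println(insertFor(level, keys, "2014-10-20")) }
```

Running the statements one at a time scans rupbrand once per grouping set. That is the price of emulating GROUPING SETS this way.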
Here’s a rundown of the commands I executed in the spark-shell:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")
hc.setConf("spark.sql.parquet.compression.codec", "snappy")
// For implicit conversions to Expression
val sqlContext = new SQLContext(sc)
import sqlContext._
val segCusts = hc.hql("select …")
val segTxns = hc.hql("select …")
// Alias both sides so the join condition can refer to sc.* and st.*
val sc = segCusts.as('sc)
val st = segTxns.as('st)
// Join the segCusts and segTxns tables
val rup = sc.join(st, Inner,
  Some("sc.segcustomerid".attr === "st.customerid".attr))
rup.registerAsTable("rupbrand")
If I call printSchema on rup, I get:
root
|-- segcustomerid: string (nullable = true)
|-- sales: double (nullable = false)
|-- tx_count: long (nullable = false)
|-- storeid: string (nullable = true)
|-- transdate: long (nullable = true)
|-- transdate_ts: string (nullable = true)
|-- transdate_dt: string (nullable = true)
|-- unitprice: double (nullable = true)
|-- translineitem: string (nullable = true)
|-- offerid: string (nullable = true)
|-- customerid: string (nullable = true)
|-- customerkey: string (nullable = true)
|-- sku: string (nullable = true)
|-- quantity: double (nullable = true)
|-- returnquantity: double (nullable = true)
|-- channel: string (nullable = true)
|-- unitcost: double (nullable = true)
|-- transid: string (nullable = true)
|-- productid: string (nullable = true)
|-- id: string (nullable = true)
|-- campaign_campaigncost: double (nullable = true)
|-- campaign_begindate: long (nullable = true)
|-- campaign_begindate_ts: string (nullable = true)
|-- campaign_begindate_dt: string (nullable = true)
|-- campaign_enddate: long (nullable = true)
|-- campaign_enddate_ts: string (nullable = true)
|-- campaign_enddate_dt: string (nullable = true)
|-- campaign_campaigntitle: string (nullable = true)
|-- campaign_campaignname: string (nullable = true)
|-- campaign_id: string (nullable = true)
|-- product_categoryid: string (nullable = true)
|-- product_company: string (nullable = true)
|-- product_brandname: string (nullable = true)
|-- product_vendorid: string (nullable = true)
|-- product_color: string (nullable = true)
|-- product_brandid: string (nullable = true)
|-- product_description: string (nullable = true)
|-- product_size: string (nullable = true)
|-- product_subcategoryid: string (nullable = true)
|-- product_departmentid: string (nullable = true)
|-- product_productname: string (nullable = true)
|-- product_categoryname: string (nullable = true)
|-- product_vendorname: string (nullable = true)
|-- product_sku: string (nullable = true)
|-- product_subcategoryname: string (nullable = true)
|-- product_status: string (nullable = true)
|-- product_departmentname: string (nullable = true)
|-- product_style: string (nullable = true)
|-- product_id: string (nullable = true)
|-- customer_lastname: string (nullable = true)
|-- customer_familystatus: string (nullable = true)
|-- customer_customertype: string (nullable = true)
|-- customer_city: string (nullable = true)
|-- customer_country: string (nullable = true)
|-- customer_state: string (nullable = true)
|-- customer_region: string (nullable = true)
|-- customer_customergroup: string (nullable = true)
|-- customer_maritalstatus: string (nullable = true)
|-- customer_agerange: string (nullable = true)
|-- customer_zip: string (nullable = true)
|-- customer_age: double (nullable = true)
|-- customer_address2: string (nullable = true)
|-- customer_incomerange: string (nullable = true)
|-- customer_gender: string (nullable = true)
|-- customer_customerkey: string (nullable = true)
|-- customer_address1: string (nullable = true)
|-- customer_email: string (nullable = true)
|-- customer_education: string (nullable = true)
|-- customer_birthdate: long (nullable = true)
|-- customer_birthdate_ts: string (nullable = true)
|-- customer_birthdate_dt: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- customer_firstname: string (nullable = true)
|-- transnum: long (nullable = true)
|-- transmonth: string (nullable = true)
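Here is a minimal sketch for confirming that a schema like this has no colliding column names. It also catches collisions that only appear when case is ignored, which matters because Hive treats column names case-insensitively. The helper name `caseInsensitiveDuplicates` is hypothetical. It works on a plain list of names.

```scala
// Return the (lower-cased) column names that occur more than once when case is
// ignored; Hive resolves column names case-insensitively.
def caseInsensitiveDuplicates(names: Seq[String]): Seq[String] =
  names
    .groupBy(_.toLowerCase)
    .collect { case (lower, occurrences) if occurrences.size > 1 => lower }
    .toSeq
    .sorted

// A few names from the schema above: no collisions.
println(caseInsensitiveDuplicates(Seq("segcustomerid", "customerid", "customer_customerkey", "transid")))
// A collision that only shows up once case is ignored.
println(caseInsensitiveDuplicates(Seq("customerId", "customerid", "sku")))
```

In the shell it could be fed the joined table's names, e.g. `caseInsensitiveDuplicates(rup.schema.fields.map(_.name))`. This assumes `schema` returns a StructType, as it does in Spark 1.1.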
It is a flat schema with no duplicate column names. I then execute:
hc.sql("select transid from rupbrand")
and I get:
scala> hc.sql("select transid from rupbrand")
14/10/20 10:01:44 INFO ParseDriver: Parsing command: select transid from rupbrand
14/10/20 10:01:44 INFO ParseDriver: Parse Completed
res18: org.apache.spark.sql.SchemaRDD =
SchemaRDD[121] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'transid, tree:
Project ['transid]
 LowerCaseSchema
  Subquery rupbrand
   Join Inner, Some(('sc.segcustomerid = 'st.customerid))
    Subquery sc
     Filter CAST((COUNT(DISTINCT 't.transid) > 0), BooleanType)
      Aggregate ['c.customerid], ['c.customerId AS segcustomerid#5,SUM('t.sales) AS sales#6,COUNT(DISTINCT 't.transid) AS tx_count#7]
       Filter 'c.gender IN (Male)
        Join Inner, Some(('c.customerid = 't.customerid))
         Subquery t
          Aggregate [customerid#3259,transid#3266], ['d.customerId AS customerid#1,transid#3266 AS transid#2,SUM((quantity#3262 * …
I’m wondering whether selecting a column from my joined table somehow
conflicts with the columns of the two source tables it was constructed from,
since the plan shows a breakdown of the various pieces of the queries on those
two source tables.
Help?
Thanks,
-Terry