how does Spark SQL/DataFrame know that train_users_2.csv has a field named 
"id" or anything else domain-specific?  is there a header?  if so, 
sc.textFile() doesn't know about it -- the header just comes through as 
another line of text.
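and since createDataFrame() on an RDD of lists has no names to work with, it 
auto-generates _1, _2, ... as the column names -- which is exactly why 
trainDF.id blows up below.  quick sketch (data made up for illustration):

rows = sc.parallelize([["a1", "2014-01-01"], ["a2", "2014-01-02"]])
df = sqlCtx.createDataFrame(rows)
print df.columns    # ['_1', '_2'] -- no "id" column, so df.id raises AttributeError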

I'd suggest using the Databricks spark-csv package for reading csv data.  it 
has a "header" option to indicate whether a header row exists and, if so, to 
use that row as the column names.
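something like this (just a sketch, assuming spark-csv 1.x on Spark 1.4, 
launched with e.g. --packages com.databricks:spark-csv_2.10:1.3.0):

trainDF = sqlCtx.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../train_users_2.csv")
print trainDF.columns    # header row becomes the column names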

one caveat:  I've had some issues relying on this specific feature of the 
spark-csv package, so I almost always call .toDF("id", "firstname", "lastname", 
...) on the created DataFrame to name the columns explicitly.
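for example (column names here are placeholders -- you pass one name per 
actual column, in order):

raw = sqlCtx.createDataFrame(sc.parallelize([["a1", "US"], ["a2", "FR"]]))
named = raw.toDF("id", "country")   # hypothetical names, one per column
print named.columns                 # ['id', 'country']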

I've also had some uppercase/lowercase and "_" and "-" issues with some of 
these connectors including spark-csv, spark-cassandra, and a few others, so I 
try to stick to simple lower-case column names without special chars just to be 
safe.
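if a connector does hand you mixed-case or dashed names, a defensive 
normalization pass looks like this (sketch):

trainDF = trainDF.toDF(*[c.lower().replace("-", "_") for c in trainDF.columns])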

note:  these issues are connector-specific, so not every connector deserves 
this type of pessimism.  just pointing them out for completeness.

> On Dec 19, 2015, at 5:30 PM, Weiwei Zhang <wzhan...@dons.usfca.edu> wrote:
> 
> Hi all, 
> 
> I got this error when I tried to use the 'join' function to left outer join 
> two data frames in pyspark 1.4.1. 
> Please kindly point out the places where I made mistakes. Thank you. 
> 
> Traceback (most recent call last):
>   File "/Users/wz/PycharmProjects/PysparkTraining/Airbnb/src/driver.py", line 46, in <module>
>     trainSessionDF = trainDF.join(sessionDF, trainDF.id == sessionDF.user_id, 'left_outer')
>   File "/Users/wz/Downloads/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 701, in __getattr__
>     "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
> AttributeError: 'DataFrame' object has no attribute 'id' - It does have a column called "id"
> 15/12/19 14:15:00 INFO SparkContext: Invoking stop() from shutdown hook
> 
> Here is the code:
> from pyspark import SparkContext
> from pyspark import SparkConf
> from pyspark.sql import SQLContext
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.classification import LogisticRegressionWithSGD
> from pyspark.sql.functions import *
> conf = SparkConf().set("spark.executor.memory", "4g")
> sc = SparkContext(conf= conf)
> sqlCtx = SQLContext(sc)
> 
> train = sc.textFile("../train_users_2.csv").map(lambda line: line.split(","))
> print train.first()
> trainDF = sqlCtx.createDataFrame(train)
> 
> test = sc.textFile("../test_users.csv").map(lambda line: line.split(","))
> testDF = sqlCtx.createDataFrame(test)
> 
> session = sc.textFile("../sessions.csv").map(lambda line: line.split(","))
> sessionDF = sqlCtx.createDataFrame(session)
> 
> # join train with session (Error)
> trainSessionDF = trainDF.join(sessionDF, trainDF.id == sessionDF.user_id, 'left_outer')
> 
> Best Regards, 
> WZ
