How does Spark SQL/DataFrame know that train_users_2.csv has a field named "id" or anything else domain specific? Is there a header? If so, does sc.textFile() know about this header?
sc.textFile() has no notion of a header; it just returns raw lines, so the header row becomes an ordinary data row. And createDataFrame() on an RDD of lists with no schema assigns default column names like _1, _2, ..., which is why trainDF.id fails below.

I'd suggest using the Databricks spark-csv package for reading CSV data. It has a "header" option to specify whether a header row exists and, if so, to use it for the column names. One caveat: I've had some issues relying on that specific feature of the spark-csv package, so I almost always call .toDF("id", "firstname", "lastname", ...) on the created DataFrame to explicitly name the columns exactly what I want them to be. I've also had some uppercase/lowercase and "_"/"-" issues with some of these connectors, including spark-csv, spark-cassandra, and a few others, so I try to stick to simple lowercase column names without special characters just to be safe. Note: these issues are connector specific, so not all connectors should be subjected to this type of pessimism; I'm just pointing them out for completeness.

> On Dec 19, 2015, at 5:30 PM, Weiwei Zhang <wzhan...@dons.usfca.edu> wrote:
>
> Hi all,
>
> I got this error when I tried to use the 'join' function to left outer join
> two data frames in PySpark 1.4.1.
> Please kindly point out the places where I made mistakes. Thank you.
>
> Traceback (most recent call last):
>   File "/Users/wz/PycharmProjects/PysparkTraining/Airbnb/src/driver.py", line 46, in <module>
>     trainSessionDF = trainDF.join(sessionDF, trainDF.id == sessionDF.user_id, 'left_outer')
>   File "/Users/wz/Downloads/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 701, in __getattr__
>     "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
> AttributeError: 'DataFrame' object has no attribute 'id' - It does have a column called "id"
> 15/12/19 14:15:00 INFO SparkContext: Invoking stop() from shutdown hook
>
> Here is the code:
>
> from pyspark import SparkContext
> from pyspark import SparkConf
> from pyspark.sql import SQLContext
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.classification import LogisticRegressionWithSGD
> from pyspark.sql.functions import *
>
> conf = SparkConf().set("spark.executor.memory", "4g")
> sc = SparkContext(conf=conf)
> sqlCtx = SQLContext(sc)
>
> train = sc.textFile("../train_users_2.csv").map(lambda line: line.split(","))
> print train.first()
> trainDF = sqlCtx.createDataFrame(train)
>
> test = sc.textFile("../test_users.csv").map(lambda line: line.split(","))
> testDF = sqlCtx.createDataFrame(test)
>
> session = sc.textFile("../sessions.csv").map(lambda line: line.split(","))
> sessionDF = sqlCtx.createDataFrame(session)
>
> # join train with session (Error)
> trainSessionDF = trainDF.join(sessionDF, trainDF.id == sessionDF.user_id, 'left_outer')
>
> Best Regards,
> WZ
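P.S. To make the suggestion above concrete, here is a minimal sketch of the fix. It assumes Spark 1.4.x with the spark-csv package on the classpath (e.g. launched with --packages com.databricks:spark-csv_2.10:1.3.0); the file paths and the id/user_id join columns come from the original post, but the renamed column list in the commented-out .toDF() call is purely illustrative — substitute the real header names from train_users_2.csv. Not tested against your data; treat it as a starting point rather than a drop-in replacement.

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().set("spark.executor.memory", "4g")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)

# spark-csv parses the header row into column names (and can infer types),
# which plain sc.textFile() cannot do.
trainDF = (sqlCtx.read
           .format("com.databricks.spark.csv")
           .option("header", "true")
           .option("inferSchema", "true")
           .load("../train_users_2.csv"))

sessionDF = (sqlCtx.read
             .format("com.databricks.spark.csv")
             .option("header", "true")
             .load("../sessions.csv"))

# Optional but recommended per the note above: rename columns explicitly so
# downstream code never depends on the header's exact casing or characters.
# The names are placeholders -- pass one name per column, in file order:
# trainDF = trainDF.toDF("id", "date_account_created", ...)

# The join now resolves, because "id" and "user_id" are real column names
# instead of the default _1, _2, ... assigned to a schemaless RDD of lists.
trainSessionDF = trainDF.join(
    sessionDF, trainDF.id == sessionDF.user_id, "left_outer")
```

If you stay on sc.textFile() instead, you would have to filter out the header line yourself and pass an explicit schema (or a list of names) to createDataFrame(); spark-csv just does that bookkeeping for you.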