Hello all, I have an error in pyspark for which I have not the faintest idea of the cause. All I can tell from the stack trace is that it can't find a pyspark file on the path /mnt/spark-*/pyspark-*. Apart from that I need someone more experienced than me with Spark to look into it and help diagnose the problem and suggest potential solutions, hence I am looking to this group for help.
If anyone wants to read the same question on Stack Overflow here is the link: http://stackoverflow.com/questions/30328104/pyspark-job-throwing-ioerror Here's the same thing pasted as raw text: I am trying to write a simple KNN job using pyspark on a hdfs cluster. I am using very few input files to perform the job so I don't think it's a memory (space). I do not do a broadcast in any part of my code. So it is surprising to me when the broadcast.py fails? I however do have python dictionaries that I have in shared memory without explicitly doing a broadcast. Can anyone help me understand what is going on? I have appended my python file and the stack trace to this email. Thanks, Nikhil
from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import glob
import sys
import time
import subprocess
from itertools import combinations
"""We create user and item indices starting from 0 to #users and 0 to #items respectively. This is done to store them in sparseVectors as dicts."""
def create_indices(inputdir):
items=dict()
user_id_to_idx=dict()
user_idx_to_id=dict()
item_idx_to_id=dict()
item_id_to_idx=dict()
item_idx=0
user_idx=0
cat=subprocess.Popen(["hadoop","fs","-cat","/user/hadoop/"+inputdir+"/*.txt"],stdout=subprocess.PIPE)
for line in cat.stdout:
toks=map(str,line.strip().split("\t"))
try:
user_id_to_idx[toks[1].strip()]
except KeyError:
if toks[1].strip()!=None:
user_id_to_idx[toks[1].strip()]=user_idx
user_idx_to_id[user_idx]=toks[1].strip()
user_idx+=1
try:
item_id_to_idx[toks[0].strip()]
except KeyError:
if toks[0].strip()!=None:
item_id_to_idx[toks[0].strip()]=item_idx
item_idx_to_id[item_idx]=toks[0].strip()
item_idx+=1
return user_idx_to_id,user_id_to_idx,item_idx_to_id,item_id_to_idx,user_idx,item_idx
def concat_helper(a,b):
if(a!= None and b!=None):
print a,b,a.update(b)
temp=dict()
temp.update(a)
temp.update(b)
return temp
elif a!=None:
return a
elif b!=None:
return b
# pass in the hdfs path to the input files and the spark context.
def runKNN(inputdir,sc,user_id_to_idx,item_id_to_idx):
rdd_text=sc.textFile(inputdir)
try:
new_rdd = rdd_text.map(lambda x: (item_id_to_idx[str(x.strip().split("\t")[0])],{user_id_to_idx[str(x.strip().split("\t")[1])]:1})).reduceByKey(lambda x,y: concat_helper(x,y)).sortByKey()
except KeyError:
print item_id_to_idx.keys()
pass
return new_rdd
if __name__=="__main__":
sc = SparkContext()
u_idx_to_id,u_id_to_idx,i_idx_to_id,i_id_to_idx,u_idx,i_idx=create_indices(sys.argv[1])
u_idx_to_id_b=sc.broadcast(u_idx_to_id)
u_id_to_idx_b=sc.broadcast(u_id_to_idx)
i_idx_to_idx_b=sc.broadcast(i_idx_to_id)
i_id_to_idx_b=sc.broadcast(i_id_to_idx)
num_users=sc.broadcast(u_idx)
num_items=sc.broadcast(i_idx)
item_dict_rdd=runKNN(sys.argv[1],sc,u_id_to_idx,i_id_to_idx)
item_dict_rdd_new=item_dict_rdd.map(lambda x: (x[0],SparseVector(i_idx,x[1])))
item_dict_rdd_new.saveAsTextFile("hdfs://output_path")
#dot_products_rdd=map(lambda (x,y): (x,y),combinations(item_dict_rdd_new.map(lambda x: x),2))
dot_products_rdd.saveAsTextFile("hdfs://output_path_2")
stacktrace
Description: stacktrace
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
