Hi All,
I ran into the error in the subject line. The exception occurred at line 223, as shown below:
212 // read files
213 val lines = sc.textFile(path_edges)
      .map(line => line.split(","))
      .map(line => ((line(0), line(1)), line(2).toDouble))
      .reduceByKey(_ + _).cache
214
215 val lines_vertices = lines.map{line => (line._1._1, Map(nameHash(line._1._2) -> line._2))}.reduceByKey(_ ++ _).cache
216
217 val name_shadow = "_shadow"
218
219 val nodes =
220   lines_vertices
221     .map{line => (nameHash(line._1), (1, Map[VertexId,Double](), line._1))} ++
222   lines_vertices
223     .map{line => (nameHash(line._1 + name_shadow), (2, line._2, line._1 + name_shadow))} ++
224   lines
225     .map{line => (nameHash(line._1._2), (3, Map[VertexId,Double](), line._1._2))}
Sorry for posting the source code, but I couldn't think of a better way to
show the problem. I don't understand why the numbers of partitions are
unequal, or how I can control the number of partitions of these RDDs. Can
someone give me some advice on this problem?
Thanks very much!
Best,
Bin