Hi all, I am using the GraphLoader class to load graphs from edge list files, but I need to change the storage level of the graph to something other than MEMORY_ONLY.
I tried persisting the loaded graph at a different level:

val graph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).persist(StorageLevel.MEMORY_AND_DISK_SER)

The error I get when executing this is:

Exception in thread "main" java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level

Then I looked into the GraphLoader class. I understand that in the latest version of Spark this class lets you set the persistence level. Please suggest a workaround for Spark 1.0.0, as I do not have the option of moving to the latest release.

Note: I tried copying the GraphLoader class into my own package as GraphLoader1, with these package and import declarations:

package com.cloudera.xyz
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

and then changing the persistence level to suit my needs, using .persist(gStorageLevel) instead of .cache(). But while compiling I get the following error:

GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl cannot be accessed in package org.apache.spark.graphx.impl
[INFO] val builder = new EdgePartitionBuilder[Int, Int]

I am also attaching the file to this mail. Maybe this approach is not possible. Please suggest a workaround so that I can set the persistence level to MEMORY_AND_DISK_SER for the graph I read from the edge list file.
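The exception comes from a one-time rule on RDD storage levels. In Spark, `cache()` is shorthand for `persist(StorageLevel.MEMORY_ONLY)`, and once an RDD has been assigned a level, calling `persist` with a different level throws. Because GraphLoader.edgeListFile already calls `.cache()` on the data it builds, the later `.persist(StorageLevel.MEMORY_AND_DISK_SER)` appears to hit that check. The sketch below is only a toy, pure-Scala model of the rule; `StorageLevelModel`, `ToyRDD` and the case objects are hypothetical stand-ins, not Spark classes.

```scala
// Toy model (NOT Spark's actual code) of the rule behind the exception:
// a storage level can be assigned once; a different level afterwards is rejected.
object StorageLevelModel {
  // Hypothetical stand-ins for org.apache.spark.storage.StorageLevel values
  sealed trait Level
  case object NONE extends Level
  case object MEMORY_ONLY extends Level
  case object MEMORY_AND_DISK_SER extends Level

  final class ToyRDD {
    private var storageLevel: Level = NONE

    def level: Level = storageLevel

    // Once a level is set, only the same level is accepted again.
    def persist(newLevel: Level): this.type = {
      if (storageLevel != NONE && newLevel != storageLevel) {
        throw new UnsupportedOperationException(
          "Cannot change storage level of an RDD after it was already assigned a level")
      }
      storageLevel = newLevel
      this
    }

    // cache() is shorthand for persist(MEMORY_ONLY), as in Spark.
    def cache(): this.type = persist(MEMORY_ONLY)
  }
}
```

In this model, `new ToyRDD().cache().persist(MEMORY_AND_DISK_SER)` throws, while `new ToyRDD().persist(MEMORY_AND_DISK_SER)` succeeds, which is why the level has to be chosen before the first `cache()`/`persist` call.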
package com.cloudera.sparkwordcount

import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

/**
 * Provides utilities for loading [[Graph]]s from files.
 */
object GraphLoader1 extends Logging {

  /**
   * Loads a graph from an edge list formatted file where each line contains two integers: a source
   * id and a target id. Skips lines that begin with `#`.
   *
   * If desired the edges can be automatically oriented in the positive
   * direction (source Id < target Id) by setting `canonicalOrientation` to
   * true.
   *
   * @example Loads a file in the following format:
   * {{{
   * # Comment Line
   * # Source Id <\t> Target Id
   * 1 -5
   * 1 2
   * 2 7
   * 1 8
   * }}}
   *
   * @param sc SparkContext
   * @param path the path to the file (e.g., /home/data/file or hdfs://file)
   * @param canonicalOrientation whether to orient edges in the positive
   *        direction
   * @param minEdgePartitions the number of partitions for the edge RDD
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int] =
  {
    val startTime = System.currentTimeMillis
    // Storage level for the edge partitions (the stock loader uses .cache(), i.e. MEMORY_ONLY)
    val gStorageLevel = StorageLevel.MEMORY_AND_DISK_SER

    // Parse the edge data table directly into edge partitions
    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') {
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) {
            // Skip malformed lines instead of failing on lineArray(1)
            logWarning("Invalid line: " + line)
          } else {
            val srcId = lineArray(0).toLong
            val dstId = lineArray(1).toLong
            if (canonicalOrientation && srcId > dstId) {
              builder.add(dstId, srcId, 1)
            } else {
              builder.add(srcId, dstId, 1)
            }
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(gStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
    edges.count()

    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
  } // end of edgeListFile

}
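The per-line parsing inside edgeListFile does not depend on Spark, so it can be checked on its own. Below is a minimal pure-Scala sketch of that logic; `EdgeLineParser` and `parse` are hypothetical names, not part of GraphX. It skips blank lines, lines starting with `#`, and lines with fewer than two fields, and applies canonical orientation (source Id < target Id) when asked. Like the loader, it still throws `NumberFormatException` on non-numeric ids.

```scala
// Hypothetical helper mirroring the per-line logic of edgeListFile, without Spark.
object EdgeLineParser {
  /** Returns Some((srcId, dstId)) for an edge line; None for blank, comment or short lines. */
  def parse(line: String, canonicalOrientation: Boolean = false): Option[(Long, Long)] = {
    if (line.isEmpty || line(0) == '#') None
    else {
      val fields = line.split("\\s+")
      if (fields.length < 2) None
      else {
        val src = fields(0).toLong
        val dst = fields(1).toLong
        // Optionally orient the edge so that srcId < dstId
        if (canonicalOrientation && src > dst) Some((dst, src)) else Some((src, dst))
      }
    }
  }
}
```

With such a helper, the body of `mapPartitionsWithIndex` could call `EdgeLineParser.parse(line, canonicalOrientation).foreach { case (s, d) => builder.add(s, d, 1) }` in place of the inline parsing.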