Hi all,
I am using the GraphLoader class to load graphs from edge list files, but I
then need to change the storage level of the graph to something other than
MEMORY_ONLY.

val graph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

The error I am getting while executing this is:
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot
change storage level of an RDD after it was already assigned a level
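
As far as I can tell, this happens because edgeListFile already calls .cache()
(i.e. MEMORY_ONLY) on the RDDs it builds, and Spark does not allow changing an
RDD's storage level once one has been assigned. A minimal reproduction of the
same error, assuming an existing SparkContext sc:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10).cache()      // storage level is now MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)  // throws UnsupportedOperationException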


Then I looked into the GraphLoader class. I know that in later versions of
Spark this class supports setting the persistence level. Please suggest a
workaround for Spark 1.0.0, as I do not have the option of moving to a later
release.
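
For reference, my understanding is that in newer releases the storage levels
can be passed directly to the loader, something like the following (parameter
names are from the newer API, not available on 1.0.0):

val graph = GraphLoader.edgeListFile(sc, fname,
  numEdgePartitions = numEPart,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)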

Note: I tried copying the GraphLoader class into my own package as
GraphLoader1, with the package declaration and imports

package com.cloudera.xyz

import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

and then changing the persistence level to what I need, i.e.
.persist(gStorageLevel) instead of .cache().

But while compiling I get the following error:

GraphLoader1.scala:49: error: class EdgePartitionBuilder in package impl
cannot be accessed in package org.apache.spark.graphx.impl
[INFO]       val builder = new EdgePartitionBuilder[Int, Int]
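
I suspect this is because EdgePartitionBuilder is declared private[graphx], so
it is only visible to code compiled inside the org.apache.spark.graphx
package. One untested idea would be to declare the copied class in that
package instead of my own, e.g.:

// Untested sketch: compiling the copied loader inside org.apache.spark.graphx
// should make private[graphx] members like EdgePartitionBuilder accessible.
package org.apache.spark.graphx

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

object GraphLoader1 extends Logging {
  // ... same body as in the attached file below ...
}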

I am also attaching the file to this mail. Maybe this way of doing things is
not possible.


Please suggest some workarounds so that I can set the persistence level of
the graph I read from an edge list file to MEMORY_AND_DISK_SER.

Attached file (GraphLoader1.scala):
package com.cloudera.sparkwordcount

import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl._

/**
 * Provides utilities for loading [[Graph]]s from files.
 */
object GraphLoader1 extends Logging {

  /**
   * Loads a graph from an edge list formatted file where each line contains two integers: a source
   * id and a target id. Skips lines that begin with `#`.
   *
   * If desired the edges can be automatically oriented in the positive
   * direction (source Id < target Id) by setting `canonicalOrientation` to
   * true.
   *
   * @example Loads a file in the following format:
   * {{{
   * # Comment Line
   * # Source Id <\t> Target Id
   * 1   -5
   * 1    2
   * 2    7
   * 1    8
   * }}}
   *
   * @param sc SparkContext
   * @param path the path to the file (e.g., /home/data/file or hdfs://file)
   * @param canonicalOrientation whether to orient edges in the positive
   *        direction
   * @param minEdgePartitions the number of partitions for the edge RDD
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int] =
  {
    val startTime = System.currentTimeMillis
    val gStorageLevel = StorageLevel.MEMORY_AND_DISK_SER
    // Parse the edge data table directly into edge partitions
    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') {
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) {
            // Skip malformed lines instead of failing on lineArray(1) below.
            logWarning("Invalid line: " + line)
          } else {
            val srcId = lineArray(0).toLong
            val dstId = lineArray(1).toLong
            if (canonicalOrientation && srcId > dstId) {
              builder.add(dstId, srcId, 1)
            } else {
              builder.add(srcId, dstId, 1)
            }
          }
      Iterator((pid, builder.toEdgePartition))
    }.persist(gStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
    edges.count()

    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
  } // end of edgeListFile

}
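
If the package-placement trick works, the intended usage would be (sketch,
with fname and numEPart as in my original snippet):

val graph = GraphLoader1.edgeListFile(sc, fname, minEdgePartitions = numEPart)
// The edge RDD is already persisted at MEMORY_AND_DISK_SER inside
// edgeListFile, so no further persist() call is needed (or allowed).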