I am writing my first project in Spark: an implementation of the
"Space Saving" counting algorithm.
I am trying to understand how tasks are executed in partitions. 
As you can see from my code, the algorithm keeps only a small number of
words in memory, for example the top k = 100, not all of them. For each
new word that arrives, it replaces the least frequent entry and updates
the counts and errors; if the word already exists, it just increments
its count. My question is where and how this code will be executed.
For example, I set the RDD's parallelism to 4, so the data will be split
into 4 partitions and processed by 4 tasks, right?
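To sanity-check the partition count, I put together a standalone probe along these lines (the object name and file are mine; I am assuming `partitions.length` is the right way to inspect an RDD's partitioning, and that `minPartitions` is only a lower bound):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionCount {
  def main(args: Array[String]): Unit = {
    val ctx = new SparkContext(
      new SparkConf().setAppName("Partition Count").setMaster("local"))

    // minPartitions = 4, as in my real code.
    val lines = ctx.textFile("README.txt", 4)

    // minPartitions is only a lower bound, so this may print 4 or more.
    println("number of partitions: " + lines.partitions.length)

    ctx.stop()
  }
}
```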
When I collect the words, shouldn't I get back 4*k words, since I have 4
partitions with k words kept in each one? If so, why am I getting only k
words as a result when I collect them? Where does the merging happen?
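To make my question concrete, here is roughly the merge step I expected to happen somewhere: combine the per-partition summaries and keep the k largest counts. This is only my naive sketch in plain Scala (the names and the combine rule are mine, and I know it is not a proven Space Saving merge):

```scala
object MergeSketch {
  val top_k = 3

  // Naively combine two per-partition summaries: sum counts and errors
  // for shared words, then keep only the k entries with the largest
  // counts. Just an illustration of where a merge would have to happen.
  def merge(a: Map[String, Array[Int]],
            b: Map[String, Array[Int]]): Map[String, Array[Int]] = {
    val combined = (a.keySet ++ b.keySet).map { word =>
      val Array(c1, e1) = a.getOrElse(word, Array(0, 0))
      val Array(c2, e2) = b.getOrElse(word, Array(0, 0))
      word -> Array(c1 + c2, e1 + e2)
    }.toMap
    combined.toList.sortWith((x, y) => x._2(0) > y._2(0)).take(top_k).toMap
  }

  def main(args: Array[String]): Unit = {
    val p0 = Map("spark" -> Array(5, 0), "word" -> Array(2, 1))
    val p1 = Map("spark" -> Array(3, 0), "stage" -> Array(4, 2))
    merge(p0, p1).foreach { case (w, a) =>
      println(w + " count=" + a(0) + " error=" + a(1))
    }
  }
}
```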
Is there a way to know which partition returns what? Is there a
partition ID?
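For example, I found `mapPartitionsWithIndex` in the RDD API and tried tagging each word with the index of the partition that processes it; is this the intended way? (Sketch only; the object name and sample words are made up.)

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionProbe {
  def main(args: Array[String]): Unit = {
    val ctx = new SparkContext(
      new SparkConf().setAppName("Partition Probe").setMaster("local"))
    val words = ctx.parallelize(Seq("alpha", "beta", "gamma", "delta"), 4)

    // Tag every word with the index of the partition that processes it.
    val tagged = words.mapPartitionsWithIndex { (idx, iter) =>
      iter.map(word => (idx, word))
    }
    tagged.collect().foreach { case (idx, word) =>
      println("partition " + idx + " -> " + word)
    }

    ctx.stop()
  }
}
```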
How can I monitor each partition's execution? Is it possible to know,
for example, which words get processed and where, i.e. in which
partition?

My code:
===================================================
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by mixtou on 30/12/14.
 */

object SpaceSaving {

  var frequent_words_counters = scala.collection.immutable.Map[String, Array[Int]]();
  var guaranteed_words = scala.collection.immutable.Map[String, Array[Int]]();
  val top_k: Int = 100;
  var words_no: Int = 0;
  var tStart: Long = 0;
  var tStop: Long = 0;
  var fi: Double = 0.01;

  def main (args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("Space Saving Project").setMaster("local");
    val ctx = new SparkContext(sparkConf);

    val lines = ctx.textFile("/Users/mixtou/PhD/Courses/Databases_Advanced/Project/scala_spark_space_saving/src/main/resources/README.txt", 4)
      .map(line => line.toLowerCase());
    val nonEmptyLines = lines.filter(line => line.nonEmpty);
    val regex = "[,.:;'\"\\?\\-!\\(\\)\\+\\[\\]\\d+]".r;
    val cleanLines = nonEmptyLines.map(line => regex.replaceAllIn(line, " "));
    val dirtyWords = cleanLines.flatMap(line => line.split("\\s+"));
    val words = dirtyWords.filter(word => word.length > 3);

    words.foreach(word => space_saving_algorithm(word));

    if (frequent_words_counters.size > 0) {
      frequent_words_counters.foreach(line => println("Top Frequent Word: " + line._1 + " with count: " + line._2(0) + " and error: " + line._2(1)));
    }

    // Use floating-point arithmetic here; integer division truncates to 0
    // whenever the run takes longer than words_no milliseconds.
    System.out.println("=========== Throughput:=> " + 1000.0 * words_no / (tStop - tStart) + " words per second.");
    estimateGuaranteedFrequentWords();

    ctx.stop();
  }

  def space_saving_algorithm(word: String) = {

    if (frequent_words_counters.contains(word)) {
      // Known word: increment its count, keep its error estimate.
      val Array(count, error) = frequent_words_counters(word);
      frequent_words_counters += word -> Array[Int](count + 1, error);
    }
    else {

      if (frequent_words_counters.size < top_k) {
        frequent_words_counters += word -> Array[Int](1, 0);
      }
      else {
        replaceLeastEntry(word);
      }
    }

    if (words_no > 0) {
      tStop = java.lang.System.currentTimeMillis();
    }
    else {
      tStart = java.lang.System.currentTimeMillis();
    }
    words_no += 1;
  }

  def replaceLeastEntry(word: String): Unit = {

    // Evict the entry with the smallest count; the new word inherits
    // that count (+1) and records it as its error, as Space Saving
    // prescribes. minBy avoids sorting the whole map on every eviction.
    val (least_word, least_counts) = frequent_words_counters.minBy(_._2(0));
    val least_count = least_counts(0);
    frequent_words_counters -= least_word;
    frequent_words_counters += word -> Array[Int](least_count + 1, least_count);
  }

  def estimateGuaranteedFrequentWords(): Unit = {
    frequent_words_counters.foreach { tuple =>
      // A word is guaranteed frequent only if its count minus its error
      // still reaches the threshold fi * words_no; record those words.
      if (tuple._2(0) - tuple._2(1) >= words_no * fi) {
        guaranteed_words += tuple;
        System.out.println("Guaranteed Word: " + tuple._1 + " with count: " + tuple._2(0) + " and error: " + tuple._2(1));
      }
    }
  }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-Question-on-How-Tasks-are-Executed-tp21064.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
