Hi, I'm running a standalone cluster with 8 worker servers.
I'm developing a streaming app that adds new lines of text to several
different RDDs each batch interval. Each line has a well-randomized unique
identifier that I'm trying to use for partitioning, since the data stream
does contain duplicate lines. I'm doing the partitioning with this:

val eventsByKey = streamRDD.map { event => (getUID(event), event) }
// map already yields a pair RDD, so it can be partitioned directly;
// no need to collect it to the driver and re-parallelize it
val partitionedEventsRdd = eventsByKey
  .partitionBy(new HashPartitioner(numPartitions))
  .map(_._2)
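(Here getUID just pulls the identifier out of each line. The real parsing is
app-specific; as a rough sketch, assume the UID is the first tab-separated
field:)

// Hypothetical sketch of getUID; the real line format differs.
// Assumes each line looks like "<uid>\t<payload>".
def getUID(event: String): String = event.split('\t')(0)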

I'm adding the new batch to the existing RDD like this:

import scala.collection.mutable

val mergedRDD = currentRDD.zipPartitions(partitionedEventsRdd, preservesPartitioning = true) {
  (currentIter, batchIter) =>
    // De-duplicate by UID across the two co-partitioned iterators
    val uniqEvents = mutable.ListBuffer[String]()
    val uids = mutable.Set[String]()
    Iterator(currentIter, batchIter).flatten.foreach { event =>
      val uid = getUID(event)
      if (uids.add(uid)) {  // add returns true only for a UID not seen before
        uniqEvents += event
      }
    }
    uniqEvents.iterator
}

val count = mergedRDD.count
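(For context, here's a simplified sketch of how the merge is carried across
batch intervals. mergeBatch stands in for the zipPartitions block above, and
the names here are placeholders rather than the exact app code:)

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

def accumulate(stream: DStream[String],
               initialRDD: RDD[String],  // pre-partitioned with the same HashPartitioner
               mergeBatch: (RDD[String], RDD[String]) => RDD[String],
               numPartitions: Int): Unit = {
  var currentRDD = initialRDD
  stream.foreachRDD { streamRDD =>
    val partitionedEventsRdd = streamRDD
      .map(event => (getUID(event), event))
      .partitionBy(new HashPartitioner(numPartitions))
      .map(_._2)
    val merged = mergeBatch(currentRDD, partitionedEventsRdd).cache()
    merged.count()          // materialize before dropping the old generation
    currentRDD.unpersist()  // release the previous generation's cached blocks
    currentRDD = merged
  }
}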

The reason I'm doing it this way is that when I was doing:

val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartitions).distinct
val count = mergedRDD.count

It would start taking a long time and generating a lot of shuffles.
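(If anyone wants to reproduce this, the shuffle stages are visible in the
RDD lineage; each ShuffledRDD entry marks a shuffle:)

println(mergedRDD.toDebugString)  // prints the lineage of the union/distinct version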

The zipPartitions approach does perform better, but after running for an hour
or so I start seeing this in the web UI:

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n19112/Executors.png> 

As you can see, most of the data is skewing to just 2 executors, with 1
getting more than half the RDD blocks. These become a hotspot and eventually
I start seeing OOM errors. I've tried this half a dozen times; the 'hot'
executors change from run to run, but the skewing behavior does not.
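(To check whether the records themselves are unevenly distributed, as opposed
to the cached blocks just landing on two executors, a per-partition count like
this can help; it's a diagnostic sketch, not part of the app:)

val perPartitionCounts = mergedRDD
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
perPartitionCounts.sortBy(-_._2).foreach {
  case (idx, n) => println(s"partition $idx: $n events")
}

With a well-randomized UID and a HashPartitioner, the counts should come out
roughly even, which is why the block placement surprises me.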

Any idea what is going on here?

Thanks,

Mike
 



