Hi, I have an issue with reduce in Spark Streaming: I don't get a reduced stream when I use a custom object.
Here is the code snippet I used to test this. The issue: the reduction clearly works for a simple sequence of Ints (streamSeq below), but what I really want is to send arrays, adding one array at a time to the queue and processing them in parallel (streamDataObject below). For instance, I send the following set of objects:

DataObject(t1, Seq(0,1,2,3))
DataObject(t2, Seq(0,1,2,4))
DataObject(t3, Seq(0,1,2,5))
DataObject(t4, Seq(0,1,2,1))
.
.
.
DataObject(t5, Seq(0,1,2,2))

Say the micro-batch interval is 1 millisecond and that batch picks up these 3 elements:

DataObject(t2, Seq(0,1,2,4))
DataObject(t3, Seq(0,1,2,5))
DataObject(t4, Seq(0,1,2,1))

After reduction I expect

DataObject(t2, Seq(0,3,6,10))

i.e. the element-wise sum of the sequences and the smallest timestamp. But with my code I always get a non-reduced value back. I am new to Spark Streaming, though I have used batch processing for a while now. There is clearly something wrong or missing in my code, and I would appreciate some input on what it is. Thank you.

------------------------------------------------------------------------------------
Code
------------------------------------------------------------------------------------

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.SparkConf

import scala.collection.mutable

class Reduce(paralelizm: Int, size: Int, iterations: Int, batchSize: Int,
             queueDelay: Int, logPath: String) extends Serializable {

  def execute(): Unit = {
    val conf = new SparkConf().setAppName("sparkperformance_reduce")
    val ssc = new StreamingContext(conf, Seconds(batchSize))
    ssc.sparkContext.setLogLevel("ERROR")

    // do work start
    //var status = streamSeq(ssc, paralelizm, size, iterations, queueDelay)
    streamDataObject(ssc, paralelizm, size, iterations, queueDelay)
    //streamQ(ssc, iterations, paralelizm)
    // do work stop

    ssc.start()
    ssc.awaitTermination()
  }

  // Reducing a stream of plain Ints: this works as expected.
  def streamSeq(ssc: StreamingContext, parallelizm: Int, size: Int,
                iterations: Int, queueDelay: Int): Boolean = {
    var status = false
    val rddQueue = new mutable.Queue[RDD[Int]]()
    val inputStream = ssc.queueStream(rddQueue)

    val result = inputStream.reduce((x, y) => x + y)
    result.print()

    for (i <- 1 to iterations) {
      rddQueue += ssc.sparkContext.makeRDD(1 to size * parallelizm, parallelizm)
      if (i == iterations) {
        status = true
      }
      Thread.sleep(queueDelay)
    }
    status
  }

  // Reducing a stream of custom DataObjects: here I never see a reduced value.
  def streamDataObject(ssc: StreamingContext, parallelizm: Int, size: Int,
                       iterations: Int, queueDelay: Int): Unit = {
    val rddQueue = new mutable.Queue[RDD[DataObject]]()
    val inputStream = ssc.queueStream(rddQueue)

    val reduceResult = inputStream.reduce(update)
    reduceResult.foreachRDD(rdd => rdd.foreach(p => println(p.x + ":" + p.y)))
    //inputStream.count().print()
    reduceResult.count().print()

    for (i <- 1 to iterations) {
      // One DataObject per RDD, one RDD pushed to the queue per iteration.
      val dataObject = new DataObject(System.currentTimeMillis(), 1 to size * parallelizm)
      rddQueue += ssc.sparkContext.makeRDD(Seq(dataObject))
      Thread.sleep(queueDelay)
    }
  }

  // Reduce function: element-wise sum of x, minimum of y.
  // Mutates and returns the left operand.
  def update(a: DataObject, b: DataObject): DataObject = {
    a.y = Math.min(a.y, b.y)
    a.x = a.x.zip(b.x).map { case (x, y) => x + y }
    a
  }

  def printDataObject(dataObject: DataObject): Unit = {
    println(dataObject.y + ":::" + dataObject.x)
  }

  class DataObject(yc: Long, xc: Seq[Int]) extends Serializable {
    var y: Long = yc
    var x: Seq[Int] = xc

    def update(dataObject: DataObject): DataObject = {
      this.y = Math.min(y, dataObject.y)
      this.x = 100 to 116 // marker values, only there to see whether this method ever runs
      this
    }
  }
}
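To make the expected semantics concrete, here is a plain-Scala sketch of the reduction I am after, with no streaming involved. The case class Data and the literal timestamps 2L, 3L, 4L are just stand-ins for DataObject and t2, t3, t4:

// Local reduce over the three example elements of one micro-batch.
case class Data(y: Long, x: Seq[Int])

val batch = Seq(
  Data(2L, Seq(0, 1, 2, 4)),
  Data(3L, Seq(0, 1, 2, 5)),
  Data(4L, Seq(0, 1, 2, 1))
)

// Element-wise sum of x, minimum of y.
val reduced = batch.reduce { (a, b) =>
  Data(Math.min(a.y, b.y), a.x.zip(b.x).map { case (i, j) => i + j })
}

println(reduced) // Data(2,List(0, 3, 6, 10))

This local reduce produces Data(2,List(0, 3, 6, 10)), which is the per-batch value I expected the streaming reduce to print.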