Hey guys,

Can someone help me solve the following issue? My code is:

    import scala.collection.mutable.ArrayBuffer

    var arr = new ArrayBuffer[String]()
    (1)
    sa_msgs.map(x => x._1)
           .foreachRDD { rdd => arr = new ArrayBuffer[String]() }
    (2)
    sa_msgs.map(x => x._1)
           .foreachRDD { rdd => arr ++= rdd.collect }
    (3)
    val rdd_site_aids = sc.esRDD(EsIndex,
                                 arr.toList.mkString(prefix, separator, suffix))
    (4)

    sa_msgs.map(x => x._1)
           .foreachRDD { rdd =>

             rdd.map(x => (x, 0))
                .join(rdd_site_aids)
                ...
           }
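
For context, the esRDD call in (3) only concatenates the collected keys into a single query string via mkString. A minimal sketch of that step, with hypothetical prefix/separator/suffix values (the real ones are defined elsewhere in my program):

```scala
// Hypothetical Elasticsearch terms-query fragments; the actual prefix,
// separator, and suffix are defined elsewhere in the original program.
val prefix    = """{"query":{"terms":{"site_aid":["""
val separator = ","
val suffix    = """]}}}"""

// Stand-in for the keys collected from sa_msgs in (2).
val arr = scala.collection.mutable.ArrayBuffer("\"a1\"", "\"a2\"")

// mkString joins the keys with the separator and wraps them
// in the prefix and suffix, producing one query string.
val query = arr.toList.mkString(prefix, separator, suffix)
println(query)  // {"query":{"terms":{"site_aid":["a1","a2"]}}}
```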
This code works well with plain Spark, but not with Spark Streaming, because arr is not yet populated when (3) and (4) are executed.
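
The timing issue can be reproduced without Spark at all: foreachRDD only registers a function to be run once per batch, whereas (3) runs immediately at setup time. A plain-Scala sketch of that ordering, where a hypothetical callback list stands in for Spark Streaming's scheduler:

```scala
import scala.collection.mutable.ArrayBuffer

var arr = new ArrayBuffer[String]()

// Stand-in for the streaming scheduler: registered work runs later.
val pendingActions = ArrayBuffer.empty[() => Unit]

// (2) analogue: foreachRDD only *registers* work for future batches.
pendingActions += (() => arr ++= Seq("aid-1", "aid-2"))

// (3) analogue: this runs immediately, before any batch has arrived.
val queryKeys = arr.toList // still empty here

// Later, once the streaming context starts, the registered work runs.
pendingActions.foreach(_.apply())

println(queryKeys)  // List()
println(arr.toList) // List(aid-1, aid-2)
```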

I'd like to do everything inside the foreachRDD in (4), but sc is not serializable, and esRDD needs a query that I construct from sa_msgs.

Does anybody see a way to solve this? The real issue seems to be that (1) and (2) are non-blocking, but there is surely a Spark Streaming way to handle it.

Thanks
