Did you try inputs.repartition(1).foreachRDD(..)?

Thanks
Best Regards
On Fri, Jul 17, 2015 at 9:51 PM, PAULI, KEVIN CHRISTIAN [AG-Contractor/1000] <kevin.christian.pa...@monsanto.com> wrote:

> Spark newbie here, using Spark 1.3.1.
>
> I’m consuming a stream and trying to pipe the data from the entire
> window to R for analysis. The R algorithm needs the entire dataset from
> the stream (everything in the window) in order to function properly; it
> can’t be broken up.
>
> So I tried doing a coalesce(1) before calling pipe(), but it still seems
> to be breaking up the data and invoking R multiple times with small
> pieces of data. Is there some other approach I should try?
>
> Here’s a small snippet:
>
>     val inputs: DStream[String] = MQTTUtils.createStream(ssc,
>       mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
>       .window(duration)
>     inputs.foreachRDD {
>       windowRdd => {
>         if (windowRdd.count() > 0) processWindow(windowRdd)
>       }
>     }
>
>     ...
>
>     def processWindow(windowRdd: RDD[String]) = {
>       // call R script to process data
>       windowRdd.coalesce(1)
>       val outputsRdd: RDD[String] =
>         windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
>       outputsRdd.cache()
>
>       if (outputsRdd.count() > 0) processOutputs(outputsRdd)
>     }
>
>     ...
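A minimal sketch of two ways to get the whole window into a single R invocation. RDDs are immutable, so windowRdd.coalesce(1) returns a new single-partition RDD and leaves windowRdd unchanged; the snippet above discards that return value, so pipe() still runs on the original partitions, and pipe() forks the external command once per partition. The object name WindowToR, the script path in rScript, and the body of processOutputs are hypothetical placeholders; the script is assumed to have been registered with sc.addFile so that SparkFiles.get can resolve it, as in the original snippet.

```scala
import java.nio.file.Paths

import org.apache.spark.SparkFiles
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object WindowToR {
  // Hypothetical path; assumed to have been shipped earlier with sc.addFile(rScript).
  val rScript: String = "/path/to/analysis.R"

  // Option 1: keep the RDD that coalesce returns and pipe THAT one.
  def processWindow(windowRdd: RDD[String]): Unit = {
    // coalesce(1) does not modify windowRdd; it returns a new one-partition RDD.
    val singlePartition: RDD[String] = windowRdd.coalesce(1)
    // pipe() starts the command once per partition, so with a single partition
    // the R script receives every line of the window on stdin in one run.
    val outputsRdd: RDD[String] =
      singlePartition.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
    outputsRdd.cache()
    if (outputsRdd.count() > 0) processOutputs(outputsRdd)
  }

  // Hypothetical stand-in for the original processOutputs.
  def processOutputs(outputs: RDD[String]): Unit =
    outputs.collect().foreach(println)

  // Option 2 (the suggestion in the reply): repartition the windowed DStream,
  // so every RDD handed to foreachRDD already has exactly one partition.
  def wire(inputs: DStream[String]): Unit =
    inputs.repartition(1).foreachRDD { (windowRdd: RDD[String]) =>
      if (windowRdd.count() > 0) processWindow(windowRdd)
    }
}
```

Either option should yield one partition per window. coalesce(1) avoids a shuffle, whereas repartition(1) always shuffles. In both cases a single task handles the entire window, so the window's data has to fit on one executor.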