Did you try inputs.repartition(1).foreachRDD(..)?
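
A minimal Scala sketch of that suggestion follows. It assumes `inputs` is the windowed `DStream[String]` from the question below. The object name and the `command` and `handle` parameters are hypothetical. `DStream.repartition(1)` gives every RDD in the stream exactly one partition, and `RDD.pipe` forks one external process per partition, so the script should run once per window and see all of that window's records:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object WholeWindowPipe {
  // Hypothetical helper (not from the thread): `inputs` is assumed to be the
  // already-windowed stream, `command` the script to run, and `handle` the
  // caller's processing of the script's output.
  def pipeEachWindow(inputs: DStream[String], command: String)
                    (handle: RDD[String] => Unit): Unit =
    // repartition(1) is applied to every RDD the stream emits, so each
    // windowRdd below has a single partition and pipe() forks one process.
    inputs.repartition(1).foreachRDD { windowRdd =>
      // Skip empty windows, as the original code does with count().
      if (windowRdd.count() > 0) handle(windowRdd.pipe(command))
    }
}
```

Without `cache()`, the `count()` check and the `pipe()` each recompute the repartitioned RDD. Caching `windowRdd` first may be worthwhile if windows are large.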

Thanks
Best Regards

On Fri, Jul 17, 2015 at 9:51 PM, PAULI, KEVIN CHRISTIAN
[AG-Contractor/1000] <kevin.christian.pa...@monsanto.com> wrote:

>  Spark newbie here, using Spark 1.3.1.
>
>  I’m consuming a stream and trying to pipe the data from the entire
> window to R for analysis.  The R algorithm needs the entire dataset from
> the stream (everything in the window) in order to function properly; it
> can’t be broken up.
>
>  So I tried doing a coalesce(1) before calling pipe(), but it still seems
> to be breaking up the data and invoking R multiple times with small pieces
> of data.  Is there some other approach I should try?
>
>  Here’s a small snippet:
>
>     val inputs: DStream[String] = MQTTUtils.createStream(ssc,
>       mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
>       .window(duration)
>     inputs.foreachRDD {
>       windowRdd => {
>         if (windowRdd.count() > 0) processWindow(windowRdd)
>       }
>     }
>
>  ...
>
>     def processWindow(windowRdd: RDD[String]) = {
>       // call R script to process data
>       windowRdd.coalesce(1)
>       val outputsRdd: RDD[String] =
>         windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
>       outputsRdd.cache()
>
>       if (outputsRdd.count() > 0) processOutputs(outputsRdd)
>     }
>
>  ...
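
In `processWindow` above, the result of `windowRdd.coalesce(1)` is discarded. RDDs are immutable: `coalesce` returns a new RDD and leaves `windowRdd` with its original partitions, so `pipe()` still starts one R process per original partition. The following sketch runs in Spark local mode and contrasts the two shapes. It uses `wc -l` as a hypothetical stand-in for the R script. The object and method names are illustrative only:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoalesceBeforePipe {
  // Shape from the question: coalesce(1) builds a new RDD that is never used,
  // so pipe() runs on windowRdd with its original partitioning.
  def pipeDiscardingCoalesce(windowRdd: RDD[String], command: String): RDD[String] = {
    windowRdd.coalesce(1) // result discarded; windowRdd itself is unchanged
    windowRdd.pipe(command)
  }

  // Corrected shape: pipe() is called on the single-partition RDD.
  def pipeCoalesced(windowRdd: RDD[String], command: String): RDD[String] =
    windowRdd.coalesce(1).pipe(command)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("coalesce-before-pipe").setMaster("local[2]"))
    try {
      // 100 records over 8 partitions, standing in for a windowed RDD.
      val windowRdd = sc.parallelize((1 to 100).map(_.toString), 8)
      // Each `wc -l` process prints a single line count, so the number of
      // output lines equals the number of processes pipe() started.
      val buggy = pipeDiscardingCoalesce(windowRdd, "wc -l").collect().map(_.trim)
      val fixed = pipeCoalesced(windowRdd, "wc -l").collect().map(_.trim)
      println(s"buggy: ${buggy.mkString(",")}")
      println(s"fixed: ${fixed.mkString(",")}")
    } finally {
      sc.stop()
    }
  }
}
```

Either `coalesce(1)` (no shuffle) or `repartition(1)` (with shuffle) yields a single partition when its result is the RDD that `pipe()` is called on. In both cases the whole window then goes through one task, so it has to fit on one executor.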