Yeah, it makes sense that parameters that are read only inside your getOrCreate function wouldn't be re-read, since that function isn't called if a checkpoint is loaded.
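To make that concrete, here is a minimal sketch of the pattern (the checkpoint path, app name, source and batch interval are placeholders, not your actual values). Everything inside the create function, including any configuration values read there, runs only when no checkpoint exists; on a restart the DStream graph is deserialized from the checkpoint and the function is skipped:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val checkpointFolder = "hdfs:///tmp/pageview-checkpoint"  // placeholder path

def createContext(): StreamingContext = {
  // Runs ONLY when no checkpoint exists at checkpointFolder. Window and
  // slide lengths (and any other values read here) get frozen into the
  // checkpointed DStream graph.
  val conf = new SparkConf().setAppName("pageview-app")  // placeholder name
  val ssc = new StreamingContext(conf, Minutes(1))
  ssc.checkpoint(checkpointFolder)
  // Placeholder source and output, just so the graph has an operation.
  val lines = ssc.socketTextStream("localhost", 9999)
  lines.count().print()
  ssc
}

// On restart this deserializes the old graph and never calls createContext.
val ssc = StreamingContext.getOrCreate(checkpointFolder, createContext _)
ssc.start()
ssc.awaitTermination()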
I would have thought that changing the number of executors and the other settings used by spark-submit would work on a checkpoint restart. Have you tried changing them both in the properties file provided to spark-submit and in the corresponding --arguments for number of cores / executor memory? (A sketch of such a properties file and submit command follows after the quoted thread below.)

On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:

> Hi guys,
>
> I tried to use the configuration file, but it didn't work as I expected. As part of the Spark Streaming flow, my methods run only when the application is started for the first time. Once I restart the app, it reads from the checkpoint and all the DStream operations come from the cache. No parameter is reloaded.
>
> I would like to know if it's possible to reset the time of windowed operations, checkpoint time, etc. I would also like to change the submission parameters, like number of executors, memory per executor or driver, etc. If it's not possible, what kind of parameters do you guys usually put in a configuration file? I know that the streaming interval cannot be changed.
>
> This is my code:
>
> def main(args: Array[String]): Unit = {
>   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createSparkContext _)
>   ssc.start()
>   ssc.awaitTermination()
>   ssc.stop()
> }
>
> def createSparkContext(): StreamingContext = {
>   val sparkConf = new SparkConf()
>     .setAppName(APP_NAME)
>     .set("spark.streaming.unpersist", "true")
>   val ssc = new StreamingContext(sparkConf, streamingInterval)
>   ssc.checkpoint(CHECKPOINT_FOLDER)
>   ssc.sparkContext.addFile(CONFIG_FILENAME)
>
>   val rawStream = createKafkaRDD(ssc)
>   processAndSave(rawStream)
>   ssc
> }
>
> def processAndSave(rawStream: DStream[(String, Array[Byte])]): Unit = {
>
>   val configFile = SparkFiles.get("config.properties")
>   val config: Config = ConfigFactory.parseFile(new File(configFile))
>
>   slidingInterval = Minutes(config.getInt("streaming.sliding.interval"))
>   windowLength = Minutes(config.getInt("streaming.window.interval"))
>   minPageview = config.getInt("streaming.pageview.min")
>
>   val pageviewStream = rawStream.map { case (_, raw) => (PageViewParser.parseURL(raw), 1L) }
>   val pageviewsHourlyCount = pageviewStream.reduceByKeyAndWindow(
>     PageViewAgregator.pageviewsSum _,
>     PageViewAgregator.pageviewsMinus _,
>     windowLength,
>     slidingInterval)
>
>   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >= minPageview)
>   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
>     .repartition(1)
>     .saveAsTextFiles(DESTINATION_FILE, "txt")
> }
>
> I really appreciate any help on this.
>
> Many thanks,
>
> Ricardo
>
> On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>
>> Good tip. I will try that.
>>
>> Thank you.
>>
>> On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Yeah, in general if you're changing the jar you can't recover the checkpoint.
>>>
>>> If you're just changing parameters, why not externalize those in a configuration file so your jar doesn't change? I tend to stick even my app-specific parameters in an external Spark config so everything is in one place.
>>>
>>> On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <ricardo.pa...@corp.globo.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is there a way to submit an app code change while keeping the checkpoint data, or do I need to erase the checkpoint folder every time I re-submit the Spark app with a new jar?
>>>>
>>>> I have an app that counts pageviews streaming from Kafka and delivers a file every hour covering the past 24 hours. I'm using reduceByKeyAndWindow with the reduce and inverse functions set.
>>>>
>>>> I'm doing some code improvements and would like to keep the data from the past hours, so that when I re-submit a code change I can keep delivering the pageview aggregation without needing to wait for 24 hours of new data. Sometimes I'm just changing the submission parameters, like number of executors, memory and cores.
>>>>
>>>> Many thanks,
>>>>
>>>> Ricardo
>>>>
>>>> --
>>>> Ricardo Paiva
>>>> Big Data / Semântica
>>>> globo.com <http://www.globo.com>
>>>
>>
>> --
>> Ricardo Paiva
>> Big Data / Semântica
>> globo.com <http://www.globo.com>
>
> --
> Ricardo Paiva
> Big Data / Semântica
> globo.com <http://www.globo.com>
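As a follow-up to the note above about submission-level settings, here is a sketch of what an externalized properties file and restart command might look like. The file name, class name, jar and values are hypothetical; the spark.* keys and spark-submit flags are standard Spark options, and app-specific keys can live in the same file (prefixed with "spark." so spark-submit does not ignore them), so the jar itself never has to change between restarts.

# streaming.conf -- hypothetical file passed via --properties-file
spark.executor.instances            8
spark.executor.memory               4g
spark.executor.cores                2
# app-specific values kept alongside the standard ones (hypothetical keys)
spark.app.pageview.window.minutes   1440
spark.app.pageview.sliding.minutes  60
spark.app.pageview.min              10

spark-submit \
  --class com.example.PageviewApp \
  --master yarn \
  --properties-file streaming.conf \
  pageview-app.jar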