Yeah, it makes sense that parameters read only inside the function you pass
to getOrCreate wouldn't be re-read, since that function isn't called when a
checkpoint is loaded.

I would have thought that changing the number of executors and other things
used by spark-submit would work on a checkpoint restart.  Have you tried both
changing them in the properties file provided to spark-submit, and the
command-line flags that correspond to cores and executor memory
(--executor-cores, --executor-memory, --num-executors)?
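
For reference, something along these lines is what I mean (the class name,
jar name, and paths below are just placeholders; adjust the resource values
to your setup):

  spark-submit \
    --class com.yourcompany.PageviewStreamingApp \
    --master yarn \
    --num-executors 8 \
    --executor-cores 2 \
    --executor-memory 4g \
    --properties-file /path/to/streaming-app.conf \
    pageview-streaming-app.jar

Those resource settings are handled by spark-submit itself, before your
getOrCreate function ever runs, so I would expect them to take effect even
when the context is recovered from the checkpoint.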

On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <
ricardo.pa...@corp.globo.com> wrote:

>
> Hi guys,
>
> I tried to use the configuration file, but it didn't work as I expected.
> As part of the Spark Streaming flow, my methods run only the first time the
> application is started. Once I restart the app, it recovers from the
> checkpoint and all the DStream operations are rebuilt from the checkpointed
> lineage, so no parameter is reloaded.
>
> I would like to know if it's possible to reset the duration of windowed
> operations, the checkpoint interval, etc. I would also like to change the
> submission parameters, like the number of executors, memory per executor or
> driver, etc. If that's not possible, what kind of parameters do you usually
> put in a configuration file? I know that the streaming batch interval cannot
> be changed.
>
> This is my code:
>
> def main(args: Array[String]): Unit = {
>   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER, createSparkContext _)
>   ssc.start()
>   ssc.awaitTermination()
>   ssc.stop()
> }
>
> def createSparkContext(): StreamingContext = {
>   val sparkConf = new SparkConf()
>      .setAppName(APP_NAME)
>      .set("spark.streaming.unpersist", "true")
>   val ssc = new StreamingContext(sparkConf, streamingInterval)
>   ssc.checkpoint(CHECKPOINT_FOLDER)
>   ssc.sparkContext.addFile(CONFIG_FILENAME)
>
>   val rawStream = createKafkaRDD(ssc)
>   processAndSave(rawStream)
>   return ssc
> }
>
> def processAndSave(rawStream: DStream[(String, Array[Byte])]): Unit = {
>
>   val configFile = SparkFiles.get("config.properties")
>   val config: Config = ConfigFactory.parseFile(new File(configFile))
>
>   // parameters re-read from the config file shipped via addFile
>   slidingInterval = Minutes(config.getInt("streaming.sliding.interval"))
>   windowLength = Minutes(config.getInt("streaming.window.interval"))
>   minPageview = config.getInt("streaming.pageview.min")
>
>   val pageviewStream = rawStream.map { case (_, raw) =>
>     (PageViewParser.parseURL(raw), 1L)
>   }
>
>   val pageviewsHourlyCount =
>     pageviewStream.reduceByKeyAndWindow(PageViewAgregator.pageviewsSum _,
>                                         PageViewAgregator.pageviewsMinus _,
>                                         windowLength,
>                                         slidingInterval)
>
>   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >= minPageview)
>
>   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
>                          .repartition(1)
>                          .saveAsTextFiles(DESTINATION_FILE, "txt")
> }
>
> I really appreciate any help on this.
>
> Many thanks,
>
> Ricardo
>
> On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <
> ricardo.pa...@corp.globo.com> wrote:
>
>> Good tip. I will try that.
>>
>> Thank you.
>>
>> On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Yeah, in general if you're changing the jar you can't recover the
>>> checkpoint.
>>>
>>> If you're just changing parameters, why not externalize those in a
>>> configuration file so your jar doesn't change?  I tend to stick even my
>>> app-specific parameters in an external spark config so everything is in one
>>> place.
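>>>
>>> As a sketch (the "spark.myapp.*" property names are just examples I made
>>> up; custom keys need a "spark." prefix or spark-submit will ignore them),
>>> the file you pass with --properties-file can carry both the standard
>>> settings and your own:
>>>
>>>   spark.executor.memory           4g
>>>   spark.myapp.window.minutes      1440
>>>   spark.myapp.sliding.minutes     60
>>>   spark.myapp.pageview.min        100
>>>
>>> and in the app you'd read them back from the SparkConf, e.g.
>>> conf.getInt("spark.myapp.pageview.min", 1).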
>>>
>>> On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <
>>> ricardo.pa...@corp.globo.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is there a way to submit an app code change while keeping the checkpoint
>>>> data, or do I need to erase the checkpoint folder every time I re-submit
>>>> the Spark app with a new jar?
>>>>
>>>> I have an app that counts pageviews streamed from Kafka and delivers a
>>>> file every hour covering the past 24 hours. I'm using reduceByKeyAndWindow
>>>> with both the reduce and inverse reduce functions set.
>>>>
>>>> I'm making some code improvements and would like to keep the data from
>>>> the past hours, so that when I re-submit a code change I can keep delivering
>>>> the pageview aggregation without having to wait for 24 hours of new data.
>>>> Sometimes I'm just changing the submission parameters, like the number of
>>>> executors, memory and cores.
>>>>
>>>> Many thanks,
>>>>
>>>> Ricardo
>>>>
>>>> --
>>>> Ricardo Paiva
>>>> Big Data / Semântica
>>>> *globo.com* <http://www.globo.com>
>>>>
>>>
>>>
>>
>>
>> --
>> Ricardo Paiva
>> Big Data / Semântica
>> *globo.com* <http://www.globo.com>
>>
>
>
>
> --
> Ricardo Paiva
> Big Data / Semântica
> *globo.com* <http://www.globo.com>
>
