Ohh, that explains it. My use case does not need state management, so I guess I am better off without checkpointing. Thank you for the clarification.
Regards,
Chandan

On Sat, Aug 20, 2016 at 8:24 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Checkpointing is required to be turned on in certain situations (e.g.
> updateStateByKey), but you're certainly not required to rely on it for
> fault tolerance. I try not to.
>
> On Fri, Aug 19, 2016 at 1:51 PM, chandan prakash <chandanbaran...@gmail.com> wrote:
>
>> Thanks Cody for the pointer.
>>
>> I am able to do this now, without checkpointing. Instead I am storing
>> offsets in ZooKeeper for fault tolerance, and Spark config changes are
>> now picked up on code deployment.
>>
>> Using this API:
>> KafkaUtils.createDirectStream[String, String, StringDecoder,
>>   StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
>> instead of:
>> KafkaUtils.createDirectStream[String, String, StringDecoder,
>>   StringDecoder](ssc, kafkaParams, topicsSet)
>>
>> One quick question: what is the need for checkpointing if we can achieve
>> both fault tolerance and application code/config changes without it? Is
>> there anything else which checkpointing gives? I might be missing something.
>>
>> Regards,
>> Chandan
>>
>> On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Yeah, the solutions are outlined in the doc link. Or just don't rely on
>>> checkpoints.
>>>
>>> On Aug 18, 2016 8:53 AM, "chandan prakash" <chandanbaran...@gmail.com> wrote:
>>>
>>>> Yes, I looked into the source code implementation. sparkConf is
>>>> serialized and saved during checkpointing and re-created from the
>>>> checkpoint directory at the time of restart. So any sparkConf parameter
>>>> which you load from application.config and set on the sparkConf object
>>>> in code cannot be changed and reflected with checkpointing. :(
>>>>
>>>> Is there any workaround for reading a changed sparkConf parameter
>>>> value while using checkpointing?
>>>> p.s. I am not adding a new parameter, I am just changing the values of
>>>> some existing sparkConf params.
>>>>
>>>> This is a common case and there must be some solution for this.
>>>>
>>>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> Checkpointing is not Kafka-specific. It encompasses metadata about
>>>>> the application. You can't re-use a checkpoint if your application has
>>>>> changed.
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>>>>>
>>>>> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <chandanbaran...@gmail.com> wrote:
>>>>>
>>>>>> Is it possible that I use the checkpoint directory to restart
>>>>>> streaming but with a modified parameter value in the config file
>>>>>> (e.g. username/password for the db connection)?
>>>>>> Thanks in advance.
>>>>>>
>>>>>> Regards,
>>>>>> Chandan
>>>>>>
>>>>>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <chandanbaran...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I am using direct Kafka with checkpointing of offsets, same as:
>>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>>>>>>
>>>>>>> I need to change some parameters, like the db connection params:
>>>>>>> username/password for the db connection.
>>>>>>> I stopped streaming gracefully, changed the parameters in the config
>>>>>>> file and restarted streaming.
>>>>>>> Issue: the changed username/password parameters are not being
>>>>>>> picked up.
>>>>>>>
>>>>>>> Question:
>>>>>>> As per my understanding, checkpointing should only save offsets of
>>>>>>> Kafka partitions and not the credentials of the db connection.
>>>>>>> Why is it picking up the old db connection params?
>>>>>>>
>>>>>>> I am declaring the params in the main method and not in the
>>>>>>> setupSsc() method.
>>>>>>> My code is identical to that in the above program link, as below:
>>>>>>>
>>>>>>> val jdbcDriver = conf.getString("jdbc.driver")
>>>>>>> val jdbcUrl = conf.getString("jdbc.url")
>>>>>>> val jdbcUser = conf.getString("jdbc.user")
>>>>>>> val jdbcPassword = conf.getString("jdbc.password")
>>>>>>> // while the job doesn't strictly need checkpointing,
>>>>>>> // we'll checkpoint to avoid replaying the whole kafka log in case of failure
>>>>>>> val checkpointDir = conf.getString("checkpointDir")
>>>>>>> val ssc = StreamingContext.getOrCreate(
>>>>>>>   checkpointDir,
>>>>>>>   setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, jdbcUser,
>>>>>>>     jdbcPassword, checkpointDir) _
>>>>>>> )
>>>>>>>
>>>>>>> --
>>>>>>> Chandan Prakash

--
Chandan Prakash
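For readers following the thread: the "store offsets yourself instead of checkpointing" approach that Chandan ends up with could look roughly like the sketch below. It uses the Spark 1.x spark-streaming-kafka API discussed in the thread (createDirectStream with explicit fromOffsets plus a messageHandler, and HasOffsetRanges to read back the processed offsets). The helpers readOffsetsFromZk and saveOffsetsToZk are hypothetical, stubbed stand-ins for whatever external offset store (ZooKeeper, a database, etc.) you actually use; this is an untested outline, not a complete program.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object OffsetsInZkSketch {

  // Hypothetical helpers: replace with real reads/writes against your
  // offset store (e.g. ZooKeeper). Stubbed here so the sketch compiles.
  def readOffsetsFromZk(): Map[TopicAndPartition, Long] = Map.empty
  def saveOffsetsToZk(ranges: Seq[OffsetRange]): Unit = ()

  def startStream(ssc: StreamingContext, kafkaParams: Map[String, String]): Unit = {
    // Starting offsets come from your own store, not a Spark checkpoint,
    // so a restart with changed code/config still resumes where you left off.
    val fromOffsets: Map[TopicAndPartition, Long] = readOffsetsFromZk()

    // With the fromOffsets overload you supply a messageHandler to shape
    // each record; here we just keep (key, value) pairs.
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

    stream.foreachRDD { rdd =>
      // Each direct-stream RDD exposes the Kafka offset ranges it covers.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd (e.g. idempotent writes to the db) ...
      // Persist offsets only after processing succeeds.
      saveOffsetsToZk(offsetRanges)
    }
  }
}
```

The trade-off Cody alludes to: a checkpoint also restores in-flight state (updateStateByKey, window data) and avoids replaying work after a failure, but it pins you to the serialized application; self-managed offsets give up that convenience in exchange for being able to redeploy changed code and config freely.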