Checkpointing is required to be turned on in certain situations (e.g. updateStateByKey), but you're certainly not required to rely on it for fault tolerance. I try not to.
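[Editor's note: not part of the original thread.] The alternative Cody describes, tracking offsets yourself instead of relying on checkpoints, needs some serialization format for the external store (ZooKeeper in this thread). A minimal pure-Scala sketch; the "topic,partition,offset" text encoding and the helper names are assumptions for illustration, not a Spark or Kafka API:

```scala
// Hypothetical helpers for persisting Kafka offsets in an external store
// (e.g. a ZooKeeper znode) instead of a Spark checkpoint. The encoding
// ("topic,partition,offset" per line) is an assumption for this sketch.
object OffsetCodec {
  // Encode an offset map to a single string suitable for external storage.
  def encodeOffsets(offsets: Map[(String, Int), Long]): String =
    offsets.map { case ((topic, partition), offset) =>
      s"$topic,$partition,$offset"
    }.mkString("\n")

  // Decode a previously stored string back into an offset map,
  // which could then be used to build the fromOffsets argument of
  // KafkaUtils.createDirectStream on restart.
  def decodeOffsets(s: String): Map[(String, Int), Long] =
    s.split("\n").filter(_.nonEmpty).map { line =>
      val Array(topic, partition, offset) = line.split(",")
      ((topic, partition.toInt), offset.toLong)
    }.toMap
}
```

On restart, the application would read this string from ZooKeeper, decode it, and pass the resulting map as `fromOffsets`, so no checkpoint directory is needed for offset recovery.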
On Fri, Aug 19, 2016 at 1:51 PM, chandan prakash <chandanbaran...@gmail.com> wrote:
> Thanks Cody for the pointer.
>
> I am able to do this now. Not using checkpointing; rather, storing offsets
> in ZooKeeper for fault tolerance. Spark config changes are now getting
> reflected on code deployment.
> *Using this api:*
> *KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
> messageHandler)*
> *instead of:*
> *KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topicsSet)*
>
> *One quick question:*
> What is the need for checkpointing if we can achieve both fault tolerance
> and application code/config changes without it? Is there anything else
> that checkpointing gives? I might be missing something.
>
> Regards,
> Chandan
>
> On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Yeah, the solutions are outlined in the doc link. Or just don't rely on
>> checkpoints.
>>
>> On Aug 18, 2016 8:53 AM, "chandan prakash" <chandanbaran...@gmail.com> wrote:
>>
>>> Yes,
>>> I looked into the source code implementation. sparkConf is serialized
>>> and saved during checkpointing and re-created from the checkpoint
>>> directory at time of restart. So any sparkConf parameter which you load
>>> from application.config and set in the sparkConf object in code cannot
>>> be changed and reflected with checkpointing. :(
>>>
>>> Is there any workaround for reading a changed sparkConf parameter value
>>> while using checkpointing?
>>> P.S. I am not adding a new parameter, I am just changing the values of
>>> some existing sparkConf params.
>>>
>>> This is a common case and there must be some solution for it.
>>>
>>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Checkpointing is not kafka-specific. It encompasses metadata about the
>>>> application.
>>>> You can't re-use a checkpoint if your application has changed.
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>>>>
>>>> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <chandanbaran...@gmail.com> wrote:
>>>>
>>>>> Is it possible that I use the checkpoint directory to restart
>>>>> streaming, but with a modified parameter value in the config file
>>>>> (e.g. username/password for the db connection)?
>>>>> Thanks in advance.
>>>>>
>>>>> Regards,
>>>>> Chandan
>>>>>
>>>>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <chandanbaran...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am using direct kafka with checkpointing of offsets, same as:
>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>>>>>
>>>>>> I need to change some parameters, like db connection params
>>>>>> (username/password for the db connection).
>>>>>> I stopped streaming gracefully, changed the parameters in the config
>>>>>> file, and restarted streaming.
>>>>>> *Issue: the changed username/password parameters are not being
>>>>>> considered.*
>>>>>>
>>>>>> *Question:*
>>>>>> As per my understanding, checkpointing should only save the offsets
>>>>>> of kafka partitions and not the credentials of the db connection.
>>>>>> Why is it picking up the old db connection params?
>>>>>>
>>>>>> I am declaring the params in the main method and not in the
>>>>>> setupSsc() method.
>>>>>> My code is identical to that in the above program link, as below:
>>>>>>
>>>>>> val jdbcDriver = conf.getString("jdbc.driver")
>>>>>> val jdbcUrl = conf.getString("jdbc.url")
>>>>>> *val jdbcUser = conf.getString("jdbc.user")*
>>>>>> *val jdbcPassword = conf.getString("jdbc.password")*
>>>>>> // while the job doesn't strictly need checkpointing,
>>>>>> // we'll checkpoint to avoid replaying the whole kafka log in case of failure
>>>>>> val checkpointDir = conf.getString("checkpointDir")
>>>>>> val ssc = StreamingContext.getOrCreate(
>>>>>>   checkpointDir,
>>>>>>   setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>>>>>>     *jdbcPassword*, checkpointDir) _
>>>>>> )
>>>>>>
>>>>>> --
>>>>>> Chandan Prakash