Checkpointing is required in certain situations (e.g. updateStateByKey),
but you're certainly not required to rely on it for fault tolerance.  I
try not to.
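A minimal sketch of what "not relying on checkpoints" can look like: commit the offsets yourself after each batch and seed `fromOffsets` on (re)start. The in-memory map below is only a stand-in for ZooKeeper (or any durable store), and the helper names are illustrative, not Spark APIs:

```scala
import scala.collection.mutable

// Stand-in for a durable offset store (ZooKeeper, a DB table, etc.).
// Keyed by (topic, partition); the value is the next offset to read.
val offsetStore = mutable.Map[(String, Int), Long]()

// After each successfully processed batch, commit its ending offsets.
def saveOffsets(batch: Map[(String, Int), Long]): Unit =
  offsetStore ++= batch

// On (re)start, build the fromOffsets map to hand to
// KafkaUtils.createDirectStream; unknown partitions start at 0.
def fromOffsets(topic: String, partitions: Int): Map[(String, Int), Long] =
  (0 until partitions)
    .map(p => (topic, p) -> offsetStore.getOrElse((topic, p), 0L))
    .toMap

saveOffsets(Map(("events", 0) -> 42L))
// A restarted job would now resume partition 0 of "events" at offset 42.
```

Because recovery state lives outside the checkpoint directory, you can redeploy changed code or config and still resume from the committed offsets.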

On Fri, Aug 19, 2016 at 1:51 PM, chandan prakash <chandanbaran...@gmail.com>
wrote:

> Thanks Cody for the pointer.
>
> I am able to do this now: instead of checkpointing, I am storing offsets
> in ZooKeeper for fault tolerance.
> Spark config changes are now reflected on code deployment.
> Using this API:
> KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
> messageHandler)
> instead of:
> KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topicsSet)
>
> One quick question:
> What is the need for checkpointing if we can achieve both fault tolerance
> and application code/config changes without it? Is there anything else
> checkpointing gives? I might be missing something.
>
>
> Regards,
> Chandan
>
>
> On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Yeah, the solutions are outlined in the doc link.  Or just don't rely on
>> checkpoints.
>> On Aug 18, 2016 8:53 AM, "chandan prakash" <chandanbaran...@gmail.com>
>> wrote:
>>
>>> Yes,
>>> I looked into the source code implementation.  The SparkConf is serialized
>>> and saved during checkpointing and re-created from the checkpoint directory
>>> at restart time. So any SparkConf parameter which you load from
>>> application.config and set on the SparkConf object in code cannot be
>>> changed and picked up under checkpointing.  :(
>>>
>>> Is there any workaround for reading a changed SparkConf parameter value
>>> while using checkpointing?
>>> P.S. I am not adding a new parameter, I am just changing the values of
>>> some existing SparkConf params.
>>>
>>> This is a common case and there must be some solution for this.
>>>
>>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Checkpointing is not Kafka-specific.  It encompasses metadata about the
>>>> application.  You can't re-use a checkpoint if your application has
>>>> changed.
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>>>>
>>>>
>>>> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
>>>> chandanbaran...@gmail.com> wrote:
>>>>
>>>>> Is it possible to use the checkpoint directory to restart streaming
>>>>> but with a modified parameter value in the config file (e.g.
>>>>> username/password for the db connection)?
>>>>> Thanks in advance.
>>>>>
>>>>> Regards,
>>>>> Chandan
>>>>>
>>>>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>>>>> chandanbaran...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am using direct Kafka with checkpointing of offsets, same as:
>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>>>>>
>>>>>> I need to change some parameters, like the db connection params:
>>>>>> username/password for the db connection.
>>>>>> I stopped streaming gracefully, changed the parameters in the config
>>>>>> file and restarted streaming.
>>>>>> Issue: the changed username/password parameters are not being
>>>>>> picked up.
>>>>>>
>>>>>> Question:
>>>>>> As per my understanding, checkpointing should only save the offsets of
>>>>>> the Kafka partitions, not the credentials of the db connection.
>>>>>> Why is it picking up the old db connection params?
>>>>>>
>>>>>> I am declaring the params in the main method and not in the setupSsc()
>>>>>> method.
>>>>>> My code is identical to that in the program linked above:
>>>>>> val jdbcDriver = conf.getString("jdbc.driver")
>>>>>> val jdbcUrl = conf.getString("jdbc.url")
>>>>>> val jdbcUser = conf.getString("jdbc.user")
>>>>>> val jdbcPassword = conf.getString("jdbc.password")
>>>>>> // while the job doesn't strictly need checkpointing,
>>>>>> // we'll checkpoint to avoid replaying the whole kafka log in case of
>>>>>> // failure
>>>>>> val checkpointDir = conf.getString("checkpointDir")
>>>>>> val ssc = StreamingContext.getOrCreate(
>>>>>>   checkpointDir,
>>>>>>   setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, jdbcUser,
>>>>>>     jdbcPassword, checkpointDir) _
>>>>>> )
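Sketched with plain Scala (no Spark needed), this is why the new credentials are ignored: StreamingContext.getOrCreate only calls the setup function when no checkpoint exists; on restart the whole context, including the jdbcUser/jdbcPassword values captured at first run, is deserialized from the checkpoint. The toy below is an analogy of that behavior under stated assumptions, not the actual Spark implementation:

```scala
// Toy analogue of StreamingContext.getOrCreate: if a "checkpoint" exists,
// the creating function is never invoked, so freshly-read config is ignored.
case class Context(jdbcUser: String, jdbcPassword: String)

def getOrCreate(checkpoint: Option[Context], create: () => Context): Context =
  checkpoint.getOrElse(create())

// First run: no checkpoint, so create() runs; the context is then saved.
val firstRun = getOrCreate(None, () => Context("olduser", "oldpass"))

// Restart: the saved context wins; the newly-read credentials never reach it.
val restarted = getOrCreate(Some(firstRun), () => Context("newuser", "newpass"))
```

This matches the symptom in the thread: the config file is re-read in main(), but the restored context still holds the serialized values from the first run.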
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Chandan Prakash
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Chandan Prakash
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Chandan Prakash
>>>
>>>
>
>
> --
> Chandan Prakash
>
>
