Ohh, that explains it.
My use case does not need state management,
so I guess I am better off without checkpointing.
Thank you for the clarification.

Regards,
Chandan

On Sat, Aug 20, 2016 at 8:24 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Checkpointing is required to be turned on in certain situations (e.g.
> updateStateByKey), but you're certainly not required to rely on it for
> fault tolerance.  I try not to.
>
> On Fri, Aug 19, 2016 at 1:51 PM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Thanks Cody for the pointer.
>>
>> I am able to do this now. I am not using checkpointing; instead I am
>> storing offsets in ZooKeeper for fault tolerance.
>> Spark config changes are now picked up on redeployment.
>> *Using this API:*
>> *KafkaUtils.createDirectStream[String, String, StringDecoder,
>> StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
>> messageHandler)*
>> *instead of:*
>> *KafkaUtils.createDirectStream[String, String, StringDecoder,
>> StringDecoder](ssc, kafkaParams, topicsSet)*
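
A minimal sketch of the offset-restore path described above. The znode data
format ("topic:partition:offset" per line) and the helper names are made up
for illustration; only the commented-out section shows the actual Spark 1.x
direct-stream API, and it assumes the Spark/Kafka dependencies are on the
classpath.

```scala
// Pure helper: parse stored offset lines into a map keyed by
// (topic, partition). In real code the key would be converted to a
// kafka.common.TopicAndPartition before being passed to Spark.
object OffsetStore {
  def parseOffsets(stored: Seq[String]): Map[(String, Int), Long] =
    stored.map { line =>
      val Array(topic, part, off) = line.split(":")
      (topic, part.toInt) -> off.toLong
    }.toMap
}

// Sketch of how the parsed map would feed the fromOffsets variant of
// createDirectStream (not runnable without the Spark/Kafka jars):
//
//   val fromOffsets = OffsetStore.parseOffsets(linesFromZk).map {
//     case ((t, p), o) => TopicAndPartition(t, p) -> o
//   }
//   val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
//     StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
//     (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
```

After each batch completes, the new offset ranges would be written back to
ZooKeeper so a restart resumes from the last committed position.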
>>
>> *One quick question:* What is the need for checkpointing if we can achieve
>> both fault tolerance and application code/config changes without it? Does
>> checkpointing give anything else? I might be missing something.
>>
>>
>> Regards,
>> Chandan
>>
>>
>> On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Yeah the solutions are outlined in the doc link.  Or just don't rely on
>>> checkpoints
>>> On Aug 18, 2016 8:53 AM, "chandan prakash" <chandanbaran...@gmail.com>
>>> wrote:
>>>
>>>> Yes,
>>>>  I looked into the source code. The SparkConf is serialized and saved
>>>> during checkpointing and re-created from the checkpoint directory at
>>>> restart. So any SparkConf parameter which you load from
>>>> application.config and set on the SparkConf object in code cannot be
>>>> changed and picked up when restarting from a checkpoint.  :(
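
A stdlib-only illustration of the behavior described above: once a value is
captured in a serialized object (as the SparkConf and DStream closures are in
a checkpoint), restoring from those bytes brings back the value from
serialization time, regardless of what the config file says now. `JdbcConfig`
is a made-up stand-in, not a Spark class.

```scala
import java.io._

// Stand-in for configuration that gets baked into a checkpoint.
case class JdbcConfig(user: String, password: String) extends Serializable

object CheckpointDemo {
  // "Write the checkpoint": serialize the config holder to bytes.
  def serialize(c: JdbcConfig): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(c)
    oos.close()
    bos.toByteArray
  }

  // "Restore from the checkpoint": the old value comes back, even if the
  // source config file has been edited in the meantime.
  def deserialize(bytes: Array[Byte]): JdbcConfig = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    ois.readObject().asInstanceOf[JdbcConfig]
  }
}
```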
>>>>
>>>> Is there any workaround for reading a changed SparkConf parameter
>>>> value while using checkpointing?
>>>> P.S. I am not adding a new parameter; I am just changing the values of
>>>> some existing SparkConf params.
>>>>
>>>> This is a common case, so there must be some solution for it.
>>>>
>>>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Checkpointing is not kafka-specific.  It encompasses metadata about
>>>>> the application.  You can't re-use a checkpoint if your application has
>>>>> changed.
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>>>>>
>>>>>
>>>>> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
>>>>> chandanbaran...@gmail.com> wrote:
>>>>>
>>>>>> Is it possible to use the checkpoint directory to restart streaming,
>>>>>> but with a modified parameter value in the config file (e.g. the
>>>>>> username/password for the DB connection)?
>>>>>> Thanks in advance.
>>>>>>
>>>>>> Regards,
>>>>>> Chandan
>>>>>>
>>>>>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>>>>>> chandanbaran...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I am using the direct Kafka stream with checkpointing of offsets, the
>>>>>>> same as:
>>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>>>>>>>
>>>>>>> I need to change some parameters, such as the DB connection params
>>>>>>> (username/password).
>>>>>>> I stopped streaming gracefully, changed the parameters in the config
>>>>>>> file, and restarted streaming.
>>>>>>> *Issue: the changed username/password parameters are not being picked
>>>>>>> up.*
>>>>>>>
>>>>>>> *Question*:
>>>>>>> As per my understanding, checkpointing should only save the offsets
>>>>>>> of Kafka partitions, not the credentials of the DB connection.
>>>>>>> Why is it picking up the old DB connection params?
>>>>>>>
>>>>>>> I am declaring the params in the main method, not in the setupSsc()
>>>>>>> method.
>>>>>>> My code is identical to that in the program linked above, as below:
>>>>>>> val jdbcDriver = conf.getString("jdbc.driver")
>>>>>>> val jdbcUrl = conf.getString("jdbc.url")
>>>>>>> *val jdbcUser = conf.getString("jdbc.user")*
>>>>>>> *val jdbcPassword = conf.getString("jdbc.password")*
>>>>>>> // while the job doesn't strictly need checkpointing,
>>>>>>> // we'll checkpoint to avoid replaying the whole kafka log in case of failure
>>>>>>> val checkpointDir = conf.getString("checkpointDir")
>>>>>>> val ssc = StreamingContext.getOrCreate(
>>>>>>>   checkpointDir,
>>>>>>>   setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>>>>>>>     *jdbcPassword*, checkpointDir) _
>>>>>>> )
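
One possible workaround, sketched below under stated assumptions: instead of
passing the credentials into setupSsc (where they are captured in the
checkpointed closure), read them from a file at use time, so a restart picks
up edited values. The file path, property keys, and `DbCreds` helper are
hypothetical; only the commented section shows where this would sit in the
streaming job.

```scala
import java.io.FileInputStream
import java.util.Properties

// Hypothetical helper: load DB credentials from a properties file each time
// they are needed, rather than baking them into the checkpoint.
object DbCreds {
  def load(path: String): (String, String) = {
    val props = new Properties()
    val in = new FileInputStream(path)
    try props.load(in) finally in.close()
    (props.getProperty("jdbc.user"), props.getProperty("jdbc.password"))
  }
}

// In the streaming job this would be invoked per partition, so each batch
// sees the current file contents even after a checkpoint restart (sketch):
//
//   stream.foreachRDD { rdd =>
//     rdd.foreachPartition { part =>
//       val (user, password) = DbCreds.load("/etc/app/db.properties")
//       // open the JDBC connection with the current credentials ...
//     }
//   }
```

The trade-off is a small file read per partition per batch; caching with a
short TTL would amortize that if it matters.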
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Chandan Prakash
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Chandan Prakash
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Chandan Prakash
>>>>
>>>>
>>
>>
>> --
>> Chandan Prakash
>>
>>
>


-- 
Chandan Prakash
