Hi,

State is only restored if you manually take a savepoint and then start the 
job from that savepoint. Simply stopping a job and then restarting it 
will not restore state. The regular checkpoints are only used for recovery if a 
job fails, not after a user-induced shutdown.
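
For reference, here is a rough sketch of that sequence with the Flink command-line client. The job ID, savepoint directory, savepoint path, and jar name are hypothetical placeholders, and the exact options may differ between Flink versions, so please check the CLI documentation for your release:

```shell
# Hypothetical placeholders: substitute the values for your own setup.
JOB_ID="your-job-id"                          # shown by `bin/flink list`
SAVEPOINT_DIR="s3://your-bucket/savepoints"   # where the savepoint is written
JOB_JAR="your-archiver-job.jar"               # the jar you normally submit

# List running jobs to find the ID of the job you want to stop.
bin/flink list

# Trigger a savepoint for that job. The command prints the path of the
# completed savepoint; copy that path into SAVEPOINT_PATH below.
bin/flink savepoint "$JOB_ID" "$SAVEPOINT_DIR"
SAVEPOINT_PATH="path-printed-by-the-previous-command"

# Shut the job down only after the savepoint has completed.
bin/flink cancel "$JOB_ID"

# Resubmit the job, restoring operator state from the savepoint.
bin/flink run -s "$SAVEPOINT_PATH" "$JOB_JAR"
```

When the job is started this way, initializeState should see the list state that was written when the savepoint was taken.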

Best,
Aljoscha
> On 28. Aug 2017, at 20:14, vipul singh <neoea...@gmail.com> wrote:
> 
> Hi Aljoscha,
> 
> Yes. 
> I am running the application until a few checkpoints have completed. I am 
> stopping the application between two checkpoints, so there will be messages 
> in the list state, which should be checkpointed when snapshotState is called. I am 
> able to see a checkpoint file on S3 (I am saving the checkpoints to S3 using 
> the RocksDB state backend). On restarting the application, I add a debug point here 
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L49>,
>  to see if there are any messages in checkpointedMessages, but as shown 
> below, the list is empty.
> 
> <Screen Shot 2017-08-28 at 10.30.10 AM.png>
> 
> Do you think there might be an error in the way I am trying to retrieve 
> messages?
> 
> 
>   def snapshotState(context: FunctionSnapshotContext) {
>     checkpointedMessages.clear()
>     bufferredMessages.foreach(checkpointedMessages.add)
> 
>     pendingFiles synchronized {
>       if (pendingFiles.nonEmpty) {
>         // we have a list of pending files
>         // we move all files to S3 (that's the sink in our case)
>         // and after that we delete these files
>       }
>       pendingFiles.clear()
>     }
>   }
> 
>   def initializeState(context: FunctionInitializationContext) {
> 
>     // Check if files already exist in /tmp.
>     // This might be the case if the program crashed before these files were
>     // uploaded to S3.
>     // We need to recover (upload these files to S3 and clear the directory).
>     handlePreviousPendingFiles()
> 
>     checkpointedMessages = context.getOperatorStateStore.getListState(
>       new ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new TypeHint[Message]() {})))
>     import scala.collection.JavaConversions._
>     for (message <- checkpointedMessages.get) {
>       bufferredMessages.add(message)
>     }
>   }
> 
> From my understanding of the above code, upon checkpointing, the messages 
> contained in checkpointedMessages are included in the snapshot, and when 
> initializeState is called, it will try to recover these messages from the last 
> checkpoint? Do you think the error is in the way I am trying to get the 
> ListBuffer elements from the last checkpoint?
>     checkpointedMessages = context.getOperatorStateStore.getListState(
>       new ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new TypeHint[Message]() {})))
> 
> Please let me know!
> 
> Thanks,
> Vipul
> 
> On Mon, Aug 28, 2017 at 2:52 AM, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> Hi,
> 
> How are you testing the recovery behaviour? Are you taking a savepoint, then 
> shutting down, and then restarting the job from the savepoint?
> 
> Best,
> Aljoscha
> 
>> On 28. Aug 2017, at 00:28, vipul singh <neoea...@gmail.com 
>> <mailto:neoea...@gmail.com>> wrote:
>> 
>> Hi all,
>> 
>> I am working on a Flink archiver application. In short, this application 
>> reads a bunch of schematized messages from Kafka and archives them 
>> to S3. Due to the way the files are named, I had to write a 
>> custom sink implementation. At its current stage, the application is 
>> generally able to archive files to S3.
>> I am having some issues during the recovery phase. A sample of the code can 
>> be found at this link 
>> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe>. My 
>> issue is that on recovery, when initializeState is called, it is not able to 
>> recover the last checkpointed ListState (i.e. checkpointedMessages 
>> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L47>
>>  is empty). I think this might be because of the way I am retrieving the 
>> checkpointed messages. Could someone please point out what is wrong, or 
>> direct me to some examples which do a similar thing? (Please note that the Message 
>> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L2>
>>  class is our own implementation.)
>> 
>> Thanks,
>> Vipul
> 
> 
> 
> 
> -- 
> Thanks,
> Vipul
