Hi,

If you do not manually take a savepoint and then restore from that savepoint, your state will not be restored. Simply stopping a job and restarting it will not restore state. Regular checkpoints are only used for recovery when a job fails, not after a user-initiated shutdown.
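The difference can be illustrated with a small, self-contained Scala model. It is a hypothetical sketch, not Flink's API: `Snapshot`, `JobModel` and `Cluster` are invented names, and in a real deployment checkpoints and savepoints are taken and restored by the Flink runtime and CLI, not by user code. The model shows state surviving a failure (recovery from the latest checkpoint) and a savepoint round trip, but not a plain stop and restart.

```scala
// Hypothetical model of Flink's restore semantics; none of these names are Flink APIs.
final case class Snapshot(state: Vector[String])

final class JobModel(initial: Vector[String]) {
  // Operator state held by the running job (e.g. buffered messages).
  private var state: Vector[String] = initial
  // Latest completed periodic checkpoint, used only for failure recovery.
  private var latestCheckpoint: Option[Snapshot] = None

  def process(msg: String): Unit = state = state :+ msg
  def current: Vector[String] = state

  // Periodic checkpoint taken automatically while the job runs.
  def checkpoint(): Unit = latestCheckpoint = Some(Snapshot(state))

  // User-triggered savepoint; the user must hand it back explicitly on restart.
  def savepoint(): Snapshot = Snapshot(state)

  // Failure: the job is restarted in place from the latest checkpoint.
  def failAndRecover(): Unit =
    state = latestCheckpoint.map(_.state).getOrElse(Vector.empty)
}

object Cluster {
  // Submitting a job: state is restored only if a snapshot is supplied.
  def start(restoreFrom: Option[Snapshot] = None): JobModel =
    new JobModel(restoreFrom.map(_.state).getOrElse(Vector.empty))
}
```

In this model, `Cluster.start()` without a snapshot corresponds to resubmitting the job without pointing it at a savepoint, which is why the buffered state comes back empty.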
Best,
Aljoscha

> On 28. Aug 2017, at 20:14, vipul singh <neoea...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> Yes.
> I run the application until a few checkpoints have completed. I then stop
> the application between two checkpoints, so there are messages in the list
> state, which should be checkpointed when snapshotState is called. I can see
> a checkpoint file on S3 (I am saving the checkpoints to S3 using the RocksDB
> state backend). After restarting the application, I set a breakpoint here
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L49>
> to check whether there are any messages in checkpointedMessages, but, as
> shown below, the list is empty.
>
> <Screen Shot 2017-08-28 at 10.30.10 AM.png>
>
> Do you think there might be an error in the way I am trying to retrieve
> the messages?
>
>   def snapshotState(context: FunctionSnapshotContext): Unit = {
>     checkpointedMessages.clear()
>     bufferredMessages.foreach(checkpointedMessages.add)
>
>     pendingFiles synchronized {
>       if (pendingFiles.nonEmpty) {
>         // we have a list of pending files:
>         // we move all of them to S3 (that's the sink in our case)
>         // and afterwards we delete these files
>       }
>       pendingFiles.clear()
>     }
>   }
>
>   def initializeState(context: FunctionInitializationContext): Unit = {
>     // Check if files already exist in /tmp; this might be the case if the
>     // program crashed before these files were uploaded to S3.
>     // We need to recover (upload these files to S3 and clear the directory).
>     handlePreviousPendingFiles()
>
>     checkpointedMessages = context.getOperatorStateStore.getListState(
>       new ListStateDescriptor[Message](STATE_UID,
>         TypeInformation.of(new TypeHint[Message]() {})))
>     import scala.collection.JavaConversions._
>     for (message <- checkpointedMessages.get) {
>       bufferredMessages.add(message)
>     }
>   }
>
> Is my understanding of the above code correct: on checkpointing, the
> messages in checkpointedMessages are included in the snapshot, and when
> initializeState is called, it recovers these messages from the last
> checkpoint?
> Do you think the error is in the way I am trying to get the ListBuffer
> elements from the last checkpoint?
>
>     checkpointedMessages = context.getOperatorStateStore.getListState(
>       new ListStateDescriptor[Message](STATE_UID,
>         TypeInformation.of(new TypeHint[Message]() {})))
>
> Please let me know!
>
> Thanks,
> Vipul
>
> On Mon, Aug 28, 2017 at 2:52 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
>
> How are you testing the recovery behaviour? Are you taking a savepoint, then
> shutting down, and then restarting the job from the savepoint?
>
> Best,
> Aljoscha
>
>> On 28. Aug 2017, at 00:28, vipul singh <neoea...@gmail.com> wrote:
>>
>> Hi all,
>>
>> I am working on a Flink archiver application. In short, this application
>> reads schematized messages from Kafka and archives them to S3. Because of
>> how the files need to be named, I had to write a custom sink. So far, the
>> application is generally able to archive files to S3.
>> I am having some issues during the recovery phase. A sample of the code can
>> be found at this link
>> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe>. My
>> issue is that on recovery, when initializeState is called, it is not able to
>> get (recover) the last checkpointed ListState (i.e. checkpointedMessages
>> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L47>
>> is empty). I think this might be because of the way I am retrieving the
>> checkpointed messages. Could someone please point out what is wrong, or
>> direct me to some examples that do a similar thing? (Please note that the
>> Message
>> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L2>
>> class is our own implementation.)
>>
>> Thanks,
>> Vipul
>
> --
> Thanks,
> Vipul
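For comparison, the snapshot/restore logic in the quoted code can be modelled without Flink. This is a minimal sketch under stated assumptions: `FakeListState` is a hypothetical stand-in for Flink's `ListState`, and the `restored` flag stands in for `FunctionInitializationContext.isRestored`; a real sink would obtain its list state from `context.getOperatorStateStore.getListState(...)` as the quoted code does. The model shows that `initializeState` can only repopulate the buffer when the job is actually started from a checkpoint or savepoint. On a fresh start the list state is simply empty, however it is read.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Flink's ListState: an in-memory list of elements.
final class FakeListState[T] {
  private val elems = mutable.ArrayBuffer.empty[T]
  def clear(): Unit = elems.clear()
  def add(value: T): Unit = elems += value
  def get: Seq[T] = elems.toList
}

// Hypothetical sink mirroring snapshotState/initializeState from the quoted code.
final class BufferingSinkModel[T] {
  val buffered: mutable.ListBuffer[T] = mutable.ListBuffer.empty[T]
  private var checkpointed: FakeListState[T] = new FakeListState[T]

  // Incoming records are buffered in memory until they are flushed.
  def invoke(value: T): Unit = buffered += value

  // On checkpoint: replace the list state's contents with the current buffer.
  def snapshotState(): Unit = {
    checkpointed.clear()
    buffered.foreach(checkpointed.add)
  }

  // `state` plays the role of the list state returned by the operator state store;
  // `restored` plays the role of context.isRestored.
  def initializeState(state: FakeListState[T], restored: Boolean): Unit = {
    checkpointed = state
    if (restored) checkpointed.get.foreach(buffered += _)
  }
}
```

Checking `restored` before reading follows the pattern of the `BufferingSink` example in the Flink state documentation; on a fresh start, skipping the read changes nothing because there is no previous state to read.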