Hi Aljoscha,

Yes. I run the application until a few checkpoints have completed, and then I stop it between two checkpoints, so there will be messages in the list state that should have been checkpointed when *snapshotState* was called. I can see a checkpoint file on S3 (I am saving the checkpoints to S3 using the RocksDB state backend). On restarting the application, I added a breakpoint here <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L49> to see whether there are any messages in checkpointedMessages, but, as shown below, the list is empty.
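A hedged note on the stop/restart test described above: a job that is stopped and then submitted again starts with empty operator state unless it is explicitly restored from a savepoint or a retained (externalized) checkpoint, for example with `bin/flink run -s <path-to-savepoint-or-checkpoint> ...`. The sketch below assumes the Flink 1.3-era Scala API and only shows how a completed checkpoint can be kept after cancellation; the object name `CheckpointSetup` and the 60-second interval are hypothetical. In that release line the directory for the retained checkpoint's metadata is, as far as I know, set through `state.checkpoints.dir` in flink-conf.yaml.

```scala
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper, not part of the original application.
object CheckpointSetup {
  def configure(env: StreamExecutionEnvironment): StreamExecutionEnvironment = {
    // Illustrative interval (60 s); the thread does not state the real value.
    env.enableCheckpointing(60000L)
    // Keep the last completed checkpoint when the job is cancelled, so its
    // path can later be passed to `bin/flink run -s <path>`.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env
  }
}
```

With this in place, the restarted job would still have to be submitted with `-s` pointing at the retained checkpoint; a plain resubmission starts from empty state.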
Do you think there might be an error in the way I am trying to retrieve the messages?

    def snapshotState(context: FunctionSnapshotContext): Unit = {
      checkpointedMessages.clear()
      bufferredMessages.foreach(checkpointedMessages.add)
      pendingFiles.synchronized {
        if (pendingFiles.nonEmpty) {
          // we have a list of pending files:
          // we move all the files to S3 (that's the sink in our case)
          // and afterwards we delete these files
        }
        pendingFiles.clear()
      }
    }

    def initializeState(context: FunctionInitializationContext): Unit = {
      // Check if files already exist in /tmp: this might be the case if the
      // program crashed before these files were uploaded to S3.
      // We need to recover (upload these files to S3) and clear the directory.
      handlePreviousPendingFiles()
      checkpointedMessages = context.getOperatorStateStore.getListState(
        new ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new TypeHint[Message]() {})))
      import scala.collection.JavaConverters._
      for (message <- checkpointedMessages.get.asScala) {
        bufferredMessages.add(message)
      }
    }

From my understanding of the above code, upon checkpointing, the messages contained in checkpointedMessages are included in the snapshot, and when *initializeState* is called, it will try to recover these messages from the last checkpoint. Is that right? Do you think the error is in the way I am trying to get the elements of the checkpointed list state from the last checkpoint?

    checkpointedMessages = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new TypeHint[Message]() {})))

Please let me know!

Thanks,
Vipul

On Mon, Aug 28, 2017 at 2:52 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> How are you testing the recovery behaviour? Are you taking a savepoint,
> then shutting down, and then restarting the job from the savepoint?
>
> Best,
> Aljoscha
>
> On 28. Aug 2017, at 00:28, vipul singh <neoea...@gmail.com> wrote:
>
> Hi all,
>
> I am working on a Flink archiver application.
> In short, this application tries to read a bunch of schematized messages
> from Kafka and archives them to S3. Due to the way the files have to be
> named, I had to go with a custom sink implementation. As of now, the
> application is in general able to archive files to S3 fine.
> I am having some issues during the recovery phase. A sample of the code
> can be found at this link
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe>. My
> issue is that on recovery, when initializeState is called, it is not able
> to get (recover) the last checkpointed ListState (i.e. checkpointedMessages
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L47>
> has size 0). I think this might be because of the way I am retrieving the
> checkpointed messages. Could someone please point out what is wrong, or
> direct me to some examples which do a similar thing? (Please note that the
> Message
> <https://gist.github.com/neoeahit/4c80af355d5166eab07fbae1079060fe#file-gistfile1-txt-L2>
> class is our own implementation.)
>
> Thanks,
> Vipul

--
Thanks,
Vipul
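For reference, a minimal sketch of the buffer, snapshot, and restore pattern from the code above, assuming the Flink 1.3-era `CheckpointedFunction` API. `Message` here is a hypothetical two-field stand-in for the thread's own class, the names `BufferingArchiveSink` and `"buffered-messages"` are hypothetical, and the S3 upload logic is omitted. The one addition to the thread's code is the `context.isRestored` check, which separates a fresh start (where an empty list is expected) from an actual restore.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the thread's own Message class.
case class Message(key: String, payload: String)

// Hypothetical buffering sink; uploading to S3 is left out.
class BufferingArchiveSink extends RichSinkFunction[Message] with CheckpointedFunction {

  // Hypothetical state name; it must be identical in the run that took the
  // checkpoint and in the restored run.
  private val StateUid = "buffered-messages"

  private val bufferedMessages = ListBuffer.empty[Message]
  @transient private var checkpointedMessages: ListState[Message] = _

  override def invoke(value: Message): Unit = {
    bufferedMessages += value
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Replace the previous state contents with the current in-memory buffer.
    checkpointedMessages.clear()
    bufferedMessages.foreach(checkpointedMessages.add)
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Message](
      StateUid, TypeInformation.of(new TypeHint[Message]() {}))
    checkpointedMessages = context.getOperatorStateStore.getListState(descriptor)

    // isRestored is false for a fresh submission (no savepoint or retained
    // checkpoint), in which case the list is legitimately empty.
    if (context.isRestored) {
      checkpointedMessages.get().asScala.foreach(bufferedMessages += _)
    }
  }

  // Small accessor for inspecting the in-memory buffer.
  def bufferedCount: Int = bufferedMessages.size
}
```

Logging `context.isRestored` at the breakpoint location would show directly whether the restarted job received any state at all, before looking for errors in how the list is read.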