Hi Yun,

First of all, sorry for the naming mistake - it was a typo.

Regarding your question "How to judge that they are related to a specific checkpoint?" - I judged by removing the files, restarting the job and seeing whether it fails.

In the code below I missed the privateState; that is why I was missing files.
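For reference, this is roughly how I collect the referenced file names now (an untested sketch; I am assuming getPrivateState() and getMetaStateHandle() on IncrementalKeyedStateHandle expose the exclusive files the same way getSharedState() exposes the shared ones, and the import locations may differ between Flink versions):

import java.io.File;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

public class CheckpointFileNames {

    /** Collects the simple file names referenced by the loaded checkpoint metadata. */
    public static Set<String> referencedFileNames(Savepoint savepoint) {
        final Set<String> names = new HashSet<>();
        for (OperatorState operatorState : savepoint.getOperatorStates()) {
            for (OperatorSubtaskState subtask : operatorState.getSubtaskStates().values()) {
                for (KeyedStateHandle keyedStateHandle : subtask.getManagedKeyedState()) {
                    if (keyedStateHandle instanceof IncrementalKeyedStateHandle) {
                        final IncrementalKeyedStateHandle handle = (IncrementalKeyedStateHandle) keyedStateHandle;
                        // shared sst files (the ones under the "shared" folder)
                        handle.getSharedState().values().forEach(h -> names.add(nameOf(h)));
                        // private/exclusive files - this is what my original code missed
                        handle.getPrivateState().values().forEach(h -> names.add(nameOf(h)));
                        // meta state of the RocksDB incremental snapshot
                        names.add(nameOf(handle.getMetaStateHandle()));
                    }
                }
            }
        }
        return names;
    }

    private static String nameOf(StreamStateHandle handle) {
        if (handle instanceof FileStateHandle) {
            return ((FileStateHandle) handle).getFilePath().getName();
        } else if (handle instanceof ByteStreamStateHandle) {
            // inline state, not written as a separate file on the checkpoint storage
            return new File(((ByteStreamStateHandle) handle).getHandleName()).getName();
        }
        return handle.toString();
    }
}

With that, the resulting set should also cover the exclusive files that live next to _metadata in the chk-xxx folder rather than under "shared".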
What about the recovery folders - how can I know when and which files I can remove? I see the folder contains recovery-<job-name> and a list of completedCheckpoint files, e.g. completedCheckpoint3bc1020bb0f9.

Best,
Shachar

On 2020/04/20 12:14:35, Yun Tang <myas...@live.com> wrote:
> Hi Shachar
>
> You can refer to [1] to know the directory structure. The files (usually ByteStreamStateHandle) which are not in the shared folder are exclusive state like operator state or exclusive files uploaded during each incremental checkpoint. And actually I don't understand why you would say some files are not mentioned in the metadata file but are related to the checkpoint? How to judge that they are related to specific checkpoint?
>
> BTW, my name is "Yun" which means cloud in Chinese, not the delicious "Yum" 🙂
>
> Best
> Yun Tang
> ________________________________
> From: Shachar Carmeli <carmeli....@gmail.com>
> Sent: Monday, April 20, 2020 15:36
> To: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: Flink incremental checkpointing - how long does data is kept in the share folder
>
> Hi Yum
> I noticed that some files are related to the checkpoint but are not mentioned in the metadata file,
> and some of the files that are referenced in the metadata file (usually ByteStreamStateHandle) are not in the shared folder.
> Can you explain this behaviour?
>
> See the code I was using:
>
> final Savepoint savepoint = Checkpoints.loadCheckpointMetadata(in, CheckpointTool.class.getClassLoader());
>
> final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
>         .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream()
>                 .flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream()
>                         .flatMap(keyedStateHandle -> ((IncrementalKeyedStateHandle) keyedStateHandle).getSharedState().values().stream()
>                                 .map(streamStateHandle -> {
>                                     totalSize[0] += streamStateHandle.getStateSize();
>                                     String name = null;
>                                     if (streamStateHandle instanceof FileStateHandle) {
>                                         name = ((FileStateHandle) streamStateHandle).getFilePath().getName();
>                                     } else {
>                                         final String handleName = ((ByteStreamStateHandle) streamStateHandle).getHandleName();
>                                         name = new File(handleName).getName();
>                                     }
>                                     return name;
>                                 }))))
>         .collect(Collectors.toSet());
>
> Thanks in advance
> Shachar
>
> On 2020/04/13 14:30:40, Yun Tang <myas...@live.com> wrote:
> > Hi Shachar
> >
> > I think you could refer to [1] to know the directory structure of checkpoints. The '_metadata' file contains all information of which checkpointed data file belongs, e.g. file paths under 'shared' folder. As I said before, you need to call Checkpoints#loadCheckpointMetadata to load '_metadata' to know which files belonging to that checkpoint.
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Shachar Carmeli <carmeli....@gmail.com>
> > Sent: Sunday, April 12, 2020 15:32
> > To: user@flink.apache.org <user@flink.apache.org>
> > Subject: Re: Flink incremental checkpointing - how long does data is kept in the share folder
> >
> > Thank you for the quick response
> > Your answer related to the checkpoint folder that contains the _metadata file, e.g.
> > chk-1829
> > What about the "shared" folder - how do I know which files in that folder are still relevant and which are left over from a failed checkpoint? They are not directly related to the _metadata checkpoint, or am I missing something?
> >
> > On 2020/04/07 18:37:57, Yun Tang <myas...@live.com> wrote:
> > > Hi Shachar
> > >
> > > Why do we see data that is older than the lateness configuration
> > > There might be three reasons:
> > >
> > > 1. RocksDB really still needs that file in the current checkpoint. If we upload one file named 42.sst at 2/4 in some old checkpoint, the current checkpoint could still include that 42.sst file again if that file has never been compacted since then. This is possible in theory.
> > > 2. Your checkpoint size is large and the checkpoint coordinator could not remove files as fast as possible before exit.
> > > 3. That file was created by a crashed task manager and is not known to the checkpoint coordinator.
> > >
> > > How do I know that the files belong to a valid checkpoint and not a checkpoint of a crashed job - so we can delete those files
> > > You have to call Checkpoints#loadCheckpointMetadata[1] to load the latest _metadata in the checkpoint directory and compare the file paths with the current files in the checkpoint directory. The ones that are not in the checkpoint meta and are older than the latest checkpoint could be removed. You could follow this to debug, or maybe I could write a tool later to help know which files could be deleted.
> > >
> > > [1] https://github.com/apache/flink/blob/693cb6adc42d75d1db720b45013430a4c6817d4a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L96
> > >
> > > Best
> > > Yun Tang
> > >
> > > ________________________________
> > > From: Shachar Carmeli <carmeli....@gmail.com>
> > > Sent: Tuesday, April 7, 2020 16:19
> > > To: user@flink.apache.org <user@flink.apache.org>
> > > Subject: Flink incremental checkpointing - how long does data is kept in the share folder
> > >
> > > We are using Flink 1.6.3, keeping the checkpoint in CEPH, retaining only one checkpoint at a time, using incremental checkpoints with RocksDB.
> > >
> > > We run windows with a lateness of 3 days, which means that we expect no data in the checkpoint shared folder to be kept after 3-4 days. Still we see that there is data older than that, e.g. if today is 7/4 there are some files from 2/4.
> > >
> > > Sometimes we see checkpoints that we assume (because their index number is not in sync) belong to a job that crashed, where the checkpoint was not used to restore the job.
> > >
> > > My questions are:
> > >
> > > Why do we see data that is older than the lateness configuration?
> > > How do I know that the files belong to a valid checkpoint and not a checkpoint of a crashed job - so we can delete those files?