[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347136#comment-16347136 ]
ASF GitHub Bot commented on FLINK-5820:
---------------------------------------

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/5396

    [FLINK-5820] [state backends] Split shared/exclusive state and properly handle disposal

## What is the purpose of the change

This PR contains the final changes needed for [FLINK-5820].

Disposal of checkpoint directories now happens properly across all file system types (previously it did not work properly for some S3 connectors), with fewer calls to the file systems. Shared and exclusive state are split into different directories, to help implement cleanup safety nets.

## Brief change log

1. TaskManagers use the `CheckpointStorage` to create `CheckpointStreamFactories`. Previously, these stream factories were created by the `StateBackend`. This completes separating the "storage" aspect of the `StateBackend` out into the `CheckpointStorage`.

2. The location where state is stored is communicated between the `CheckpointCoordinator` (which instantiates the original `CheckpointStorageLocation` for a checkpoint/savepoint) and the Tasks in a unified manner. Tasks always obtain their `CheckpointStreamFactories` transparently in the same way, regardless of whether they write state for checkpoints or savepoints.

3. Checkpoint state now has the scope `EXCLUSIVE` or `SHARED`, and the two scopes may be stored differently. The current file system based backends put shared state into a */shared* directory, while exclusive state goes into the */chk-1234* directory.

4. Tasks can directly write *task-owned state* to a checkpoint storage. That state neither belongs specifically to one checkpoint, nor is it shared and eventually released by the Checkpoint Coordinator. Only the tasks themselves may release the state. An example of that type of state is the *write-ahead logs* created by some sinks.

5. When a checkpoint is finalized, its storage is described by a `CompletedCheckpointStorageLocation`.
That object gives access to addressing and metadata, and handles location disposal. This allows us to drop the *"delete parent if empty"* logic in File State Handles and fixes the issue that checkpoint directories are currently left over on S3.

**Future Work**

- In the future, the `CompletedCheckpointStorageLocation` should also be used as a way to handle relative addressing of checkpoints, to allow users to move them to different directories without breaking the internal paths.

- We can now implement disposal fast paths, like dropping a directory as a whole rather than dropping each state object separately. However, one would still need to release shared state objects individually. Finishing these fast paths is currently blocked on some rework of the shared state handles, to make their selective release easier and more robust.

## Verifying this change

This change can be verified by running a Flink cluster with a checkpointed program.

This PR also adds and adjusts various unit tests to guard the new behavior.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? *Somewhat* (it changes the state backend directory layouts)
- If yes, how is the feature documented?
(not applicable / docs / **JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink locations

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5396.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5396

----
commit ec8e552a7b50b8c605bb2609713cb2dd50245118
Author: Stephan Ewen <sewen@...>
Date:   2018-01-30T14:53:46Z

    [hotfix] [checkpointing] Cleanup: Fix Nullability and names in checkpoint stats.

commit cf18831b69bd909d6491eb73d3294d3295ddd930
Author: Stephan Ewen <sewen@...>
Date:   2018-01-30T10:57:30Z

    [hotfix] [tests] Drop obsolete CheckpointExternalResumeTest.

    Because all checkpoints are now externalized (write their metadata) this is an obsolete test.

commit a46acdd0f7142e40eee8c742e17eefaa6c7da3da
Author: Stephan Ewen <sewen@...>
Date:   2018-01-29T22:24:24Z

    [hotfix] [checkpoints] Clean up CompletedCheckpoint, grouping related methods together

commit f3eb9511718fe8526a6602b547ce6b4f1fd9e2fa
Author: Stephan Ewen <sewen@...>
Date:   2018-01-29T22:54:57Z

    [hotfix] [checkpoints] Drop ill-defined hashCode() and equals() from CompletedCheckpoint.

commit 8d198c7eb7c0f23f179f45b1cdc8b862027076b7
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T09:56:39Z

    [hotfix] [tests] Clean up HeapKeyedStateBackendAsyncByDefaultTest

commit 59c917eb5dc5329a233c760367ffbef2ffe98beb
Author: Stephan Ewen <sewen@...>
Date:   2018-01-10T16:16:03Z

    [FLINK-8531] [checkpoints] (part 1) Pull CheckpointType into its own class.

commit e9ed622588190008cc9bfaa03578a55bdba4c09d
Author: Stephan Ewen <sewen@...>
Date:   2018-01-19T14:18:57Z

    [FLINK-8531] [checkpoints] (part 2) Add CheckpointType to CheckpointProperties

commit f38eb542684365837474303a10d49eb50eabb378
Author: Stephan Ewen <sewen@...>
Date:   2018-01-10T17:02:27Z

    [FLINK-8531] [checkpoints] (part 3) Rework ExternalizedCheckpointITCase

commit 2551f0df0ed0b89d25f4442f101fa453f3ae697e
Author: Stephan Ewen <sewen@...>
Date:   2018-01-10T17:13:50Z

    [FLINK-8531] [checkpoints] (part 4) rename forCheckpoint() to forCheckpointWithDefaultLocation()

commit 09dab449f665e5b2eea08912b7d3ecfa32449e53
Author: Stephan Ewen <sewen@...>
Date:   2018-01-19T12:37:08Z

    [FLINK-8531] [checkpoints] (part 5) Introduce CheckpointStorageLocationReference instead of String to communicate the location

commit 828873847ca710a562f96a86c1caf058347a3406
Author: Stephan Ewen <sewen@...>
Date:   2018-01-10T16:14:06Z

    [FLINK-8531] [checkpoints] (part 6) Tasks resolve CheckpointStreamFactory from CheckpointStorage and Checkpoint Location Reference to persist checkpoint data.

commit 950e7fd2987cd12d14829be573c3f8c54056e965
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T10:20:59Z

    [FLINK-8531] [checkpoints] (part 7) Move tests specific to Checkpoint Storage and Checkpoint Stream to separate tests suites

commit 19d6d1603841d321e5c509c8d054d6056f8ff243
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T11:06:44Z

    [FLINK-8531] [checkpoints] (part 8) Add tests for the FsCheckpointStorage and MemoryBackendCheckpointStorage.

commit 62f635dbec3ee4d7b151e8ae0887a4cf84427752
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T17:37:42Z

    [FLINK-8531] [checkpoints] (part 9) Introduce EXCLUSIVE and SHARED scope for states

commit 3c5440d3dd2f0fe3a0ef6c2855cefef0fc028ed4
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T09:42:34Z

    [hotfix] [runtime] Fix checkstyle for 'runtime/io/network/api'.

commit 4f2770f4a9cd880218a832676deb2f33a690284c
Author: Stephan Ewen <sewen@...>
Date:   2018-01-31T15:54:58Z

    [FLINK-8539] [checkpointing] (part 1) Introduce CompletedCheckpointStorageLocation to properly handle disposal of checkpoints.

    That concept allows us to properly handle deletion of a checkpoint storage, for example deleting checkpoint
    directories, or the dropping of a checkpoint specific table.

    This replaces the current workaround for file systems, where every file disposal checks if the parent
    directory is now empty, and deletes it if that is the case. That is not only inefficient, but
    prohibitively expensive on some systems, like Amazon S3.

commit 99ca08650b1de215e0079eed04d513fecd445299
Author: Stephan Ewen <sewen@...>
Date:   2018-01-31T15:57:45Z

    [FLINK-8539] [checkpointing] (part 2) Modify all tests to use CompletedCheckpointStorageLocation.

commit cf26d3875eb6db3818c73dc1f42cc4ecaad98103
Author: Stephan Ewen <sewen@...>
Date:   2018-01-31T16:01:33Z

    [FLINK-8539] [checkpointing] (part 3) Rename FixFileFsStateOutputStream to FsCheckpointMetadataOutputStream

    The new name captures the proper use and meaning of the class in a better way.

commit 51691e0d1ff5b334b67b87e700c89b69657f28a5
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T11:46:44Z

    [FLINK-8540] [checkpointing] FileStateHandles no longer attempt to clean up their parent directory.

    Performing directory contents checks and cleaning up the parent directory in the state handle disposal
    has previously led to excessive file system metadata requests, which especially on systems like Amazon S3
    is prohibitively expensive.
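
For illustration, the disposal approach described in the FLINK-8539 and FLINK-8540 commits above (delete the exclusive directory of a checkpoint as one unit, instead of having each file disposal check whether its parent is now empty) might look roughly like the following on a local file system. This is a minimal sketch using only `java.nio.file`. It is not Flink's `CompletedCheckpointStorageLocation`; the class name `CheckpointDisposalSketch`, its methods, and the file names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical illustration; not part of Flink. */
public class CheckpointDisposalSketch {

    /** Deletes the exclusive directory of one checkpoint in a single pass. */
    static void disposeLocation(Path checkpointDir) {
        try {
            if (!Files.exists(checkpointDir)) {
                return;
            }
            List<Path> entries;
            try (Stream<Path> walk = Files.walk(checkpointDir)) {
                // Reverse order puts children before their parent directories,
                // so every directory is empty by the time it is deleted.
                entries = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path entry : entries) {
                Files.delete(entry);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds a tiny job directory, disposes chk-1, and reports what is left. */
    static boolean[] runDemo() {
        try {
            Path jobDir = Files.createTempDirectory("job-checkpoints");
            Path shared = Files.createDirectories(jobDir.resolve("shared"));
            Path chk1 = Files.createDirectories(jobDir.resolve("chk-1"));
            Files.write(shared.resolve("shared-state-file"), new byte[] {1});
            Files.write(chk1.resolve("metadata-file"), new byte[] {2});
            Files.write(chk1.resolve("exclusive-state-file"), new byte[] {3});

            // No per-file "is the parent empty now?" check is needed.
            disposeLocation(chk1);

            return new boolean[] {
                Files.exists(chk1),
                Files.exists(shared.resolve("shared-state-file"))
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = runDemo();
        System.out.println("chk-1 still exists: " + result[0]);
        System.out.println("shared state still exists: " + result[1]);
    }
}
```

In the sketch the shared directory is left untouched, which mirrors the note in the PR description that shared state objects still have to be released individually.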

----

> Extend State Backend Abstraction to support Global Cleanup Hooks
> ----------------------------------------------------------------
>
>                 Key: FLINK-5820
>                 URL: https://issues.apache.org/jira/browse/FLINK-5820
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of
> state is only meaningful in the context of its state handle. There is no
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues:
>   - State might not be cleaned up in the process of failures. When a
> TaskManager hands over a state handle to the JobManager and either of them
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and
> the checkpoint metadata was already disposed.
>   - State cleanup is more expensive than necessary in many cases. Each state
> handle is individually released. For large jobs, this means 1000s of release
> operations (typically file deletes) per checkpoint, which can be expensive on
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes the responsibility of the state
> backend, not the "completed checkpoint store". The latter only stores the
> pointers to the available latest checkpoints (either in process or in
> ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed
> state that belongs to only one specific checkpoint (shared state comes as
> part of incremental checkpointing).
>   5. The StateBackend needs to have a hook to drop all checkpointed state up
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state
> currently (transitively for RocksDB as well), this means a re-structuring of
> the storage directories as follows:
> {code}
> ../<flink-checkpoints>/job1-id/
>                               /shared/    <-- shared checkpoint data
>                               /chk-1/...  <-- data exclusive to checkpoint 1
>                               /chk-2/...  <-- data exclusive to checkpoint 2
>                               /chk-3/...  <-- data exclusive to checkpoint 3
> ../<flink-checkpoints>/job2-id/
>                               /shared/...
>                               /chk-1/...
>                               /chk-2/...
>                               /chk-3/...
> ../<flink-savepoints>/savepoint-1/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
>                      /savepoint-2/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
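
As a rough illustration of the checkpoint directory layout proposed in the issue description, the mapping from a state scope to a directory could be sketched as below. This is an assumption-laden sketch and not Flink code: the class `CheckpointLayoutSketch`, the `Scope` enum, and `directoryFor` are hypothetical names, and only the `shared` and `chk-<id>` directory names are taken from the layout in the issue.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical illustration; not part of Flink. */
public class CheckpointLayoutSketch {

    /** Mirrors the two state scopes named in the pull request. */
    enum Scope { EXCLUSIVE, SHARED }

    /**
     * Resolves the directory for a piece of state:
     * shared state goes to <root>/<job-id>/shared,
     * exclusive state goes to <root>/<job-id>/chk-<checkpoint-id>.
     */
    static Path directoryFor(Path checkpointsRoot, String jobId, long checkpointId, Scope scope) {
        Path jobDir = checkpointsRoot.resolve(jobId);
        return scope == Scope.SHARED
                ? jobDir.resolve("shared")
                : jobDir.resolve("chk-" + checkpointId);
    }

    public static void main(String[] args) {
        Path root = Paths.get("flink-checkpoints");
        System.out.println(directoryFor(root, "job1-id", 2L, Scope.SHARED));
        System.out.println(directoryFor(root, "job1-id", 2L, Scope.EXCLUSIVE));
    }
}
```

Keeping everything that is exclusive to one checkpoint under a single `chk-<id>` directory is what makes it possible to drop that checkpoint's storage as a unit, while the `shared` directory outlives individual checkpoints.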