[ https://issues.apache.org/jira/browse/FLINK-36421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17888114#comment-17888114 ]
Zakelly Lan commented on FLINK-36421:
-------------------------------------

[~planet9] It is the Java part of Flink that uploads all the SST files, since the RocksDB library cannot write to remote storage. On recovery, Flink downloads all the files to local disk and rebuilds the RocksDB instance. So RocksDB itself always works on real local files.

We are working on a new key-value store based on RocksDB called ForSt (https://issues.apache.org/jira/browse/FLINK-34975), which can read and write remote storage. So in the future the store itself can do checkpointing and recovery.

I think it would be better if you backport this to release-1.18/19/20.


> Missing fsync in FsCheckpointStreamFactory
> ------------------------------------------
>
>                 Key: FLINK-36421
>                 URL: https://issues.apache.org/jira/browse/FLINK-36421
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems, Runtime / Checkpointing
>    Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
>            Reporter: Marc Aurel Fritz
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: fsync-fs-stream-factory.diff
>
>
> With Flink 1.20 we observed another checkpoint corruption bug. This is
> similar to FLINK-35217, but affects only files written by the taskmanager
> (the ones with random names as described
> [here|https://github.com/Planet-X/flink/blob/0d6e25a9738d9d4ee94de94e1437f92611b50758/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java#L47]).
> After system crash the files written by the taskmanager may be corrupted
> (file size of 0 bytes) if the changes in the file-system cache haven't been
> written to disk. The "_metadata" file written by the jobmanager is always
> fine because it's properly fsynced.
> Investigation revealed that "fsync" is missing, this time in
> "FsCheckpointStreamFactory". In this case the "OutputStream" is closed
> without calling "fsync", thus the file is not durably persisted on disk
> before the checkpoint is completed.
> (As previously established in
> FLINK-35217, calling "fsync" is necessary as simply closing the stream does
> not have any guarantees on persistence.)
> "strace" on the taskmanager's process confirms this behavior:
> # The checkpoint chk-1217's directory is created at "mkdir"
> # The checkpoint chk-1217's non-inline state is written by the taskmanager
> at "openat", filename is "0507881e-8877-40b0-82d6-3d7dead64ccc". Note that
> there's no "fsync" before "close".
> # The checkpoint chk-1217 is finished, its "_metadata" is written and synced
> properly
> # The old checkpoint chk-1216 is deleted at "unlink"
> The new checkpoint chk-1217 now references a not-synced file that can get
> corrupted on e.g. power loss. This means there is no working checkpoint left
> as the old checkpoint was deleted.
> For durable persistence an "fsync" call is missing before "close" in step 2.
> Full "strace" log:
> {code:java}
> [pid 947250] 08:22:58 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:58 mkdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0777) = 0
> [pid 1303248] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc", 0x7f56f08d5610) = -1 ENOENT (No such file or directory)
> [pid 1303248] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 1303248] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc", O_WRONLY|O_CREAT|O_TRUNC, 0666) = 199
> [pid 1303248] 08:22:59 fstat(199, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 1303248] 08:22:59 close(199) = 0
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata", 0x7f683fb378b0) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata", 0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", 0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", O_WRONLY|O_CREAT|O_EXCL, 0666) = 148
> [pid 947310] 08:22:59 fsync(148) = 0
> [pid 947310] 08:22:59 close(148) = 0
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", {st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
> [pid 947310] 08:22:59 rename("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata") = 0
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe", <unfinished ...>
> [pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata", <unfinished ...>
> [pid 947310] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=54409, ...}) = 0
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
> [pid 947310] 08:22:59 unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe" <unfinished ...>
> [pid 947250] 08:22:59 unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata" <unfinished ...>
> [pid 947310] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 148
> [pid 947250] 08:22:59 newfstatat(148, "", {st_mode=S_IFDIR|0755, st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148) = 0
> [pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", <unfinished ...>
> [pid 947201] 08:22:59 <... stat resumed>0x7f56f2069a20) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY <unfinished ...>
> [pid 947250] 08:22:59 <... openat resumed>) = 148
> [pid 947250] 08:22:59 newfstatat(148, "", <unfinished ...>
> [pid 947250] 08:22:59 <... newfstatat resumed>{st_mode=S_IFDIR|0755, st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148) = 0
> [pid 947250] 08:22:59 unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = -1 EISDIR (Is a directory)
> [pid 947250] 08:22:59 rmdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 0
> {code}
> Calling "sync()" when closing the stream in
> "FsCheckpointStreamFactory::closeAndGetHandle" fixes the problem by syncing
> the serialized state files before returning their reference to the
> jobmanager. The following commit fixes this:
> [https://github.com/Planet-X/flink/commit/0d6e25a9738d9d4ee94de94e1437f92611b50758]
> Diff is also attached.
> "strace" confirms that "fsync" is now called before the taskmanager's state
> file is closed, see line 9:
> {code:java}
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc", {st_mode=S_IFDIR|0755, st_size=44, ...}) = 0
> [pid 108807] 13:14:59 mkdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0777) = 0
> [pid 110456] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222", 0x7f7d56efbe90) = -1 ENOENT (No such file or directory)
> [pid 110456] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222", O_WRONLY|O_CREAT|O_TRUNC, 0666) = 268
> [pid 110456] 13:14:59 fstat(268, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 fsync(268) = 0
> [pid 110456] 13:14:59 close(268) = 0
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata", 0x7f2c167fc710) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata", <unfinished ...>
> [pid 108807] 13:14:59 <... stat resumed>0x7f2c167fc670) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", 0x7f2c167fc670) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 108807] 13:14:59 fsync(168) = 0
> [pid 108807] 13:14:59 close(168) = 0
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", {st_mode=S_IFREG|0644, st_size=21416, ...}) = 0
> [pid 108807] 13:14:59 rename("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata") = 0
> [pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata", {st_mode=S_IFREG|0644, st_size=36684, ...}) = 0
> [pid 108823] 13:14:59 unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata") = 0
> [pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168) = 0
> [pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168) = 0
> [pid 108823] 13:14:59 unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = -1 EISDIR (Is a directory)
> [pid 108823] 13:14:59 rmdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = 0
> {code}
> Presumably only checkpoints with larger state sizes are affected as small
> state is inlined into the "_metadata" file, which is properly persisted since
> flink 1.19.1 due to FLINK-35217.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
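
For readers following the thread, the flush, fsync, close ordering that the issue asks for can be sketched in plain Java as below. This is only an illustration, not the attached patch: it assumes a local file and uses java.io.FileOutputStream with FileDescriptor.sync() (which the JDK typically maps to fsync on Linux) rather than Flink's FSDataOutputStream. The class and method names (DurableWriteSketch, writeDurably) are hypothetical.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class DurableWriteSketch {

    // Hypothetical helper: writes data and forces it to the storage device
    // before the stream is closed, mirroring the fsync-before-close ordering.
    static void writeDurably(Path file, byte[] data) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            out.write(data);
            // Push any user-space buffered bytes to the OS.
            out.flush();
            // Ask the OS to persist file contents to the device.
            // Without this call, close() alone gives no durability guarantee.
            out.getFD().sync();
        } // close() happens here, after the sync
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("state-", ".bin");
        byte[] payload = "checkpoint state".getBytes(StandardCharsets.UTF_8);
        writeDurably(tmp, payload);
        System.out.println("wrote " + Files.size(tmp) + " bytes to " + tmp);
        Files.delete(tmp);
    }
}
```

Running this under strace should show an fsync on the file descriptor before the close, which is the ordering seen in the second log above. Note that this sketch does not sync the parent directory, so it says nothing about the durability of the directory entry itself.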