[ https://issues.apache.org/jira/browse/FLINK-36421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17888115#comment-17888115 ]
Zakelly Lan edited comment on FLINK-36421 at 10/10/24 2:26 AM:
---------------------------------------------------------------

Merged dfb9bfeabac8d3ac289e46a3017ed68c50ba3777 into master

was (Author: zakelly):
Merged [fd427ff|https://github.com/apache/flink/pull/25468/commits/fd427ffdeaa9b9fc6c31dee3dc09ca982ad7b5ba] into master

> Missing fsync in FsCheckpointStreamFactory
> ------------------------------------------
>
>                 Key: FLINK-36421
>                 URL: https://issues.apache.org/jira/browse/FLINK-36421
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems, Runtime / Checkpointing
>    Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
>            Reporter: Marc Aurel Fritz
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: fsync-fs-stream-factory.diff
>
>
> With Flink 1.20 we observed another checkpoint corruption bug. It is
> similar to FLINK-35217, but affects only the files written by the taskmanager
> (the ones with random names, as described
> [here|https://github.com/Planet-X/flink/blob/0d6e25a9738d9d4ee94de94e1437f92611b50758/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java#L47]).
> After a system crash, the files written by the taskmanager may be corrupted
> (file size of 0 bytes) if the changes in the file-system cache haven't been
> written to disk yet. The "_metadata" file written by the jobmanager is always
> fine because it is properly fsynced.
> Investigation revealed that "fsync" is missing, this time in
> "FsCheckpointStreamFactory". There, the "OutputStream" is closed without
> calling "fsync", so the file is not durably persisted on disk before the
> checkpoint is completed. (As previously established in FLINK-35217, calling
> "fsync" is necessary because simply closing the stream does not guarantee
> persistence.)
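To make the missing step concrete outside of Flink, here is a minimal sketch in plain java.io/java.nio. The class DurableCheckpointFiles and its methods writeStateFile and writeMetadata are hypothetical names, not Flink APIs. writeStateFile does what the report asks for in closeAndGetHandle (write, fsync, then close, and only afterwards hand out the path); writeMetadata imitates the "_metadata" sequence visible in the strace log (create an in-progress file exclusively, write, fsync, close, rename). We assume, without having checked every file system implementation, that on a local Linux file system FileDescriptor.sync() and FileChannel.force(true) both end up as an fsync(2) call.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

public class DurableCheckpointFiles {

    // Hypothetical helper (not a Flink API): writes one state file and fsyncs it
    // before close(), so the path is only returned once the bytes are on disk.
    static Path writeStateFile(Path file, byte[] serializedState) throws IOException {
        // Opens with O_WRONLY|O_CREAT|O_TRUNC, like the "openat" in step 2 of the log
        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            out.write(serializedState);
            // FileOutputStream is unbuffered, so the bytes are already in the OS cache;
            // sync() asks the OS to persist them (fsync on Linux, by our assumption).
            out.getFD().sync();
        }
        return file; // only now is it safe to report the file to the jobmanager
    }

    // Hypothetical helper mirroring the "_metadata" sequence in the log:
    // create a fresh in-progress file, write, fsync, close, then rename into place.
    static Path writeMetadata(Path checkpointDir, byte[] metadata) throws IOException {
        Path tmp = checkpointDir.resolve("._metadata.inprogress." + UUID.randomUUID());
        Path target = checkpointDir.resolve("_metadata");
        // CREATE_NEW fails if tmp already exists (compare O_EXCL in the log)
        try (FileChannel ch = FileChannel.open(tmp,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            ByteBuffer buf = ByteBuffer.wrap(metadata);
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // persist data and file metadata before the rename
        }
        // Same directory and file system, so ATOMIC_MOVE can be done as one rename
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path chk = Files.createTempDirectory("chk-");
        // 1. The state file first, synced before anything references it
        Path state = writeStateFile(chk.resolve(UUID.randomUUID().toString()),
                "example state".getBytes(StandardCharsets.UTF_8));
        // 2. Then the metadata that points to it
        Path meta = writeMetadata(chk,
                state.getFileName().toString().getBytes(StandardCharsets.UTF_8));
        System.out.println(Files.size(state));  // prints 13
        System.out.println(Files.exists(meta)); // prints true
    }
}
```

The ordering in main is the point: the state file is durable before the metadata that references it is written, which is what has to hold before an older checkpoint may be deleted.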
> "strace" on the taskmanager's process confirms this behavior:
> # The directory of checkpoint chk-1217 is created ("mkdir").
> # The non-inline state of chk-1217 is written by the taskmanager ("openat"),
> filename "0507881e-8877-40b0-82d6-3d7dead64ccc". Note that there is no
> "fsync" before "close".
> # Checkpoint chk-1217 is finished; its "_metadata" is written and synced
> properly.
> # The old checkpoint chk-1216 is deleted ("unlink").
> The new checkpoint chk-1217 now references an unsynced file that can be
> corrupted by, e.g., a power loss. Since the old checkpoint has already been
> deleted, no working checkpoint is left.
> For durable persistence, an "fsync" call is needed before "close" in step 2.
> Full "strace" log:
> {code:java}
> [pid 947250] 08:22:58 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:58 mkdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0777) = 0
> [pid 1303248] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc", 0x7f56f08d5610) = -1 ENOENT (No such file or directory)
> [pid 1303248] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 1303248] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc", O_WRONLY|O_CREAT|O_TRUNC, 0666) = 199
> [pid 1303248] 08:22:59 fstat(199, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 1303248] 08:22:59 close(199) = 0
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata", 0x7f683fb378b0) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata", 0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", 0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", O_WRONLY|O_CREAT|O_EXCL, 0666) = 148
> [pid 947310] 08:22:59 fsync(148) = 0
> [pid 947310] 08:22:59 close(148) = 0
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", {st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
> [pid 947310] 08:22:59 rename("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata") = 0
> [pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe", <unfinished ...>
> [pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata", <unfinished ...>
> [pid 947310] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=54409, ...}) = 0
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
> [pid 947310] 08:22:59 unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe" <unfinished ...>
> [pid 947250] 08:22:59 unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata" <unfinished ...>
> [pid 947310] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 148
> [pid 947250] 08:22:59 newfstatat(148, "", {st_mode=S_IFDIR|0755, st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148) = 0
> [pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", <unfinished ...>
> [pid 947201] 08:22:59 <... stat resumed>0x7f56f2069a20) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY <unfinished ...>
> [pid 947250] 08:22:59 <... openat resumed>) = 148
> [pid 947250] 08:22:59 newfstatat(148, "", <unfinished ...>
> [pid 947250] 08:22:59 <... newfstatat resumed>{st_mode=S_IFDIR|0755, st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148) = 0
> [pid 947250] 08:22:59 unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = -1 EISDIR (Is a directory)
> [pid 947250] 08:22:59 rmdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 0
> {code}
> Calling "sync()" when closing the stream in
> "FsCheckpointStreamFactory::closeAndGetHandle" fixes the problem by syncing
> the serialized state files before their references are returned to the
> jobmanager. The following commit fixes this:
> [https://github.com/Planet-X/flink/commit/0d6e25a9738d9d4ee94de94e1437f92611b50758]
> The diff is also attached.
> "strace" confirms that "fsync" is now called before the taskmanager's state
> file is closed; see line 9:
> {code:java}
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc", {st_mode=S_IFDIR|0755, st_size=44, ...}) = 0
> [pid 108807] 13:14:59 mkdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0777) = 0
> [pid 110456] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222", 0x7f7d56efbe90) = -1 ENOENT (No such file or directory)
> [pid 110456] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222", O_WRONLY|O_CREAT|O_TRUNC, 0666) = 268
> [pid 110456] 13:14:59 fstat(268, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 fsync(268) = 0
> [pid 110456] 13:14:59 close(268) = 0
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata", 0x7f2c167fc710) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata", <unfinished ...>
> [pid 108807] 13:14:59 <... stat resumed>0x7f2c167fc670) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", 0x7f2c167fc670) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 108807] 13:14:59 fsync(168) = 0
> [pid 108807] 13:14:59 close(168) = 0
> [pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", {st_mode=S_IFREG|0644, st_size=21416, ...}) = 0
> [pid 108807] 13:14:59 rename("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata") = 0
> [pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata", {st_mode=S_IFREG|0644, st_size=36684, ...}) = 0
> [pid 108823] 13:14:59 unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata") = 0
> [pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168) = 0
> [pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168) = 0
> [pid 108823] 13:14:59 unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = -1 EISDIR (Is a directory)
> [pid 108823] 13:14:59 rmdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = 0
> {code}
> Presumably only checkpoints with larger state sizes are affected, because
> small state is inlined into the "_metadata" file, which has been properly
> persisted since Flink 1.19.1 thanks to FLINK-35217.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)