[ 
https://issues.apache.org/jira/browse/FLINK-36421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17888115#comment-17888115
 ] 

Zakelly Lan edited comment on FLINK-36421 at 10/10/24 2:26 AM:
---------------------------------------------------------------

Merged dfb9bfeabac8d3ac289e46a3017ed68c50ba3777 into master


was (Author: zakelly):
Merged [fd427ff|https://github.com/apache/flink/pull/25468/commits/fd427ffdeaa9b9fc6c31dee3dc09ca982ad7b5ba] into master

> Missing fsync in FsCheckpointStreamFactory
> ------------------------------------------
>
>                 Key: FLINK-36421
>                 URL: https://issues.apache.org/jira/browse/FLINK-36421
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems, Runtime / Checkpointing
>    Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
>            Reporter: Marc Aurel Fritz
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: fsync-fs-stream-factory.diff
>
>
> With Flink 1.20 we observed another checkpoint corruption bug. This is 
> similar to FLINK-35217, but affects only files written by the taskmanager 
> (the ones with random names as described 
> [here|https://github.com/Planet-X/flink/blob/0d6e25a9738d9d4ee94de94e1437f92611b50758/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java#L47]).
> After a system crash the files written by the taskmanager may be corrupted 
> (file size of 0 bytes) if the changes in the file-system cache haven't been 
> written to disk. The "_metadata" file written by the jobmanager is always 
> fine because it's properly fsynced.
> Investigation revealed that "fsync" is missing, this time in 
> "FsCheckpointStreamFactory". In this case the "OutputStream" is closed 
> without calling "fsync", thus the file is not durably persisted on disk 
> before the checkpoint is completed. (As previously established in 
> FLINK-35217, calling "fsync" is necessary as simply closing the stream does 
> not have any guarantees on persistence.)
> "strace" on the taskmanager's process confirms this behavior:
>  # The checkpoint chk-1217's directory is created at "mkdir"
>  # The checkpoint chk-1217's non-inline state is written by the taskmanager 
> at "openat", filename is "0507881e-8877-40b0-82d6-3d7dead64ccc". Note that 
> there's no "fsync" before "close".
>  # The checkpoint chk-1217 is finished, its "_metadata" is written and synced 
> properly
>  # The old checkpoint chk-1216 is deleted at "unlink"
> The new checkpoint chk-1217 now references an unsynced file that can be 
> corrupted by, e.g., a power loss. This means no working checkpoint is left, 
> as the old checkpoint was already deleted.
> For durable persistence an "fsync" call is missing before "close" in step 2.
> Full "strace" log:
> {code:java}
> [pid 947250] 08:22:58 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:58 
> mkdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> 0777) = 0
> [pid 1303248] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
>  0x7f56f08d5610) = -1 ENOENT (No such file or directory)
> [pid 1303248] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 1303248] 08:22:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
>  O_WRONLY|O_CREAT|O_TRUNC, 0666) = 199
> [pid 1303248] 08:22:59 fstat(199, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 1303248] 08:22:59 close(199)       = 0
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
>  0x7f683fb378b0) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
>  0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
>  0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 148
> [pid 947310] 08:22:59 fsync(148)        = 0
> [pid 947310] 08:22:59 close(148)        = 0
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
>  {st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
> [pid 947310] 08:22:59 
> rename("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
>  "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata") 
> = 0
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe",
>   <unfinished ...>
> [pid 947250] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata",
>   <unfinished ...>
> [pid 947310] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=54409, 
> ...}) = 0
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=46265, 
> ...}) = 0
> [pid 947310] 08:22:59 
> unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe"
>  <unfinished ...>
> [pid 947250] 08:22:59 
> unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata"
>  <unfinished ...>
> [pid 947310] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 148
> [pid 947250] 08:22:59 newfstatat(148, "", {st_mode=S_IFDIR|0755, 
> st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148)        = 0
> [pid 947250] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",  
> <unfinished ...>
> [pid 947201] 08:22:59 <... stat resumed>0x7f56f2069a20) = -1 ENOENT (No such 
> file or directory)
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFDIR|0755, st_size=4096, 
> ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY <unfinished ...>
> [pid 947250] 08:22:59 <... openat resumed>) = 148
> [pid 947250] 08:22:59 newfstatat(148, "",  <unfinished ...>
> [pid 947250] 08:22:59 <... newfstatat resumed>{st_mode=S_IFDIR|0755, 
> st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148)        = 0
> [pid 947250] 08:22:59 
> unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 
> -1 EISDIR (Is a directory)
> [pid 947250] 08:22:59 
> rmdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 0
> {code}
> Calling "sync()" when closing the stream in 
> "FsCheckpointStreamFactory::closeAndGetHandle" fixes the problem by syncing 
> the serialized state files before returning their reference to the 
> jobmanager. The following commit fixes this: 
> [https://github.com/Planet-X/flink/commit/0d6e25a9738d9d4ee94de94e1437f92611b50758]
> Diff is also attached.
> "strace" confirms that "fsync" is now called before the taskmanager's state 
> file is closed, see line 9 of the unwrapped log ("fsync(268)", directly 
> before "close(268)"):
> {code:java}
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc", 
> {st_mode=S_IFDIR|0755, st_size=44, ...}) = 0
> [pid 108807] 13:14:59 
> mkdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0777) 
> = 0
> [pid 110456] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
>  0x7f7d56efbe90) = -1 ENOENT (No such file or directory)
> [pid 110456] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
>  O_WRONLY|O_CREAT|O_TRUNC, 0666) = 268
> [pid 110456] 13:14:59 fstat(268, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 fsync(268)        = 0
> [pid 110456] 13:14:59 close(268)        = 0
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
>  0x7f2c167fc710) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
>   <unfinished ...>
> [pid 108807] 13:14:59 <... stat resumed>0x7f2c167fc670) = -1 ENOENT (No such 
> file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
>  0x7f2c167fc670) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 108807] 13:14:59 fsync(168)        = 0
> [pid 108807] 13:14:59 close(168)        = 0
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
>  {st_mode=S_IFREG|0644, st_size=21416, ...}) = 0
> [pid 108807] 13:14:59 
> rename("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
>  "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata") 
> = 0
> [pid 108823] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata",
>  {st_mode=S_IFREG|0644, st_size=36684, ...}) = 0
> [pid 108823] 13:14:59 
> unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata")
>  = 0
> [pid 108823] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168)        = 0
> [pid 108823] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168)        = 0
> [pid 108823] 13:14:59 
> unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = -1 
> EISDIR (Is a directory)
> [pid 108823] 13:14:59 
> rmdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = 0
> {code}
> Presumably only checkpoints with larger state sizes are affected, as small 
> state is inlined into the "_metadata" file, which has been properly persisted 
> since Flink 1.19.1 due to FLINK-35217.
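
For illustration only, the "fsync before close" pattern described in the issue can be sketched in plain Java. This is not the code from the merged commit or from the attached diff. The class name "DurableWriteSketch" and the method "writeDurably" are hypothetical, and the sketch uses "java.io.FileOutputStream" directly rather than Flink's stream classes. It assumes a local file system where "FileDescriptor.sync()" results in an "fsync" call, as it does on Linux.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class DurableWriteSketch {

    /**
     * Writes the given bytes to the target file and asks the OS to persist
     * them to the storage device before the stream is closed.
     * (Hypothetical helper, not part of Flink.)
     */
    static void writeDurably(Path target, byte[] data) throws IOException {
        try (FileOutputStream out = new FileOutputStream(target.toFile())) {
            out.write(data);
            // Push any buffered bytes down to the file descriptor.
            out.flush();
            // Force the file-system cache to disk; closing the stream alone
            // gives no durability guarantee.
            out.getFD().sync();
        } // close() happens here, after the sync
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("state-", ".bin");
        byte[] payload = "serialized state".getBytes(StandardCharsets.UTF_8);
        writeDurably(file, payload);
        System.out.println("bytes on disk: " + Files.size(file));
        Files.delete(file);
    }
}
```

Running such a program under "strace" should show an "fsync" on the file descriptor before the corresponding "close", matching the order seen in the second log of the issue.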


