[ 
https://issues.apache.org/jira/browse/FLINK-36421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17887822#comment-17887822
 ] 

Marc Aurel Fritz commented on FLINK-36421:
------------------------------------------

[~gaborgsomogyi] We've been running the patched version for about a month on 
the most affected edge device. It's turned on and off multiple times a day and 
a corrupted checkpoint happened quite reliably at least once a week pre-patch. 
About a week ago we also rolled out the patched version to all other edge 
devices in production and they seem to run perfectly stably. No corrupted 
checkpoints have been observed with the patch in place so far - I'll keep you 
updated if that changes!

[~zakelly] Interesting! I assumed rocksdb wouldn't be affected as I expected 
that the rocksdb library is used to write all the serialized state to disk as 
part of the database. Does the rocksdb state backend also reference larger 
state as external files written by the {{FsCheckpointStreamFactory}}, then 
presumably just storing their references in the rocksdb database? I'll take a 
closer look at how this works!

I've opened up a PR for this here: https://github.com/apache/flink/pull/25468

> Missing fsync in FsCheckpointStreamFactory
> ------------------------------------------
>
>                 Key: FLINK-36421
>                 URL: https://issues.apache.org/jira/browse/FLINK-36421
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems, Runtime / Checkpointing
>    Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
>            Reporter: Marc Aurel Fritz
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: fsync-fs-stream-factory.diff
>
>
> With Flink 1.20 we observed another checkpoint corruption bug. This is 
> similar to FLINK-35217, but affects only files written by the taskmanager 
> (the ones with random names as described 
> [here|https://github.com/Planet-X/flink/blob/0d6e25a9738d9d4ee94de94e1437f92611b50758/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java#L47]).
> After a system crash the files written by the taskmanager may be corrupted 
> (file size of 0 bytes) if the changes in the file-system cache haven't been 
> written to disk. The "_metadata" file written by the jobmanager is always 
> fine because it's properly fsynced.
> Investigation revealed that "fsync" is missing, this time in 
> "FsCheckpointStreamFactory". In this case the "OutputStream" is closed 
> without calling "fsync", thus the file is not durably persisted on disk 
> before the checkpoint is completed. (As previously established in 
> FLINK-35217, calling "fsync" is necessary as simply closing the stream does 
> not have any guarantees on persistence.)
> "strace" on the taskmanager's process confirms this behavior:
>  # The checkpoint chk-1217's directory is created at "mkdir"
>  # The checkpoint chk-1217's non-inline state is written by the taskmanager 
> at "openat", filename is "0507881e-8877-40b0-82d6-3d7dead64ccc". Note that 
> there's no "fsync" before "close".
>  # The checkpoint chk-1217 is finished, its "_metadata" is written and synced 
> properly
>  # The old checkpoint chk-1216 is deleted at "unlink"
> The new checkpoint chk-1217 now references a not-synced file that can get 
> corrupted on e.g. power loss. This means there is no working checkpoint left 
> as the old checkpoint was deleted.
> For durable persistence an "fsync" call is missing before "close" in step 2.
> Full "strace" log:
> {code:java}
> [pid 947250] 08:22:58 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:58 
> mkdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> 0777) = 0
> [pid 1303248] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
>  0x7f56f08d5610) = -1 ENOENT (No such file or directory)
> [pid 1303248] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 1303248] 08:22:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
>  O_WRONLY|O_CREAT|O_TRUNC, 0666) = 199
> [pid 1303248] 08:22:59 fstat(199, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 1303248] 08:22:59 close(199)       = 0
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
>  0x7f683fb378b0) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
>  0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
>  0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 148
> [pid 947310] 08:22:59 fsync(148)        = 0
> [pid 947310] 08:22:59 close(148)        = 0
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
>  {st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
> [pid 947310] 08:22:59 
> rename("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
>  "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata") 
> = 0
> [pid 947310] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe",
>   <unfinished ...>
> [pid 947250] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata",
>   <unfinished ...>
> [pid 947310] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=54409, 
> ...}) = 0
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=46265, 
> ...}) = 0
> [pid 947310] 08:22:59 
> unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe"
>  <unfinished ...>
> [pid 947250] 08:22:59 
> unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata"
>  <unfinished ...>
> [pid 947310] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 148
> [pid 947250] 08:22:59 newfstatat(148, "", {st_mode=S_IFDIR|0755, 
> st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148)        = 0
> [pid 947250] 08:22:59 
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",  
> <unfinished ...>
> [pid 947201] 08:22:59 <... stat resumed>0x7f56f2069a20) = -1 ENOENT (No such 
> file or directory)
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFDIR|0755, st_size=4096, 
> ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY <unfinished ...>
> [pid 947250] 08:22:59 <... openat resumed>) = 148
> [pid 947250] 08:22:59 newfstatat(148, "",  <unfinished ...>
> [pid 947250] 08:22:59 <... newfstatat resumed>{st_mode=S_IFDIR|0755, 
> st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148)        = 0
> [pid 947250] 08:22:59 
> unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 
> -1 EISDIR (Is a directory)
> [pid 947250] 08:22:59 
> rmdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 0
> {code}
> Calling "sync()" when closing the stream in 
> "FsCheckpointStreamFactory::closeAndGetHandle" fixes the problem by syncing 
> the serialized state files before returning their reference to the 
> jobmanager. The following commit fixes this: 
> [https://github.com/Planet-X/flink/commit/0d6e25a9738d9d4ee94de94e1437f92611b50758]
> Diff is also attached.
> "strace" confirms that "fsync" is now called before the taskmanager's state 
> file is closed, see the "fsync(268)" call directly before "close(268)":
> {code:java}
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc", 
> {st_mode=S_IFDIR|0755, st_size=44, ...}) = 0
> [pid 108807] 13:14:59 
> mkdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0777) 
> = 0
> [pid 110456] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
>  0x7f7d56efbe90) = -1 ENOENT (No such file or directory)
> [pid 110456] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
>  O_WRONLY|O_CREAT|O_TRUNC, 0666) = 268
> [pid 110456] 13:14:59 fstat(268, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 fsync(268)        = 0
> [pid 110456] 13:14:59 close(268)        = 0
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
>  0x7f2c167fc710) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
>   <unfinished ...>
> [pid 108807] 13:14:59 <... stat resumed>0x7f2c167fc670) = -1 ENOENT (No such 
> file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
>  0x7f2c167fc670) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
> {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
>  O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 108807] 13:14:59 fsync(168)        = 0
> [pid 108807] 13:14:59 close(168)        = 0
> [pid 108807] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
>  {st_mode=S_IFREG|0644, st_size=21416, ...}) = 0
> [pid 108807] 13:14:59 
> rename("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
>  "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata") 
> = 0
> [pid 108823] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata",
>  {st_mode=S_IFREG|0644, st_size=36684, ...}) = 0
> [pid 108823] 13:14:59 
> unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata")
>  = 0
> [pid 108823] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168)        = 0
> [pid 108823] 13:14:59 
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD, 
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
> ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168)        = 0
> [pid 108823] 13:14:59 
> unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = -1 
> EISDIR (Is a directory)
> [pid 108823] 13:14:59 
> rmdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = 0
> {code}
> Presumably only checkpoints with larger state sizes are affected as small 
> state is inlined into the "_metadata" file, which is properly persisted since 
> Flink 1.19.1 due to FLINK-35217.
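
For illustration only: the sync-before-close pattern described in the issue 
above can be sketched in plain Java as follows. This is not the actual patch 
(see the linked commit and PR for that). The class name "DurableWriteSketch" 
and the method "writeDurably" are hypothetical, and the sketch uses a plain 
java.io stream instead of Flink's own stream classes. It assumes that 
"FileDescriptor.sync()" ends up as an "fsync" call on the underlying file, 
which is the usual behavior on Linux.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Flink.
final class DurableWriteSketch {

    /** Writes the bytes and asks the OS to persist them before the stream is closed. */
    static void writeDurably(Path file, byte[] data) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            out.write(data);
            out.flush();
            // Without this call the data may still sit only in the file-system
            // cache when close() returns, so a power loss can leave the file
            // empty or truncated.
            out.getFD().sync();
        } // try-with-resources closes the stream here, after the sync
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("chk-sketch");
        Path stateFile = dir.resolve("state-file");
        writeDurably(stateFile, "serialized state".getBytes(StandardCharsets.UTF_8));
        System.out.println("wrote " + Files.size(stateFile) + " bytes to " + stateFile);
    }
}
```

Running this under "strace" should show an "fsync" on the file descriptor 
before the "close", analogous to the second log above. Note that the sketch 
only syncs the file contents and does not cover syncing the parent directory 
entry.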



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
