[ https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826787#comment-16826787 ]
Stefan Richter commented on FLINK-12296:
----------------------------------------

[~klion26] I was just taking a look into the code and in my opinion the paths should already be good as they are: each operator already goes into a separate directory inside the {{chk_x}} directory. However, the problem is with {{RocksIncrementalSnapshotStrategy#prepareLocalSnapshotDirectory}}, which a bit too ambitiously does things like {{if (directory.exists()) FileUtils.deleteDirectory(directory)}}. What about not deleting the existing directory here, and instead applying this check only to the operator's own sub-directory, but not the chk-root? If I am not missing anything here, that should be a more appropriate fix. What do you think?

> Silent data loss in RocksDBStateBackend when more than one operator (with
> state) is chained in a single task
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12296
>                 URL: https://issues.apache.org/jira/browse/FLINK-12296
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>            Reporter: Congxian Qiu (klion26)
>            Assignee: Congxian Qiu (klion26)
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.7.3, 1.9.0, 1.8.1
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> As the mailing list thread [1] describes, when more than one operator is
> chained in a single task and all of the operators have state, we encounter
> silent data loss.
> Currently, the local directory we use looks like
> ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state).
> If more than one operator is chained in a single task and all of the
> operators have state, then all of the operators share the same local
> directory (because the vertex_id is the same), which leads to data loss.
>
> The path generation logic is:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>     return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId));
> }
>
> @VisibleForTesting
> String subtaskDirString() {
>     return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString();
> }
>
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>     return "chk_" + checkpointId;
> }
> {code}
> [1] http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
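For concreteness, below is a minimal, self-contained sketch of the change suggested in the comment above. The class and method names ({{LocalSnapshotDirs}}, {{prepareBroken}}, {{prepareFixed}}) are hypothetical illustrations, not actual Flink APIs, and the real {{RocksIncrementalSnapshotStrategy#prepareLocalSnapshotDirectory}} has a different signature; the sketch only shows the idea of clearing the operator's own sub-directory while leaving the shared chk-root untouched.

{code:java}
// Hypothetical sketch (not actual Flink code) of the suggested fix:
// prepare one operator's local snapshot directory without wiping the
// shared chk-root that all chained operators write into.
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public final class LocalSnapshotDirs {

    // Before: deleting the whole chk-root if it exists discards state
    // that other chained operators have already written there.
    static void prepareBroken(File chkRoot) throws IOException {
        if (chkRoot.exists()) {
            deleteRecursively(chkRoot.toPath()); // wipes sibling operators' state
        }
        if (!chkRoot.mkdirs() && !chkRoot.isDirectory()) {
            throw new IOException("Could not create " + chkRoot);
        }
    }

    // After (the suggestion): check and clear only the operator's own
    // sub-directory; the chk-root itself is never deleted.
    static File prepareFixed(File chkRoot, String operatorSubDir) throws IOException {
        File operatorDir = new File(chkRoot, operatorSubDir);
        if (operatorDir.exists()) {
            deleteRecursively(operatorDir.toPath()); // only this operator's leftovers
        }
        if (!operatorDir.mkdirs() && !operatorDir.isDirectory()) {
            throw new IOException("Could not create " + operatorDir);
        }
        return operatorDir;
    }

    private static void deleteRecursively(Path path) throws IOException {
        try (Stream<Path> walk = Files.walk(path)) {
            // Delete children before parents.
            walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        File chkRoot = Files.createTempDirectory("chk_1").toFile();

        // Two stateful operators chained in one task share the same chk-root.
        File op1 = prepareFixed(chkRoot, "op_1");
        Files.writeString(new File(op1, "state").toPath(), "op1-state");

        // Preparing the second operator must not destroy op1's state.
        prepareFixed(chkRoot, "op_2");
        System.out.println("op1 state survives: " + new File(op1, "state").exists()); // true
    }
}
{code}

Running {{main}} prints {{op1 state survives: true}}: preparing the second operator's directory no longer removes the first operator's files, which is exactly the chained-operator scenario from the issue description.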