[ https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16850320#comment-16850320 ]
Congxian Qiu(klion26) edited comment on FLINK-12296 at 5/29/19 2:42 AM:
------------------------------------------------------------------------

Since the commits have been merged into master, release-1.8, release-1.7 and release-1.6 (see the previous comments for the commit details), I am resolving this issue.

> Data loss silently in RocksDBStateBackend when more than one operator(has states) chained in a single task
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-12296
> URL: https://issues.apache.org/jira/browse/FLINK-12296
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
> Reporter: Congxian Qiu(klion26)
> Assignee: Congxian Qiu(klion26)
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> As reported on the mailing list [1], when more than one operator is chained in a single task and all of those operators have state, data can be lost silently.
>
> Currently, the local directory we use looks like this:
> ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state)
>
> If more than one operator is chained in a single task and all the operators have state, then all the operators share the same local directory (because the vertex_id is the same), which leads to data loss.
>
> The path generation logic is shown below:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>     return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId));
> }
>
> @VisibleForTesting
> String subtaskDirString() {
>     // No operator identifier appears in this path: every stateful operator chained
>     // into the same task (same jobVertexID and subtaskIndex) resolves to the same directory.
>     return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString();
> }
>
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>     return "chk_" + checkpointId;
> }
> {code}
>
> [1] [http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
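To make the collision concrete, here is a minimal Java sketch. It assumes the layout quoted in the issue (`<root>/<allocationId>/jid_<jobId>/vtx_<vertexId>_sti_<subtaskIndex>/chk_<checkpointId>`). The class `LocalStatePathSketch`, its methods, and the `op_<operatorId>` segment are hypothetical and are not Flink APIs. Appending a per-operator segment is only one illustrative way to keep chained operators apart and is not necessarily what the merged commits do.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hypothetical sketch of the local-recovery directory layout described in FLINK-12296.
 * None of these methods are Flink APIs.
 */
public class LocalStatePathSketch {

    // Mirrors the quoted layout: root/allocation/jid_<job>/vtx_<vertex>_sti_<subtask>/chk_<id>.
    // Note that nothing here identifies the operator.
    static Path subtaskCheckpointDir(Path root, String allocationId, String jobId,
                                     String vertexId, int subtaskIndex, long checkpointId) {
        return root.resolve(allocationId)
                .resolve(Paths.get("jid_" + jobId, "vtx_" + vertexId + "_sti_" + subtaskIndex))
                .resolve("chk_" + checkpointId);
    }

    // Hypothetical disambiguation: add an operator-specific segment so that
    // operators chained into the same task write to different directories.
    static Path operatorCheckpointDir(Path subtaskCheckpointDir, String operatorId) {
        return subtaskCheckpointDir.resolve("op_" + operatorId);
    }

    public static void main(String[] args) {
        Path root = Paths.get("local_state_root_1");

        // Two stateful operators chained in one task share allocation, job,
        // vertex and subtask index, so they compute the same directory.
        Path dirA = subtaskCheckpointDir(root, "alloc-1", "job-1", "vertex-1", 0, 1L);
        Path dirB = subtaskCheckpointDir(root, "alloc-1", "job-1", "vertex-1", 0, 1L);
        System.out.println("shared directory: " + dirA.equals(dirB));

        // With a per-operator segment the two paths no longer coincide.
        Path perOpA = operatorCheckpointDir(dirA, "operator-a");
        Path perOpB = operatorCheckpointDir(dirB, "operator-b");
        System.out.println("shared after per-operator segment: " + perOpA.equals(perOpB));
    }
}
```

Running `main` prints `shared directory: true` followed by `shared after per-operator segment: false`, showing that the vertex-level path alone cannot tell chained operators apart.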