[ https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16824945#comment-16824945 ]
Stefan Richter commented on FLINK-12296:
----------------------------------------

I guess this is a remaining relic of how backends were created: previously, only the head operator of a chain could have keyed state and a backend. This has since been partially changed, and every operator can now have a keyed state backend. The old assumption was probably made because keyed state was expected to appear only right after keyBy, which also always starts a new chain.

So how is your job even getting keyed state on a non-head operator? Using {{reinterpretAsKeyedStream}}?

> Data loss silently in RocksDBStateBackend when more than one operator(has states) chained in a single task
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12296
>                 URL: https://issues.apache.org/jira/browse/FLINK-12296
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Blocker
>             Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> As reported on the mailing list [1], when more than one operator is chained in a single task and all of those operators have state, data can be lost silently.
> Currently, the local directory we use looks like this:
> ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state)
>
> If more than one operator is chained in a single task and all of them have state, they all share the same local directory (because the vertex_id is the same), which leads to data loss.
>
> The path generation logic is below:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>     return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId));
> }
>
> @VisibleForTesting
> String subtaskDirString() {
>     return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString();
> }
>
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>     return "chk_" + checkpointId;
> }
> {code}
> [1] [http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
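
To make the collision described in the issue concrete, here is a minimal standalone sketch. It only mirrors the string-building logic of the quoted excerpt; the class name {{LocalDirCollisionSketch}}, the helper {{localStateDir}}, the {{operatorName}} parameter and the sample IDs are hypothetical and not part of Flink. It assumes, as the description says, that two stateful operators chained in one task share the same job vertex ID and subtask index.

```java
import java.nio.file.Paths;

// Hypothetical standalone sketch; not Flink code.
public class LocalDirCollisionSketch {

    // Mirrors subtaskDirString() from the quoted excerpt: the result depends only on
    // the job ID, the job vertex ID and the subtask index.
    public static String subtaskDirString(String jobID, String jobVertexID, int subtaskIndex) {
        return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString();
    }

    // Mirrors checkpointDirString() from the quoted excerpt.
    public static String checkpointDirString(long checkpointId) {
        return "chk_" + checkpointId;
    }

    // Hypothetical helper: the directory an operator would use for a given checkpoint.
    // operatorName is deliberately unused, because nothing in the quoted path logic
    // distinguishes operators within the same chain.
    public static String localStateDir(
            String jobID, String jobVertexID, int subtaskIndex, long checkpointId, String operatorName) {
        return Paths.get(
                subtaskDirString(jobID, jobVertexID, subtaskIndex),
                checkpointDirString(checkpointId)).toString();
    }

    public static void main(String[] args) {
        // Two stateful operators chained in the same task (same vertex, same subtask).
        String first = localStateDir("job1", "vertexA", 0, 1L, "map");
        String second = localStateDir("job1", "vertexA", 0, 1L, "filter");

        System.out.println(first);
        System.out.println(second);
        System.out.println("same directory: " + first.equals(second));
    }
}
```

Both calls resolve to the same directory, so whichever operator writes its local state last can overwrite what the other one wrote. This is the silent data loss the issue describes.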