[ https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825650#comment-16825650 ]
Ning Shi commented on FLINK-12296:
----------------------------------

[~srichter] The assumption totally made sense in that context. I agree that reinterpretation is now a widely adopted feature: it is in the documentation, albeit as an experimental feature, and is often recommended on the mailing list and in blog posts. It would be very nice to have this bug fixed or, at the very least, to prevent reinterpretation and RocksDB checkpointing from being used together. The documentation should also warn against using them together.

The challenge of the workaround for a user is that it is sometimes not obvious which operators have internal state, e.g. KeyedBroadcastProcessFunction may have internal timers stored in RocksDB. So it is not always easy to avoid getting two stateful operators chained unless the job disallows chaining altogether.

> Data loss silently in RocksDBStateBackend when more than one operator(has
> states) chained in a single task
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12296
>                 URL: https://issues.apache.org/jira/browse/FLINK-12296
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Blocker
>             Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> As reported on the mailing list [1], when more than one operator is chained
> in a single task and all of those operators have state, data can be lost
> silently.
> Currently, the local directory we use looks like this:
> ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state)
>
> If more than one operator is chained in a single task and all of the operators
> have state, then all of them share the same local directory (because the
> vertex_id is the same), which leads to data loss.
>
> The path generation logic is below:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>     return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId));
> }
>
> @VisibleForTesting
> String subtaskDirString() {
>     return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString();
> }
>
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>     return "chk_" + checkpointId;
> }
> {code}
> [1] [http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
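
For readers who want to see the collision concretely, here is a minimal standalone sketch of the path scheme quoted above. It is not Flink code: the class `LocalStatePathSketch`, the string-typed IDs, and the sample values are assumptions made for illustration. The `checkpointDirPerOperator` variant is purely hypothetical and only shows that any per-operator component in the path would separate the directories; it is not claimed to be the fix that went into Flink.

```java
import java.nio.file.Paths;

public class LocalStatePathSketch {

    // Mirrors the quoted subtaskDirString(): only the job ID, the job vertex ID,
    // and the subtask index go into the directory name.
    static String subtaskDirString(String jobId, String jobVertexId, int subtaskIndex) {
        return Paths.get("jid_" + jobId, "vtx_" + jobVertexId + "_sti_" + subtaskIndex).toString();
    }

    // Mirrors the quoted checkpointDirString().
    static String checkpointDirString(long checkpointId) {
        return "chk_" + checkpointId;
    }

    // Directory one operator would write its local state into for a checkpoint.
    // Note that nothing operator-specific is part of the path.
    static String checkpointDir(String jobId, String jobVertexId, int subtaskIndex, long checkpointId) {
        return Paths.get(
                subtaskDirString(jobId, jobVertexId, subtaskIndex),
                checkpointDirString(checkpointId)).toString();
    }

    // Hypothetical variant (an assumption, not Flink's API): adds an
    // operator-specific component so chained operators no longer collide.
    static String checkpointDirPerOperator(
            String jobId, String jobVertexId, int subtaskIndex, long checkpointId, String operatorId) {
        return Paths.get(
                checkpointDir(jobId, jobVertexId, subtaskIndex, checkpointId),
                "op_" + operatorId).toString();
    }

    public static void main(String[] args) {
        // Two stateful operators chained into one task share the job vertex ID
        // and the subtask index. The values below are made up.
        String jobId = "job-1";
        String chainedVertexId = "vertex-a";
        int subtaskIndex = 0;
        long checkpointId = 1L;

        String dirOfFirstOperator = checkpointDir(jobId, chainedVertexId, subtaskIndex, checkpointId);
        String dirOfSecondOperator = checkpointDir(jobId, chainedVertexId, subtaskIndex, checkpointId);

        // Same inputs, same path: both operators would use one directory.
        System.out.println(dirOfFirstOperator.equals(dirOfSecondOperator)); // prints "true"

        String first = checkpointDirPerOperator(jobId, chainedVertexId, subtaskIndex, checkpointId, "op-1");
        String second = checkpointDirPerOperator(jobId, chainedVertexId, subtaskIndex, checkpointId, "op-2");

        // With a per-operator component the directories differ.
        System.out.println(first.equals(second)); // prints "false"
    }
}
```

Because both chained operators pass identical arguments to `checkpointDir`, nothing in the path tells them apart, which matches the shared-directory behaviour described in the issue.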