[ https://issues.apache.org/jira/browse/FLINK-4502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438754#comment-15438754 ]
Chesnay Schepler commented on FLINK-4502:
-----------------------------------------

Alright, so let's break this down point by point:

Your description of how the WAL works is correct. However, it only attempts to prevent a *different version* of a checkpoint from being committed.

You are correct that the sink should stop sending data when sendValues returns false, which can lead to some data for that checkpoint never being written. However, you are incorrect in regards to this potentially overwriting newer data. We manually set the timestamp of the queries based on the checkpoint id; newer checkpoint => newer timestamp, which means that even if the query is submitted to Cassandra it will not overwrite anything.

The documentation states that we provide exactly-once semantics for idempotent updates; as such, by definition, the writes to Cassandra cannot be non-idempotent at any point. The sink does in fact not guarantee exactly-once *delivery*, but it doesn't claim that it does. It fulfills exactly-once *semantics* insofar as, at any point in time, the data in Cassandra will show a state that would have been reached if the messages had been delivered exactly once.

> Cassandra connector documentation has misleading consistency guarantees
> -----------------------------------------------------------------------
>
>                 Key: FLINK-4502
>                 URL: https://issues.apache.org/jira/browse/FLINK-4502
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>
> The Cassandra connector documentation states that "enableWriteAheadLog() is
> an optional method, that allows exactly-once processing for non-deterministic
> algorithms." This claim appears to be false.
> From what I gather, the write-ahead log feature of the connector works as
> follows:
> - The sink is replaced with a stateful operator that writes incoming messages
> to the state backend, keyed by the checkpoint they belong to.
> - When the operator is notified that a Flink checkpoint has been completed,
> it, for each set of checkpoints older than and including the committed one:
>   * reads its messages from the state backend,
>   * writes them to Cassandra,
>   * records that it has committed them to Cassandra for the specific
>     checkpoint and operator instance,
>   * and erases them from the state backend.
> This process attempts to avoid resubmitting queries to Cassandra that would
> otherwise occur when recovering a job from a checkpoint and having messages
> replayed.
> Alas, this does not guarantee exactly-once semantics at the sink. The writes
> to Cassandra that occur when the operator is notified that a checkpoint is
> completed are not atomic, and they are potentially non-idempotent. If the job
> dies while writing to Cassandra or before committing the checkpoint via the
> committer, queries will be replayed when the job recovers. Thus the
> documentation appears to be incorrect in stating that this provides
> exactly-once semantics.
> There also seems to be an issue in GenericWriteAheadSink's
> notifyOfCompletedCheckpoint which may result in incorrect output. If
> sendValues returns false because a write failed, instead of bailing it
> simply moves on to the next checkpoint to commit, if there is one, keeping
> the previous one around to try again later. But that can result in newer
> data being overwritten with older data when the previous checkpoint is
> retried.
> Although, given that CassandraCommitter implements isCheckpointCommitted as
> checkpointID <= this.lastCommittedCheckpointID, it actually means that when
> it goes back to try the uncommitted older checkpoint it will consider it
> committed, even though some of its data may not have been written out, and
> the data will be discarded.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
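The timestamp argument in the comment rests on Cassandra's last-write-wins resolution per cell: when the sink derives the write timestamp from the checkpoint id, a replayed write from an older checkpoint can never overwrite data written by a newer one. The following is a minimal simulation of that resolution rule, not the actual connector code; the class and method names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Simulates Cassandra's last-write-wins cell resolution when each write
// carries an explicit timestamp derived from the checkpoint id.
class TimestampedWrites {

    /** A cell value together with the write timestamp that produced it. */
    static final class Cell {
        final long timestamp;
        final String value;
        Cell(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final Map<String, Cell> table = new HashMap<>();

    /** Apply a write with an explicit timestamp; an older timestamp never wins. */
    void write(String key, String value, long timestamp) {
        Cell existing = table.get(key);
        if (existing == null || timestamp > existing.timestamp) {
            table.put(key, new Cell(timestamp, value));
        }
    }

    String read(String key) {
        Cell c = table.get(key);
        return c == null ? null : c.value;
    }

    public static void main(String[] args) {
        TimestampedWrites cassandra = new TimestampedWrites();
        // Checkpoint 2 commits first (e.g. checkpoint 1's sendValues failed part-way).
        cassandra.write("k", "value-from-checkpoint-2", 2L);
        // Checkpoint 1 is retried later; its replayed write carries the older timestamp.
        cassandra.write("k", "value-from-checkpoint-1", 1L);
        // The newer data survives: the replay cannot overwrite it.
        System.out.println(cassandra.read("k")); // prints "value-from-checkpoint-2"
    }
}
```

This is why the comment says the replay is harmless for overwrites: duplicate or late writes with older timestamps are silently superseded by the newer checkpoint's data.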
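The data-loss scenario the report describes in its last paragraph can be sketched as a small simulation. This is not the real GenericWriteAheadSink or CassandraCommitter; the field and method names merely echo those the report quotes, and the bodies here are simplified assumptions about the described behaviour: a failed checkpoint is kept for retry, a later checkpoint's commit advances lastCommittedCheckpointID past it, and the `checkpointID <= lastCommittedCheckpointID` check then discards the retried checkpoint's data without writing it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified simulation of the commit/retry flow the report describes.
class CommitSimulation {

    long lastCommittedCheckpointId = -1;

    // Mirrors the check the report quotes from CassandraCommitter.
    boolean isCheckpointCommitted(long checkpointId) {
        return checkpointId <= lastCommittedCheckpointId;
    }

    /**
     * Processes pending checkpoints in order; the boolean flag stands in for
     * whether sendValues succeeds. Returns the ids whose data was written.
     */
    List<Long> notifyOfCompletedCheckpoint(Map<Long, Boolean> pending) {
        List<Long> written = new ArrayList<>();
        for (Map.Entry<Long, Boolean> e : new ArrayList<>(pending.entrySet())) {
            long id = e.getKey();
            if (isCheckpointCommitted(id)) {
                // Considered already done: data is discarded without a write.
                pending.remove(id);
                continue;
            }
            boolean sendValuesSucceeded = e.getValue();
            if (sendValuesSucceeded) {
                lastCommittedCheckpointId = id;
                written.add(id);
                pending.remove(id);
            }
            // On failure the entry is kept for retry, but the loop moves on
            // to the next checkpoint instead of bailing.
        }
        return written;
    }

    public static void main(String[] args) {
        CommitSimulation sink = new CommitSimulation();
        Map<Long, Boolean> pending = new LinkedHashMap<>();
        pending.put(1L, false); // checkpoint 1: write fails on the first attempt
        pending.put(2L, true);  // checkpoint 2: write succeeds

        sink.notifyOfCompletedCheckpoint(pending); // commits 2, keeps 1 for retry
        pending.put(1L, true);  // on retry the write would now succeed...
        List<Long> retried = sink.notifyOfCompletedCheckpoint(pending);
        // ...but 1 <= lastCommittedCheckpointId, so checkpoint 1's data is
        // discarded without ever being written.
        System.out.println(retried.isEmpty()); // prints "true"
    }
}
```

Under these assumptions the replay-overwrite concern is answered by the timestamp mechanism, but the skipped-then-considered-committed path still silently drops checkpoint 1's unwritten data, which is the incorrect-output issue the report raises.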